Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
let it work in RL (more work needed for SG)
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved.                */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
14
15 static short _amok_bw_initialized = 0;
16
17 /**** code ****/
18 void amok_bw_init(void) {
19   gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
20
21   amok_base_init();
22
23   if (! _amok_bw_initialized) {
24         
25      /* Build the datatype descriptions */ 
26      bw_request_desc = gras_datadesc_struct("s_bw_request_t");
27      gras_datadesc_struct_append(bw_request_desc,"host",
28                                  gras_datadesc_by_name("xbt_host_t"));
29      gras_datadesc_struct_append(bw_request_desc,"buf_size",
30                                  gras_datadesc_by_name("unsigned long int"));
31      gras_datadesc_struct_append(bw_request_desc,"exp_size",
32                                  gras_datadesc_by_name("unsigned long int"));
33      gras_datadesc_struct_append(bw_request_desc,"msg_size",
34                                  gras_datadesc_by_name("unsigned long int"));
35      gras_datadesc_struct_close(bw_request_desc);
36      bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
37
38      bw_res_desc = gras_datadesc_struct("s_bw_res_t");
39      gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
40      gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
41      gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
42      gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
43      gras_datadesc_struct_close(bw_res_desc);
44      bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
45
46      sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
47      gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
48      gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
49      gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
50      gras_datadesc_struct_close(sat_request_desc);
51      sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
52      
53      /* Register the bandwidth messages */
54      gras_msgtype_declare("BW request",       bw_request_desc);
55      gras_msgtype_declare("BW result",        bw_res_desc);
56      gras_msgtype_declare("BW handshake",     bw_request_desc);
57      gras_msgtype_declare("BW handshake ACK", bw_request_desc);
58      
59      /* Register the saturation messages */
60      gras_msgtype_declare("SAT start",   sat_request_desc);
61      gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
62      gras_msgtype_declare("SAT begin",   sat_request_desc);
63      gras_msgtype_declare("SAT begun",   gras_datadesc_by_name("amok_remoterr_t"));
64      gras_msgtype_declare("SAT end",     NULL);
65      gras_msgtype_declare("SAT ended",   gras_datadesc_by_name("amok_remoterr_t"));
66      gras_msgtype_declare("SAT stop",    NULL);
67      gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
68   }
69    
70   /* Register the callbacks */
71   gras_cb_register(gras_msgtype_by_name("BW request"),
72                    &amok_bw_cb_bw_request);
73   gras_cb_register(gras_msgtype_by_name("BW handshake"),
74                    &amok_bw_cb_bw_handshake);
75
76   gras_cb_register(gras_msgtype_by_name("SAT start"),
77                    &amok_bw_cb_sat_start);
78   gras_cb_register(gras_msgtype_by_name("SAT begin"),
79                    &amok_bw_cb_sat_begin);
80   
81   _amok_bw_initialized =1;
82 }
83
84 void amok_bw_exit(void) {
85   if (! _amok_bw_initialized)
86     return;
87    
88   gras_cb_unregister(gras_msgtype_by_name("BW request"),
89                      &amok_bw_cb_bw_request);
90   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
91                      &amok_bw_cb_bw_handshake);
92
93   gras_cb_unregister(gras_msgtype_by_name("SAT start"),
94                      &amok_bw_cb_sat_start);
95   gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
96                      &amok_bw_cb_sat_begin);
97
98   _amok_bw_initialized = 0;
99 }
100
101    
102
103 /* ***************************************************************************
104  * Bandwidth tests
105  * ***************************************************************************/
106
107 /**
108  * \brief bandwidth measurement between localhost and @peer
109  * 
110  * Results are reported in last args, and sizes are in kb.
111  */
112 xbt_error_t amok_bw_test(gras_socket_t peer,
113                          unsigned long int buf_size,
114                          unsigned long int exp_size,
115                          unsigned long int msg_size,
116                          /*OUT*/ double *sec, double *bw) {
117
118   /* Measurement sockets for the experiments */
119   gras_socket_t measMasterIn,measIn,measOut;
120   gras_socket_t sock_dummy; /* ignored arg to msg_wait */
121   int port;
122   xbt_error_t errcode;
123   bw_request_t request,request_ack;
124   
125   for (port = 5000, errcode = system_error;
126        errcode == system_error && port < 10000;
127        errcode = gras_socket_server_ext(++port,buf_size,1,&measMasterIn));
128   if (errcode != no_error) {
129     ERROR1("Error %s encountered while opening a measurement socket",
130            xbt_error_name(errcode));
131     return errcode;
132   }
133         
134   request=xbt_new0(s_bw_request_t,1);
135   request->buf_size=buf_size*1024;
136   request->exp_size=exp_size*1024;
137   request->msg_size=msg_size*1024;
138   request->host.name = NULL;
139   request->host.port = gras_socket_my_port(measMasterIn);
140   VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld kb= %ld b)", 
141         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
142         buf_size,request->buf_size);
143
144   if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
145     ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
146     return errcode;
147   }
148   TRY(gras_socket_meas_accept(measMasterIn,&measIn));
149
150   if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),
151                              &sock_dummy,&request_ack))) {
152     ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
153             xbt_error_name(errcode));
154     return errcode;
155   }
156    
157   /* FIXME: What if there is a remote error? */
158    
159   if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),
160                                      request_ack->host.port, 
161                                      request->buf_size,1,&measOut))) {
162     ERROR3("Error %s encountered while opening the measurement socket to %s:%d for BW test\n",
163             xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
164     return errcode;
165   }
166   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
167
168   *sec=gras_os_time();
169   TRY(gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size));
170   TRY(gras_socket_meas_recv(measIn,120,1,1));
171
172   /*catch
173     ERROR1("Error %s encountered while sending the BW experiment.",
174             xbt_error_name(errcode));
175     gras_socket_close(measOut);
176     gras_socket_close(measMasterIn);
177     gras_socket_close(measIn);
178   */
179
180   *sec = gras_os_time() - *sec;
181   *bw = ((double)exp_size) / *sec;
182
183   free(request_ack);
184   free(request);
185   if (measIn != measMasterIn)
186     gras_socket_close(measIn);
187   gras_socket_close(measMasterIn);
188   gras_socket_close(measOut);
189   return errcode;
190 }
191
192
193 /* Callback to the "BW handshake" message: 
194    opens a server measurement socket,
195    indicate its port in an "BW handshaked" message,
196    receive the corresponding data on the measurement socket, 
197    close the measurment socket
198
199    sizes are in byte (got converted from kb my expeditor)
200 */
201 int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
202                             void          *payload) {
203   gras_socket_t measMasterIn,measIn,measOut;
204   bw_request_t request=*(bw_request_t*)payload;
205   bw_request_t answer;
206   xbt_error_t errcode;
207   int port;
208   
209   VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
210         gras_socket_peer_name(expeditor),request->host.port,
211         request->buf_size,request->exp_size,request->msg_size);     
212
213   /* Build our answer */
214   answer = xbt_new0(s_bw_request_t,1);
215   
216   for (port = 6000, errcode = system_error;
217        errcode == system_error;
218        errcode = gras_socket_server_ext(++port,request->buf_size,1,&measMasterIn));
219   if (errcode != no_error) {
220     ERROR1("Error %s encountered while opening a measurement server socket", xbt_error_name(errcode));
221     /* FIXME: tell error to remote */
222     return 1;
223   }
224    
225   answer->buf_size=request->buf_size;
226   answer->exp_size=request->exp_size;
227   answer->msg_size=request->msg_size;
228   answer->host.port=gras_socket_my_port(measMasterIn);
229
230   /* Don't connect asap to leave time to other side to enter the accept() */
231   if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),
232                                       request->host.port,
233                                       request->buf_size,1,&measOut))) { 
234     ERROR3("Error '%s' encountered while opening a measurement socket back to %s:%d", 
235            xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
236     /* FIXME: tell error to remote */
237     return 1;
238   }
239
240
241   if ((errcode=gras_msg_send(expeditor,
242                              gras_msgtype_by_name("BW handshake ACK"),
243                              &answer))) {
244     ERROR1("Error %s encountered while sending the answer.",
245             xbt_error_name(errcode));
246     gras_socket_close(measMasterIn);
247     gras_socket_close(measOut);
248     /* FIXME: tell error to remote */
249     return 1;
250   }
251   TRY(gras_socket_meas_accept(measMasterIn,&measIn));
252   DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
253         answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
254
255   TRY(gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size));
256   TRY(gras_socket_meas_send(measOut,120,1,1));
257
258   /*catch
259     ERROR1("Error %s encountered while receiving the experiment.",
260             xbt_error_name(errcode));
261     gras_socket_close(measMasterIn);
262     gras_socket_close(measIn);
263     gras_socket_close(measOut);
264     * FIXME: tell error to remote ? *
265     return 1;
266     }*/
267
268   if (measIn != measMasterIn)
269     gras_socket_close(measMasterIn);
270   gras_socket_close(measIn);
271   gras_socket_close(measOut);
272   free(answer);
273   free(request);
274   DEBUG0("BW experiment done.");
275   return 1;
276 }
277
278 int amok_bw_cb_bw_request(gras_socket_t    expeditor,
279                           void            *payload) {
280    CRITICAL0("Not implemented");
281    return 1;
282 }
283
284 int amok_bw_cb_sat_start(gras_socket_t     expeditor,
285                          void             *payload) {
286    CRITICAL0("Not implemented");
287    return 1;
288
289 int amok_bw_cb_sat_begin(gras_socket_t     expeditor,
290                          void             *payload) {
291    CRITICAL0("Not implemented");
292    return 1;
293 }
294
295 #if 0
296 /* function to request a BW test between two external hosts */
297 xbt_error_t grasbw_request(const char* from_name,unsigned int from_port,
298                            const char* to_name,unsigned int to_port,
299                            unsigned int bufSize,unsigned int expSize,unsigned int msgSize,
300                            /*OUT*/ double *sec, double*bw) {
301   
302   gras_sock_t *sock;
303   gras_msg_t *answer;
304   xbt_error_t errcode;
305   /* The request */
306   BwExp_t *request;
307   msgHost_t *target;
308
309   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
310     fprintf(stderr,"grasbw_request(): Error %s encountered while contacting the actuator\n",
311             xbt_error_name(errcode));
312     return errcode;
313   }
314   if (!(request=(BwExp_t *)malloc(sizeof(BwExp_t))) ||
315       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
316     fprintf(stderr,"grasbw_test(): Malloc error\n");
317     gras_sock_close(sock);
318     return malloc_error;    
319   }
320
321   request->bufSize=bufSize;
322   request->expSize=expSize;
323   request->msgSize=msgSize;
324   strcpy(target->host,to_name);
325   target->port=to_port;
326   
327   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_BW_REQUEST, 2, 
328                               target,1,
329                               request,1))) {
330     fprintf(stderr,"grasbw_request(): Error %s encountered while sending the request.\n",
331             xbt_error_name(errcode));
332     gras_sock_close(sock);
333     return errcode;
334   }
335   if ((errcode=gras_msg_wait(240,GRASMSG_BW_RESULT,&answer))) {
336     fprintf(stderr,"grasbw_request(): Error %s encountered while waiting for the answer.\n",
337             xbt_error_name(errcode));
338     gras_sock_close(sock);
339     return errcode;
340   }
341
342   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
343     fprintf(stderr,"grasbw_request(): Peer reported error %s (%s).\n",
344             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
345     gras_msg_free(answer);
346     gras_sock_close(sock);
347     return errcode;
348   }
349
350   /*  fprintf(stderr,"sec=%p",gras_msg_ctn(answer,1,0,msgResult_t)); */
351   *sec=gras_msg_ctn(answer,1,0,msgResult_t).value;
352   *bw=gras_msg_ctn(answer,1,1,msgResult_t).value;
353
354   gras_msg_free(answer);
355   gras_sock_close(sock);
356   return no_error;
357 }
358
359 int grasbw_cbBWRequest(gras_msg_t *msg) {
360   /* specification of the test to run */
361   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
362   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
363
364   unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
365   unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
366   unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
367   /* our answer */
368   msgError_t *error;
369   msgResult_t *res;
370
371   if (!(error=(msgError_t *)malloc(sizeof(msgError_t))) ||
372       !(res=(msgResult_t *)malloc(sizeof(msgResult_t) * 2))) {
373     fprintf(stderr,"%s:%d:grasbw_cbRequest: Malloc error\n",__FILE__,__LINE__);
374     return malloc_error;    
375   }
376
377   if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
378                                   &(res[0].value),&(res[1].value) ))) {
379     fprintf(stderr,
380             "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
381             __FILE__,__LINE__,xbt_error_name(error->errcode));
382     strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
383     gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
384                    error,1,
385                    res,2);
386     return 1;
387   }
388   res[0].timestamp = (unsigned int) gras_time();
389   res[1].timestamp = (unsigned int) gras_time();
390   gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
391                  error,1,
392                  res,2);
393   gras_msg_free(msg);
394   return 1;
395 }
396
397 /* ***************************************************************************
398  * Link saturation
399  * ***************************************************************************/
400
401 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
402                                   const char* to_name,unsigned int to_port,
403                                   unsigned int msgSize, unsigned int timeout) {
404   gras_sock_t *sock;
405   xbt_error_t errcode;
406   /* The request */
407   SatExp_t *request;
408   msgHost_t *target;
409   /* answer */
410   gras_msg_t *answer;
411
412   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
413     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
414             __FILE__,__LINE__,xbt_error_name(errcode));
415     return errcode;
416   }
417   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
418       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
419     fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
420     gras_sock_close(sock);
421     return malloc_error;    
422   }
423
424   request->timeout=timeout;
425   request->msgSize=msgSize;
426
427   strcpy(target->host,to_name);
428   target->port=to_port;
429
430   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2, 
431                               target,1,
432                               request,1))) {
433     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
434             __FILE__,__LINE__,xbt_error_name(errcode));
435     gras_sock_close(sock);
436     return errcode;
437   }
438   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
439     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
440             __FILE__,__LINE__,xbt_error_name(errcode));
441     gras_sock_close(sock);
442     return errcode;
443   }
444
445   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
446     fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
447             __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
448     gras_msg_free(answer);
449     gras_sock_close(sock);
450     return errcode;
451   }
452
453   gras_msg_free(answer);
454   gras_sock_close(sock);
455   return no_error;
456 }
457
458 int grasbw_cbSatStart(gras_msg_t *msg) {
459   gras_rawsock_t *raw;
460   gras_sock_t *sock;
461   xbt_error_t errcode;
462   double start; /* time to timeout */
463
464   /* specification of the test to run */
465   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
466   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
467
468   unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
469   unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
470   unsigned int raw_port;
471
472   /* The request */
473   SatExp_t *request;
474   /* answer */
475   gras_msg_t *answer;
476
477   /*
478   fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
479   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
480           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
481           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
482   */
483
484   /* Negociate the saturation with the peer */
485   if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
486     fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
487             xbt_error_name(errcode));
488     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
489                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
490                      errcode,"Cannot contact peer.\n");
491     return 1;
492   }
493   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
494     fprintf(stderr,"cbSatStart(): Malloc error\n");
495     gras_sock_close(sock);
496     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
497                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
498                      malloc_error,"Cannot build request.\n");
499     return 1;    
500   }
501
502   request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
503   request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
504
505   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1, 
506                               request,1))) {
507     fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
508             xbt_error_name(errcode));
509     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
510                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
511                      errcode,"Cannot send request.\n");
512     gras_sock_close(sock);
513     return 1;
514   }
515
516   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
517     fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
518             xbt_error_name(errcode));
519     gras_sock_close(sock);
520
521     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
522                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
523                      errcode,
524                      "Cannot receive the ACK.\n");
525     return 1;
526   }
527
528   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
529     fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
530             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
531
532     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
533                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
534                      errcode,
535                      "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
536     gras_msg_free(answer);
537     gras_sock_close(sock);
538     return 1;
539   }
540
541   raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
542
543   if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
544     fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
545             xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
546
547     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
548                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
549                      errcode,"Cannot open raw socket.\n");
550     gras_sock_close(sock);
551     return 1;
552   }
553
554   /* send a train of data before repporting that XP is started */
555   if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
556     fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
557     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
558                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
559                      errcode,"Cannot raw send.\n");
560     gras_sock_close(sock);
561     gras_rawsock_close(raw);
562     return 1;
563   }
564   
565   grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
566                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
567                    no_error,"Saturation started");
568   gras_msg_free(answer);
569   gras_msg_free(msg);
570   
571   /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
572   start=gras_time();
573   while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error && 
574          gras_time()-start < timeout) {
575     if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
576       fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
577       /* our error message do not interess anyone. SAT_STOP will do nothing. */
578       gras_sock_close(sock);
579       gras_rawsock_close(raw);
580       return 1;
581     } 
582   }
583   if (gras_time()-start > timeout) {
584     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
585     gras_sock_close(sock);
586     gras_rawsock_close(raw);
587     return 1;
588   }
589
590   /* Handle the SAT_STOP which broke the previous while */
591   
592   if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
593     fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
594
595     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
596                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
597                      errcode,"Sending SAT_END to peer failed.\n");
598     gras_sock_close(sock);
599     gras_rawsock_close(raw);
600     return 1;
601   }
602   
603   if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
604     fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
605
606     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
607                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
608                      errcode,"Receiving SAT_ENDED from peer failed.\n");
609     gras_sock_close(sock);
610     gras_rawsock_close(raw);
611     return 1;
612   }
613   grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
614                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
615                    no_error,"");
616
617   gras_sock_close(sock);
618   gras_rawsock_close(raw);
619   gras_msg_free(answer);
620   gras_msg_free(msg);
621
622   return 1;  
623 }
624
625 int grasbw_cbSatBegin(gras_msg_t *msg) {
626   gras_rawsock_t *raw;
627   xbt_error_t errcode;
628   double start; /* timer */
629   /* request */
630   unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
631   unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
632   /* answer */
633   SatExp_t *request;
634   msgError_t *error;
635
636   if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
637       !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
638     fprintf(stderr,"cbSatBegin(): Malloc error\n");
639     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
640                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
641                      malloc_error,"Malloc error");
642     return 1;
643   }
644
645   if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) { 
646     fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
647             xbt_error_name(errcode));
648     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
649                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
650                      errcode,"Cannot open raw socket");
651     return 1;
652   }
653   request->port=gras_rawsock_get_peer_port(raw);
654   request->msgSize=msgSize;
655   error->errcode=no_error;
656   error->errmsg[0]='\0';
657   if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
658                               error,1,
659                               request,1))) {
660     fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
661             xbt_error_name(errcode));
662     return 1;
663   }
664   gras_msg_free(msg);
665
666   start=gras_time();
667   while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
668          gras_time() - start < timeout) {
669     errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
670     if (errcode != timeout_error && errcode != no_error) {
671       fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
672       /* our error message do not interess anyone. SAT_END will do nothing. */
673       /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
674       return 1;
675     } 
676   }
677   if (gras_time()-start > timeout) {
678     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
679     gras_rawsock_close(raw);
680     return 1;
681   }
682
683   grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
684                    "cbSatBegin: Cannot send SAT_ENDED.\n",
685                    no_error,"");
686   gras_rawsock_close(raw);
687   gras_msg_free(msg);
688   return 1;
689 }
690
691 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
692                                  const char* to_name,unsigned int to_port) {
693   xbt_error_t errcode;
694   gras_sock_t *sock;
695   gras_msg_t *answer;
696
697   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
698     fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
699             xbt_error_name(errcode));
700     return errcode;
701   }
702
703   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
704     fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
705             xbt_error_name(errcode));
706     gras_sock_close(sock);
707     return errcode;
708   }
709
710   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
711     fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
712             xbt_error_name(errcode));
713     gras_sock_close(sock);
714     return errcode;
715   }
716
717   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
718     fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
719             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
720     gras_msg_free(answer);
721     gras_sock_close(sock);
722     return errcode;
723   }
724
725   gras_msg_free(answer);
726   gras_sock_close(sock);
727
728   return no_error;
729 }
730 #endif