Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
74fff4e289466ebba6b7e119169fead00d3e566a
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved.                */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11 #include "amok/Bandwidth/bandwidth_private.h"
12 #include "gras/messages.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
15
16 static short _amok_bw_initialized = 0;
17
18 /** @brief module initialization; all participating nodes must run this */
19 void amok_bw_init(void) {
20   gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
21
22   amok_base_init();
23
24   if (! _amok_bw_initialized) {
25         
26      /* Build the datatype descriptions */ 
27      bw_request_desc = gras_datadesc_struct("s_bw_request_t");
28      gras_datadesc_struct_append(bw_request_desc,"host",
29                                  gras_datadesc_by_name("xbt_host_t"));
30      gras_datadesc_struct_append(bw_request_desc,"buf_size",
31                                  gras_datadesc_by_name("unsigned long int"));
32      gras_datadesc_struct_append(bw_request_desc,"exp_size",
33                                  gras_datadesc_by_name("unsigned long int"));
34      gras_datadesc_struct_append(bw_request_desc,"msg_size",
35                                  gras_datadesc_by_name("unsigned long int"));
36      gras_datadesc_struct_close(bw_request_desc);
37      bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
38
39      bw_res_desc = gras_datadesc_struct("s_bw_res_t");
40      gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
41      gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
42      gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
43      gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
44      gras_datadesc_struct_close(bw_res_desc);
45      bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
46
47      sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
48      gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
49      gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
50      gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
51      gras_datadesc_struct_close(sat_request_desc);
52      sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
53      
54      /* Register the bandwidth messages */
55      gras_msgtype_declare("BW handshake",     bw_request_desc);
56      gras_msgtype_declare("BW handshake ACK", bw_request_desc);
57      gras_msgtype_declare("BW request",       bw_request_desc);
58      gras_msgtype_declare("BW result",        bw_res_desc);
59      
60      /* Register the saturation messages */
61      gras_msgtype_declare("SAT start",   sat_request_desc);
62      gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
63      gras_msgtype_declare("SAT begin",   sat_request_desc);
64      gras_msgtype_declare("SAT begun",   gras_datadesc_by_name("amok_remoterr_t"));
65      gras_msgtype_declare("SAT end",     NULL);
66      gras_msgtype_declare("SAT ended",   gras_datadesc_by_name("amok_remoterr_t"));
67      gras_msgtype_declare("SAT stop",    NULL);
68      gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
69   }
70    
71   /* Register the callbacks */
72   gras_cb_register(gras_msgtype_by_name("BW request"),
73                    &amok_bw_cb_bw_request);
74   gras_cb_register(gras_msgtype_by_name("BW handshake"),
75                    &amok_bw_cb_bw_handshake);
76
77   gras_cb_register(gras_msgtype_by_name("SAT start"),
78                    &amok_bw_cb_sat_start);
79   gras_cb_register(gras_msgtype_by_name("SAT begin"),
80                    &amok_bw_cb_sat_begin);
81   
82   _amok_bw_initialized =1;
83 }
84
85 /** @brief module finalization */
86 void amok_bw_exit(void) {
87   if (! _amok_bw_initialized)
88     return;
89    
90   gras_cb_unregister(gras_msgtype_by_name("BW request"),
91                      &amok_bw_cb_bw_request);
92   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
93                      &amok_bw_cb_bw_handshake);
94
95   gras_cb_unregister(gras_msgtype_by_name("SAT start"),
96                      &amok_bw_cb_sat_start);
97   gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
98                      &amok_bw_cb_sat_begin);
99
100   _amok_bw_initialized = 0;
101 }
102
103 /* ***************************************************************************
104  * Bandwidth tests
105  * ***************************************************************************/
106
107 /**
108  * \brief bandwidth measurement between localhost and \e peer
109  * 
110  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
111  * \arg buf_size: Size of the socket buffer
112  * \arg exp_size: Total size of data sent across the network
113  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
114  * \arg sec: where the result (in seconds) should be stored.
115  * \arg bw: observed Bandwidth (in kb/s) 
116  *
117  * Conduct a bandwidth test from the local process to the given peer.
118  * This call is blocking until the end of the experiment.
119  *
120  * Results are reported in last args, and sizes are in kb.
121  */
122 xbt_error_t amok_bw_test(gras_socket_t peer,
123                          unsigned long int buf_size,
124                          unsigned long int exp_size,
125                          unsigned long int msg_size,
126                          /*OUT*/ double *sec, double *bw) {
127
128   /* Measurement sockets for the experiments */
129   gras_socket_t measMasterIn=NULL,measIn,measOut;
130   int port;
131   xbt_error_t errcode;
132   bw_request_t request,request_ack;
133   xbt_ex_t e;
134   
135   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
136     TRY {
137       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
138     } CATCH(e) {
139       measMasterIn = NULL;
140       if (port < 10000) {
141         xbt_ex_free(e);
142       } else {
143         RETHROW0("Error caught while opening a measurement socket: %s");
144       }
145     }
146   }
147   
148   request=xbt_new0(s_bw_request_t,1);
149   request->buf_size=buf_size*1024;
150   request->exp_size=exp_size*1024;
151   request->msg_size=msg_size*1024;
152   request->host.name = NULL;
153   request->host.port = gras_socket_my_port(measMasterIn);
154   VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld kb= %ld b)", 
155         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
156         buf_size,request->buf_size);
157
158   TRY {
159     gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request);
160   } CATCH(e) {
161     RETHROW0("Error encountered while sending the BW request: %s");
162   }
163   measIn = gras_socket_meas_accept(measMasterIn);
164
165   TRY {
166     gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),NULL,&request_ack);
167   } CATCH(e) {
168     RETHROW0("Error encountered while waiting for the answer to BW request: %s");
169   }
170   
171   /* FIXME: What if there is a remote error? */
172    
173   TRY {
174     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
175                                    request_ack->host.port, 
176                                    request->buf_size,1);
177   } CATCH(e) {
178     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
179              gras_socket_peer_name(peer),request_ack->host.port);
180   }
181   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
182
183   *sec=gras_os_time();
184   gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
185   gras_socket_meas_recv(measIn,120,1,1);
186
187   /*catch
188     ERROR1("Error %s encountered while sending the BW experiment.",
189             xbt_error_name(errcode));
190     gras_socket_close(measOut);
191     gras_socket_close(measMasterIn);
192     gras_socket_close(measIn);
193   */
194
195   *sec = gras_os_time() - *sec;
196   *bw = ((double)exp_size) / *sec;
197
198   free(request_ack);
199   free(request);
200   if (measIn != measMasterIn)
201     gras_socket_close(measIn);
202   gras_socket_close(measMasterIn);
203   gras_socket_close(measOut);
204   return errcode;
205 }
206
207
208 /* Callback to the "BW handshake" message: 
209    opens a server measurement socket,
210    indicate its port in an "BW handshaked" message,
211    receive the corresponding data on the measurement socket, 
212    close the measurment socket
213
214    sizes are in byte (got converted from kb my expeditor)
215 */
216 int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
217                             void          *payload) {
218   gras_socket_t measMasterIn=NULL,measIn,measOut;
219   bw_request_t request=*(bw_request_t*)payload;
220   bw_request_t answer;
221   xbt_ex_t e;
222   int port;
223   
224   VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
225         gras_socket_peer_name(expeditor),request->host.port,
226         request->buf_size,request->exp_size,request->msg_size);     
227
228   /* Build our answer */
229   answer = xbt_new0(s_bw_request_t,1);
230   
231   for (port = 6000; port < 10000 && measMasterIn == NULL; port++) {
232     TRY {
233       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
234     } CATCH(e) {
235       measMasterIn = NULL;
236       if (port < 10000)
237         xbt_ex_free(e);
238       else
239         /* FIXME: tell error to remote */
240         RETHROW0("Error encountered while opening a measurement server socket: %s");
241     }
242   }
243    
244   answer->buf_size=request->buf_size;
245   answer->exp_size=request->exp_size;
246   answer->msg_size=request->msg_size;
247   answer->host.port=gras_socket_my_port(measMasterIn);
248
249   /* Don't connect asap to leave time to other side to enter the accept() */
250   TRY {
251     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
252                                      request->host.port,
253                                      request->buf_size,1);
254   } CATCH(e) {
255     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
256              gras_socket_peer_name(expeditor),request->host.port);
257     /* FIXME: tell error to remote */
258   }
259
260   TRY {
261     gras_msg_send(expeditor, gras_msgtype_by_name("BW handshake ACK"), &answer);
262   } CATCH(e) { 
263     gras_socket_close(measMasterIn);
264     gras_socket_close(measOut);
265     /* FIXME: tell error to remote */
266     RETHROW0("Error encountered while sending the answer: %s");
267   }
268
269   TRY {
270     measIn = gras_socket_meas_accept(measMasterIn);
271     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
272            answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
273
274     gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
275     gras_socket_meas_send(measOut,120,1,1);
276   } CATCH(e) {
277     gras_socket_close(measMasterIn);
278     gras_socket_close(measIn);
279     gras_socket_close(measOut);
280     /* FIXME: tell error to remote ? */
281     RETHROW0("Error encountered while receiving the experiment: %s");
282   }
283
284   if (measIn != measMasterIn)
285     gras_socket_close(measMasterIn);
286   gras_socket_close(measIn);
287   gras_socket_close(measOut);
288   free(answer);
289   free(request);
290   DEBUG0("BW experiment done.");
291   return 1;
292 }
293
294 /**
295  * \brief request a bandwidth measurement between two remote hosts
296  *
297  * \arg from_name: Name of the first host 
298  * \arg from_port: port on which the first process is listening for messages
299  * \arg to_name: Name of the second host 
300  * \arg to_port: port on which the second process is listening (for messages, do not 
301  * give a measurement socket here. The needed measurement sockets will be created 
302  * automatically and negociated between the peers)
303  * \arg buf_size: Size of the socket buffer
304  * \arg exp_size: Total size of data sent across the network
305  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
306  * \arg sec: where the result (in seconds) should be stored.
307  * \arg bw: observed Bandwidth (in kb/s)
308  *
309  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
310  * This call is blocking until the end of the experiment.
311  *
312  * Results are reported in last args, and sizes are in kb.
313  */
314 xbt_error_t amok_bw_request(const char* from_name,unsigned int from_port,
315                             const char* to_name,unsigned int to_port,
316                             unsigned long int buf_size,
317                             unsigned long int exp_size,
318                             unsigned long int msg_size,
319                             /*OUT*/ double *sec, double*bw) {
320   
321   gras_socket_t sock;
322   /* The request */
323   bw_request_t request;
324   bw_res_t result;
325
326   request=xbt_new0(s_bw_request_t,1);
327   request->buf_size=buf_size;
328   request->exp_size=exp_size;
329   request->msg_size=msg_size;
330
331   request->host.name = (char*)to_name;
332   request->host.port = to_port;
333
334   sock = gras_socket_client(from_name,from_port);
335   gras_msg_send(sock,gras_msgtype_by_name("BW request"),&request);
336   free(request);
337
338   gras_msg_wait(240,gras_msgtype_by_name("BW result"),NULL, &result);
339   
340   *sec=result->sec;
341   *bw =result->bw;
342
343   VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
344         from_name,from_port, to_name,to_port,
345         *sec,*bw);
346
347   gras_socket_close(sock);
348   free(result);
349   return no_error;
350 }
351
352 int amok_bw_cb_bw_request(gras_socket_t    expeditor,
353                           void            *payload) {
354                           
355   /* specification of the test to run, and our answer */
356   bw_request_t request = *(bw_request_t*)payload;
357   bw_res_t result = xbt_new0(s_bw_res,1);
358   gras_socket_t peer;
359
360   peer = gras_socket_client(request->host.name,request->host.port);
361   amok_bw_test(peer,
362                request->buf_size,request->exp_size,request->msg_size,
363                &(result->sec),&(result->bw));
364
365   gras_msg_send(expeditor,gras_msgtype_by_name("BW result"),&result);
366
367   gras_os_sleep(1);
368   gras_socket_close(peer);
369   free(request);
370   free(result);
371   
372   return 1;
373
374 #if 0
375   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
376   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
377
378   unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
379   unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
380   unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
381   /* our answer */
382   msgError_t *error;
383   msgResult_t *res;
384
385   if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
386                                   &(res[0].value),&(res[1].value) ))) {
387     fprintf(stderr,
388             "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
389             __FILE__,__LINE__,xbt_error_name(error->errcode));
390     strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
391     gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
392                    error,1,
393                    res,2);
394     return 1;
395   }
396   res[0].timestamp = (unsigned int) gras_time();
397   res[1].timestamp = (unsigned int) gras_time();
398   gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
399                  error,1,
400                  res,2);
401   gras_msg_free(msg);
402   return 1;
403 #endif
404 }
405
406 int amok_bw_cb_sat_start(gras_socket_t     expeditor,
407                          void             *payload) {
408    CRITICAL0("amok_bw_cb_sat_start; not implemented");
409    return 1;
410
411 int amok_bw_cb_sat_begin(gras_socket_t     expeditor,
412                          void             *payload) {
413    CRITICAL0("amok_bw_cb_sat_begin: not implemented");
414    return 1;
415 }
416 #if 0
417 /* ***************************************************************************
418  * Link saturation
419  * ***************************************************************************/
420
421 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
422                                   const char* to_name,unsigned int to_port,
423                                   unsigned int msgSize, unsigned int timeout) {
424   gras_sock_t *sock;
425   xbt_error_t errcode;
426   /* The request */
427   SatExp_t *request;
428   msgHost_t *target;
429   /* answer */
430   gras_msg_t *answer;
431
432   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
433     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
434             __FILE__,__LINE__,xbt_error_name(errcode));
435     return errcode;
436   }
437   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
438       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
439     fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
440     gras_sock_close(sock);
441     return malloc_error;    
442   }
443
444   request->timeout=timeout;
445   request->msgSize=msgSize;
446
447   strcpy(target->host,to_name);
448   target->port=to_port;
449
450   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2, 
451                               target,1,
452                               request,1))) {
453     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
454             __FILE__,__LINE__,xbt_error_name(errcode));
455     gras_sock_close(sock);
456     return errcode;
457   }
458   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
459     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
460             __FILE__,__LINE__,xbt_error_name(errcode));
461     gras_sock_close(sock);
462     return errcode;
463   }
464
465   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
466     fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
467             __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
468     gras_msg_free(answer);
469     gras_sock_close(sock);
470     return errcode;
471   }
472
473   gras_msg_free(answer);
474   gras_sock_close(sock);
475   return no_error;
476 }
477
478 int grasbw_cbSatStart(gras_msg_t *msg) {
479   gras_rawsock_t *raw;
480   gras_sock_t *sock;
481   xbt_error_t errcode;
482   double start; /* time to timeout */
483
484   /* specification of the test to run */
485   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
486   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
487
488   unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
489   unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
490   unsigned int raw_port;
491
492   /* The request */
493   SatExp_t *request;
494   /* answer */
495   gras_msg_t *answer;
496
497   /*
498   fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
499   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
500           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
501           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
502   */
503
504   /* Negociate the saturation with the peer */
505   if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
506     fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
507             xbt_error_name(errcode));
508     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
509                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
510                      errcode,"Cannot contact peer.\n");
511     return 1;
512   }
513   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
514     fprintf(stderr,"cbSatStart(): Malloc error\n");
515     gras_sock_close(sock);
516     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
517                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
518                      malloc_error,"Cannot build request.\n");
519     return 1;    
520   }
521
522   request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
523   request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
524
525   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1, 
526                               request,1))) {
527     fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
528             xbt_error_name(errcode));
529     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
530                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
531                      errcode,"Cannot send request.\n");
532     gras_sock_close(sock);
533     return 1;
534   }
535
536   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
537     fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
538             xbt_error_name(errcode));
539     gras_sock_close(sock);
540
541     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
542                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
543                      errcode,
544                      "Cannot receive the ACK.\n");
545     return 1;
546   }
547
548   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
549     fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
550             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
551
552     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
553                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
554                      errcode,
555                      "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
556     gras_msg_free(answer);
557     gras_sock_close(sock);
558     return 1;
559   }
560
561   raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
562
563   if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
564     fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
565             xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
566
567     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
568                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
569                      errcode,"Cannot open raw socket.\n");
570     gras_sock_close(sock);
571     return 1;
572   }
573
574   /* send a train of data before repporting that XP is started */
575   if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
576     fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
577     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
578                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
579                      errcode,"Cannot raw send.\n");
580     gras_sock_close(sock);
581     gras_rawsock_close(raw);
582     return 1;
583   }
584   
585   grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
586                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
587                    no_error,"Saturation started");
588   gras_msg_free(answer);
589   gras_msg_free(msg);
590   
591   /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
592   start=gras_time();
593   while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error && 
594          gras_time()-start < timeout) {
595     if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
596       fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
597       /* our error message do not interess anyone. SAT_STOP will do nothing. */
598       gras_sock_close(sock);
599       gras_rawsock_close(raw);
600       return 1;
601     } 
602   }
603   if (gras_time()-start > timeout) {
604     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
605     gras_sock_close(sock);
606     gras_rawsock_close(raw);
607     return 1;
608   }
609
610   /* Handle the SAT_STOP which broke the previous while */
611   
612   if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
613     fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
614
615     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
616                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
617                      errcode,"Sending SAT_END to peer failed.\n");
618     gras_sock_close(sock);
619     gras_rawsock_close(raw);
620     return 1;
621   }
622   
623   if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
624     fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
625
626     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
627                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
628                      errcode,"Receiving SAT_ENDED from peer failed.\n");
629     gras_sock_close(sock);
630     gras_rawsock_close(raw);
631     return 1;
632   }
633   grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
634                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
635                    no_error,"");
636
637   gras_sock_close(sock);
638   gras_rawsock_close(raw);
639   gras_msg_free(answer);
640   gras_msg_free(msg);
641
642   return 1;  
643 }
644
645 int grasbw_cbSatBegin(gras_msg_t *msg) {
646   gras_rawsock_t *raw;
647   xbt_error_t errcode;
648   double start; /* timer */
649   /* request */
650   unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
651   unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
652   /* answer */
653   SatExp_t *request;
654   msgError_t *error;
655
656   if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
657       !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
658     fprintf(stderr,"cbSatBegin(): Malloc error\n");
659     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
660                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
661                      malloc_error,"Malloc error");
662     return 1;
663   }
664
665   if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) { 
666     fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
667             xbt_error_name(errcode));
668     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
669                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
670                      errcode,"Cannot open raw socket");
671     return 1;
672   }
673   request->port=gras_rawsock_get_peer_port(raw);
674   request->msgSize=msgSize;
675   error->errcode=no_error;
676   error->errmsg[0]='\0';
677   if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
678                               error,1,
679                               request,1))) {
680     fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
681             xbt_error_name(errcode));
682     return 1;
683   }
684   gras_msg_free(msg);
685
686   start=gras_time();
687   while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
688          gras_time() - start < timeout) {
689     errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
690     if (errcode != timeout_error && errcode != no_error) {
691       fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
692       /* our error message do not interess anyone. SAT_END will do nothing. */
693       /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
694       return 1;
695     } 
696   }
697   if (gras_time()-start > timeout) {
698     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
699     gras_rawsock_close(raw);
700     return 1;
701   }
702
703   grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
704                    "cbSatBegin: Cannot send SAT_ENDED.\n",
705                    no_error,"");
706   gras_rawsock_close(raw);
707   gras_msg_free(msg);
708   return 1;
709 }
710
711 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
712                                  const char* to_name,unsigned int to_port) {
713   xbt_error_t errcode;
714   gras_sock_t *sock;
715   gras_msg_t *answer;
716
717   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
718     fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
719             xbt_error_name(errcode));
720     return errcode;
721   }
722
723   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
724     fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
725             xbt_error_name(errcode));
726     gras_sock_close(sock);
727     return errcode;
728   }
729
730   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
731     fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
732             xbt_error_name(errcode));
733     gras_sock_close(sock);
734     return errcode;
735   }
736
737   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
738     fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
739             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
740     gras_msg_free(answer);
741     gras_sock_close(sock);
742     return errcode;
743   }
744
745   gras_msg_free(answer);
746   gras_sock_close(sock);
747
748   return no_error;
749 }
750 #endif