Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fixed licence and copyright. No more reference to da GRAS possee or the
[simgrid.git] / src / amok / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities    */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13
14 #include "gras/messages.h"
15 #include "amok/bandwidth.h"
16
17 /**
18  * bw_request_t:
19  *
20  * Request for a BW experiment.
21  * If host==NULL, it should be between the sender and the receiver.
22  * If not, it should be between between the receiver and @host (3-tiers).
23  */
24 typedef struct {
25   xbt_host_t host; /* host+raw socket to use */
26   unsigned int buf_size;
27   unsigned int exp_size;
28   unsigned int msg_size;
29 } bw_request_t;
30
31 /**
32  * bw_res_t:
33  *
34  * Result of a BW experiment (payload when answering).
35  * if err.msg != NULL, it wasn't sucessful. Check err.msg and err.code to see why.
36  * else
37  */
38 typedef struct {
39   amok_remoterr_t err;
40   unsigned int timestamp;
41   double seconds;
42   double bw;
43 } bw_res_t;
44
45
46 /**
47  * sat_request_t:
48  *
49  * Description of a saturation experiment (payload asking some host to collaborate for that)
50  */
51 typedef struct {
52   xbt_host_t host; /* host+raw socket to use */
53   unsigned int msg_size;
54   unsigned int timeout;
55 } sat_request_t;
56
57 /* Prototypes of local callbacks */
58 int amok_bw_cb_bw_handshake(gras_socket_t *expeditor,
59                             void          *payload);
60 int amok_bw_cb_bw_request(gras_socket_t   *expeditor,
61                           void            *payload);
62
63 int amok_bw_cb_sat_start(gras_socket_t    *expeditor,
64                          void             *payload);
65 int amok_bw_cb_sat_begin(gras_socket_t    *expeditor,
66                          void             *payload);
67
68 /**** code ****/
69 void amok_bw_init(void) {
70   xbt_error_t errcode;
71   gras_datadesc_type_t *bw_request_desc, *bw_res_desc, *sat_request_desc;
72
73   amok_base_init();
74    
75   /* Build the datatype descriptions */ 
76   bw_request_desc = gras_datadesc_struct("bw_request_t");
77   gras_datadesc_struct_append(bw_request_desc,"host",gras_datadesc_by_name("xbt_host_t*"));
78   gras_datadesc_struct_append(bw_request_desc,"buf_size",gras_datadesc_by_name("unsigned int"));
79   gras_datadesc_struct_append(bw_request_desc,"exp_size",gras_datadesc_by_name("unsigned int"));
80   gras_datadesc_struct_append(bw_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
81   gras_datadesc_struct_close(bw_request_desc);
82   bw_request_desc = gras_datadesc_ref("bw_request_t*",bw_request_desc);
83
84   bw_res_desc = gras_datadesc_struct("bw_res_t");
85   gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("amok_remoterr_t"));
86   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
87   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
88   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
89   gras_datadesc_struct_close(bw_res_desc);
90   bw_res_desc = gras_datadesc_ref("bw_res_t*",bw_res_desc);
91
92   sat_request_desc = gras_datadesc_struct("sat_request_desc");
93   gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
94   gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
95   gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
96   gras_datadesc_struct_close(sat_request_desc);
97   sat_request_desc = gras_datadesc_ref("sat_request_t*",sat_request_desc);
98    
99   /* Register the bandwidth messages */
100   gras_msgtype_declare("BW request",       bw_request_desc);
101   gras_msgtype_declare("BW result",        bw_res_desc);
102   gras_msgtype_declare("BW handshake",     bw_request_desc);
103   gras_msgtype_declare("BW handshake ACK", bw_request_desc);
104
105   /* Register the saturation messages */
106   gras_msgtype_declare("SAT start",   sat_request_desc);
107   gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
108   gras_msgtype_declare("SAT begin",   sat_request_desc);
109   gras_msgtype_declare("SAT begun",   gras_datadesc_by_name("amok_remoterr_t"));
110   gras_msgtype_declare("SAT end",     NULL);
111   gras_msgtype_declare("SAT ended",   gras_datadesc_by_name("amok_remoterr_t"));
112   gras_msgtype_declare("SAT stop",    NULL);
113   gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
114
115   /* Register the callbacks */
116   gras_cb_register(gras_msgtype_by_name("BW request"),
117                    &amok_bw_cb_bw_request);
118   gras_cb_register(gras_msgtype_by_name("BW handshake"),
119                    &amok_bw_cb_bw_handshake);
120
121   gras_cb_register(gras_msgtype_by_name("SAT start"),
122                    &amok_bw_cb_sat_start);
123   gras_cb_register(gras_msgtype_by_name("SAT begin"),
124                    &amok_bw_cb_sat_begin);
125 }
126
127 /* ***************************************************************************
128  * Bandwidth tests
129  * ***************************************************************************/
130 /* Function to do a test from local to given host */
131 /**
132  * amok_bw_test:
133  * 
134  * Conduct a test between the local host and @peer, and 
135  * report the result in last args
136  */
137 xbt_error_t amok_bw_test(gras_socket_t *peer,
138                           unsigned int bufSize,unsigned int expSize,unsigned int msgSize,
139                           /*OUT*/ double *sec, double *bw) {
140   gras_socket_t *rawIn,*rawOut;
141   int port;
142   xbt_error_t errcode;
143   bw_request_t *request;
144   gras_msg_t *answer;
145   
146   for (port = 5000, errcode = system_error;
147        errcode == system_error;
148        errcode = gras_socket_server_ext(++port,bufSize,1,&rawIn));
149   if (errcode != no_error) {
150     ERROR1("Error %s encountered while opening a raw socket\n",
151            xbt_error_name(errcode));
152     return errcode;
153   }
154
155   request=xbt_new(bw_request_t);
156   request->bufSize=bufSize;
157   request->expSize=expSize;
158   request->msgSize=msgSize;
159   request->host.name = NULL;
160   request->host.port = gras_socket_peer_port(rawIn);
161
162   if ((errcode=gras_send(peer,gras_msgtype_by_name("BW handshake"),request))) {
163     ERROR1("Error %s encountered while sending the request.", xbt_error_name(errcode));
164     gras_sock_close(sock);
165     return errcode;
166   }
167   if ((errcode=gras_msg_wait(60,GRASMSG_BW_HANDSHAKED,&answer))) {
168     fprintf(stderr,"grasbw_test(): Error %s encountered while waiting for the answer.\n",
169             xbt_error_name(errcode));
170     gras_sock_close(sock);
171     return errcode;
172   }
173   if((errcode=gras_rawsock_client_open(to_name,gras_msg_ctn(answer,0,0,BwExp_t).port,
174                                     bufSize,&rawOut))) {
175     fprintf(stderr,"grasbw_test(): Error %s encountered while opening the raw socket to %s:%d\n",
176             xbt_error_name(errcode),to_name,gras_msg_ctn(answer,0,0,BwExp_t).port);
177     return errcode;
178   }
179
180   *sec=gras_time();
181   if ((errcode=gras_rawsock_send(rawOut,expSize,msgSize)) ||
182       (errcode=gras_rawsock_recv(rawIn,1,1,120))) {
183     fprintf(stderr,"grasbw_test(): Error %s encountered while sending the experiment.\n",
184             xbt_error_name(errcode));
185     gras_rawsock_close(rawOut);
186     gras_rawsock_close(rawIn);
187     return errcode;
188   }
189   *sec = gras_time() - *sec;
190   *bw = ((double)expSize /* 8.0*/) / *sec / (1024.0 *1024.0);
191
192   gras_rawsock_close(rawIn);
193   gras_rawsock_close(rawOut);
194   gras_sock_close(sock);
195   gras_msg_free(answer);
196   return no_error;
197 }
198
199 #if 0
200
201
202 /* Callback to the GRASMSG_BW_HANDSHAKE message: 
203    opens a server raw socket,
204    indicate its port in an answer GRASMSG_BW_HANDSHAKED message,
205    receive the corresponding data on the raw socket, 
206    close the raw socket
207 */
208 int grasbw_cbBWHandshake(gras_msg_t *msg) {
209   gras_rawsock_t *rawIn,*rawOut;
210   BwExp_t *ans;
211   xbt_error_t errcode;
212   
213   if ((errcode=gras_rawsock_server_open(6666,8000,gras_msg_ctn(msg,0,0,BwExp_t).bufSize,&rawIn))) { 
214     fprintf(stderr,"grasbw_cbHandshake(): Error %s encountered while opening a raw socket\n",
215             xbt_error_name(errcode));
216     return 1;
217   }
218   if ((errcode=gras_rawsock_client_open(gras_sock_get_peer_name(msg->sock),gras_msg_ctn(msg,0,0,BwExp_t).port,
219                                      gras_msg_ctn(msg,0,0,BwExp_t).bufSize,&rawOut))) { 
220     fprintf(stderr,"grasbw_cbHandshake(): Error %s encountered while opening a raw socket\n",
221             xbt_error_name(errcode));
222     return 1;
223   }
224   if (!(ans=(BwExp_t *)malloc(sizeof(BwExp_t)))) {
225     fprintf(stderr,"grasbw_cbHandshake(): Malloc error.\n");
226     gras_rawsock_close(rawIn);
227     gras_rawsock_close(rawOut);
228     return 1;
229   }
230   ans->bufSize=gras_msg_ctn(msg,0,0,BwExp_t).bufSize;
231   ans->expSize=gras_msg_ctn(msg,0,0,BwExp_t).expSize;
232   ans->msgSize=gras_msg_ctn(msg,0,0,BwExp_t).msgSize;
233   ans->port=gras_rawsock_get_peer_port(rawIn);
234   /*  fprintf(stderr,"grasbw_cbHandshake. bufSize=%d expSize=%d msgSize=%d port=%d\n",
235           ans->bufSize,ans->expSize,ans->msgSize,ans->port);*/
236
237   if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_BW_HANDSHAKED, 1,
238                               ans, 1))) {
239     fprintf(stderr,"grasbw_cbHandshake(): Error %s encountered while sending the answer.\n",
240             xbt_error_name(errcode));
241     gras_rawsock_close(rawIn);
242     gras_rawsock_close(rawOut);
243     return 1;
244   }
245     
246   if ((errcode=gras_rawsock_recv(rawIn,
247                                gras_msg_ctn(msg,0,0,BwExp_t).expSize,
248                                gras_msg_ctn(msg,0,0,BwExp_t).msgSize,
249                                120)) ||
250       (errcode=gras_rawsock_send(rawOut,1,1))) {
251     fprintf(stderr,"grasbw_cbHandshake(): Error %s encountered while receiving the experiment.\n",
252             xbt_error_name(errcode));
253     gras_rawsock_close(rawIn);
254     gras_rawsock_close(rawOut);
255     return 1;
256   }
257   gras_msg_free(msg);
258   gras_rawsock_close(rawIn);
259   gras_rawsock_close(rawOut);
260   return 1;
261 }
262
263 /* function to request a BW test between to external hosts */
264 xbt_error_t grasbw_request(const char* from_name,unsigned int from_port,
265                            const char* to_name,unsigned int to_port,
266                            unsigned int bufSize,unsigned int expSize,unsigned int msgSize,
267                            /*OUT*/ double *sec, double*bw) {
268   
269   gras_sock_t *sock;
270   gras_msg_t *answer;
271   xbt_error_t errcode;
272   /* The request */
273   BwExp_t *request;
274   msgHost_t *target;
275
276   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
277     fprintf(stderr,"grasbw_request(): Error %s encountered while contacting the actuator\n",
278             xbt_error_name(errcode));
279     return errcode;
280   }
281   if (!(request=(BwExp_t *)malloc(sizeof(BwExp_t))) ||
282       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
283     fprintf(stderr,"grasbw_test(): Malloc error\n");
284     gras_sock_close(sock);
285     return malloc_error;    
286   }
287
288   request->bufSize=bufSize;
289   request->expSize=expSize;
290   request->msgSize=msgSize;
291   strcpy(target->host,to_name);
292   target->port=to_port;
293   
294   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_BW_REQUEST, 2, 
295                               target,1,
296                               request,1))) {
297     fprintf(stderr,"grasbw_request(): Error %s encountered while sending the request.\n",
298             xbt_error_name(errcode));
299     gras_sock_close(sock);
300     return errcode;
301   }
302   if ((errcode=gras_msg_wait(240,GRASMSG_BW_RESULT,&answer))) {
303     fprintf(stderr,"grasbw_request(): Error %s encountered while waiting for the answer.\n",
304             xbt_error_name(errcode));
305     gras_sock_close(sock);
306     return errcode;
307   }
308
309   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
310     fprintf(stderr,"grasbw_request(): Peer reported error %s (%s).\n",
311             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
312     gras_msg_free(answer);
313     gras_sock_close(sock);
314     return errcode;
315   }
316
317   /*  fprintf(stderr,"sec=%p",gras_msg_ctn(answer,1,0,msgResult_t)); */
318   *sec=gras_msg_ctn(answer,1,0,msgResult_t).value;
319   *bw=gras_msg_ctn(answer,1,1,msgResult_t).value;
320
321   gras_msg_free(answer);
322   gras_sock_close(sock);
323   return no_error;
324 }
325
326 int grasbw_cbBWRequest(gras_msg_t *msg) {
327   /* specification of the test to run */
328   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
329   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
330
331   unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
332   unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
333   unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
334   /* our answer */
335   msgError_t *error;
336   msgResult_t *res;
337
338   if (!(error=(msgError_t *)malloc(sizeof(msgError_t))) ||
339       !(res=(msgResult_t *)malloc(sizeof(msgResult_t) * 2))) {
340     fprintf(stderr,"%s:%d:grasbw_cbRequest: Malloc error\n",__FILE__,__LINE__);
341     return malloc_error;    
342   }
343
344   if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
345                                   &(res[0].value),&(res[1].value) ))) {
346     fprintf(stderr,
347             "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
348             __FILE__,__LINE__,xbt_error_name(error->errcode));
349     strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
350     gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
351                    error,1,
352                    res,2);
353     return 1;
354   }
355   res[0].timestamp = (unsigned int) gras_time();
356   res[1].timestamp = (unsigned int) gras_time();
357   gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
358                  error,1,
359                  res,2);
360   gras_msg_free(msg);
361   return 1;
362 }
363
364 /* ***************************************************************************
365  * Link saturation
366  * ***************************************************************************/
367
368 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
369                                   const char* to_name,unsigned int to_port,
370                                   unsigned int msgSize, unsigned int timeout) {
371   gras_sock_t *sock;
372   xbt_error_t errcode;
373   /* The request */
374   SatExp_t *request;
375   msgHost_t *target;
376   /* answer */
377   gras_msg_t *answer;
378
379   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
380     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
381             __FILE__,__LINE__,xbt_error_name(errcode));
382     return errcode;
383   }
384   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
385       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
386     fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
387     gras_sock_close(sock);
388     return malloc_error;    
389   }
390
391   request->timeout=timeout;
392   request->msgSize=msgSize;
393
394   strcpy(target->host,to_name);
395   target->port=to_port;
396
397   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2, 
398                               target,1,
399                               request,1))) {
400     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
401             __FILE__,__LINE__,xbt_error_name(errcode));
402     gras_sock_close(sock);
403     return errcode;
404   }
405   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
406     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
407             __FILE__,__LINE__,xbt_error_name(errcode));
408     gras_sock_close(sock);
409     return errcode;
410   }
411
412   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
413     fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
414             __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
415     gras_msg_free(answer);
416     gras_sock_close(sock);
417     return errcode;
418   }
419
420   gras_msg_free(answer);
421   gras_sock_close(sock);
422   return no_error;
423 }
424
425 int grasbw_cbSatStart(gras_msg_t *msg) {
426   gras_rawsock_t *raw;
427   gras_sock_t *sock;
428   xbt_error_t errcode;
429   double start; /* time to timeout */
430
431   /* specification of the test to run */
432   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
433   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
434
435   unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
436   unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
437   unsigned int raw_port;
438
439   /* The request */
440   SatExp_t *request;
441   /* answer */
442   gras_msg_t *answer;
443
444   /*
445   fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
446   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
447           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
448           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
449   */
450
451   /* Negociate the saturation with the peer */
452   if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
453     fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
454             xbt_error_name(errcode));
455     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
456                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
457                      errcode,"Cannot contact peer.\n");
458     return 1;
459   }
460   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
461     fprintf(stderr,"cbSatStart(): Malloc error\n");
462     gras_sock_close(sock);
463     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
464                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
465                      malloc_error,"Cannot build request.\n");
466     return 1;    
467   }
468
469   request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
470   request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
471
472   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1, 
473                               request,1))) {
474     fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
475             xbt_error_name(errcode));
476     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
477                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
478                      errcode,"Cannot send request.\n");
479     gras_sock_close(sock);
480     return 1;
481   }
482
483   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
484     fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
485             xbt_error_name(errcode));
486     gras_sock_close(sock);
487
488     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
489                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
490                      errcode,
491                      "Cannot receive the ACK.\n");
492     return 1;
493   }
494
495   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
496     fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
497             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
498
499     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
500                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
501                      errcode,
502                      "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
503     gras_msg_free(answer);
504     gras_sock_close(sock);
505     return 1;
506   }
507
508   raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
509
510   if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
511     fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
512             xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
513
514     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
515                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
516                      errcode,"Cannot open raw socket.\n");
517     gras_sock_close(sock);
518     return 1;
519   }
520
521   /* send a train of data before repporting that XP is started */
522   if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
523     fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
524     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
525                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
526                      errcode,"Cannot raw send.\n");
527     gras_sock_close(sock);
528     gras_rawsock_close(raw);
529     return 1;
530   }
531   
532   grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
533                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
534                    no_error,"Saturation started");
535   gras_msg_free(answer);
536   gras_msg_free(msg);
537   
538   /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
539   start=gras_time();
540   while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error && 
541          gras_time()-start < timeout) {
542     if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
543       fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
544       /* our error message do not interess anyone. SAT_STOP will do nothing. */
545       gras_sock_close(sock);
546       gras_rawsock_close(raw);
547       return 1;
548     } 
549   }
550   if (gras_time()-start > timeout) {
551     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
552     gras_sock_close(sock);
553     gras_rawsock_close(raw);
554     return 1;
555   }
556
557   /* Handle the SAT_STOP which broke the previous while */
558   
559   if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
560     fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
561
562     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
563                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
564                      errcode,"Sending SAT_END to peer failed.\n");
565     gras_sock_close(sock);
566     gras_rawsock_close(raw);
567     return 1;
568   }
569   
570   if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
571     fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
572
573     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
574                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
575                      errcode,"Receiving SAT_ENDED from peer failed.\n");
576     gras_sock_close(sock);
577     gras_rawsock_close(raw);
578     return 1;
579   }
580   grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
581                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
582                    no_error,"");
583
584   gras_sock_close(sock);
585   gras_rawsock_close(raw);
586   gras_msg_free(answer);
587   gras_msg_free(msg);
588
589   return 1;  
590 }
591
592 int grasbw_cbSatBegin(gras_msg_t *msg) {
593   gras_rawsock_t *raw;
594   xbt_error_t errcode;
595   double start; /* timer */
596   /* request */
597   unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
598   unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
599   /* answer */
600   SatExp_t *request;
601   msgError_t *error;
602
603   if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
604       !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
605     fprintf(stderr,"cbSatBegin(): Malloc error\n");
606     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
607                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
608                      malloc_error,"Malloc error");
609     return 1;
610   }
611
612   if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) { 
613     fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
614             xbt_error_name(errcode));
615     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
616                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
617                      errcode,"Cannot open raw socket");
618     return 1;
619   }
620   request->port=gras_rawsock_get_peer_port(raw);
621   request->msgSize=msgSize;
622   error->errcode=no_error;
623   error->errmsg[0]='\0';
624   if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
625                               error,1,
626                               request,1))) {
627     fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
628             xbt_error_name(errcode));
629     return 1;
630   }
631   gras_msg_free(msg);
632
633   start=gras_time();
634   while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
635          gras_time() - start < timeout) {
636     errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
637     if (errcode != timeout_error && errcode != no_error) {
638       fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
639       /* our error message do not interess anyone. SAT_END will do nothing. */
640       /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
641       return 1;
642     } 
643   }
644   if (gras_time()-start > timeout) {
645     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
646     gras_rawsock_close(raw);
647     return 1;
648   }
649
650   grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
651                    "cbSatBegin: Cannot send SAT_ENDED.\n",
652                    no_error,"");
653   gras_rawsock_close(raw);
654   gras_msg_free(msg);
655   return 1;
656 }
657
658 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
659                                  const char* to_name,unsigned int to_port) {
660   xbt_error_t errcode;
661   gras_sock_t *sock;
662   gras_msg_t *answer;
663
664   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
665     fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
666             xbt_error_name(errcode));
667     return errcode;
668   }
669
670   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
671     fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
672             xbt_error_name(errcode));
673     gras_sock_close(sock);
674     return errcode;
675   }
676
677   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
678     fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
679             xbt_error_name(errcode));
680     gras_sock_close(sock);
681     return errcode;
682   }
683
684   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
685     fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
686             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
687     gras_msg_free(answer);
688     gras_sock_close(sock);
689     return errcode;
690   }
691
692   gras_msg_free(answer);
693   gras_sock_close(sock);
694
695   return no_error;
696 }
697 #endif