Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Initial revision
[simgrid.git] / src / amok / bandwidth.c
1 /* $Id$ */
2
3 /* gras_bandwidth - GRAS mechanism to do bandwidth tests between two hosts  */
4
5 /* Authors: Martin Quinson                                                  */
6 /* Copyright (C) 2003 the OURAGAN project.                                  */
7
8 /* This program is free software; you can redistribute it and/or modify it
9    under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <string.h>
14
15 #include <gras.h>
16
/**
 * BwExp_t:
 *
 * Description of a bandwidth (BW) experiment. It is the payload sent when
 * asking a host to do a BW experiment with us, and it is echoed back (with
 * the peer's own raw port) in the handshake acknowledgement.
 */
typedef struct {
  unsigned int bufSize; /* passed as the size argument when opening the raw sockets */
  unsigned int expSize; /* total amount of data exchanged during the experiment */
  unsigned int msgSize; /* size argument of each raw send/receive */
  unsigned int port;    /* raw socket to use */
} BwExp_t;
28
29 static const DataDescriptor BwExp_Desc[] = 
30  { SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(BwExp_t,bufSize)),
31    SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(BwExp_t,expSize)),
32    SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(BwExp_t,msgSize)),
33    SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(BwExp_t,port))};
34 #define BwExp_Len 4
35
/**
 * SatExp_t:
 *
 * Description of a link saturation experiment. It is the payload of the
 * SAT_START and SAT_BEGIN requests, and of the SAT_BEGUN acknowledgement
 * (where the port member carries the raw port opened by the receiver).
 */
typedef struct {
  unsigned int msgSize; /* size argument of each raw send/receive */
  unsigned int timeout; /* bound on the experiment duration, compared against gras_time() deltas */
  unsigned int port;    /* raw socket to use */
} SatExp_t;
46
47 static const DataDescriptor SatExp_Desc[] = 
48  { SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(SatExp_t,msgSize)),
49    SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(SatExp_t,timeout)),
50    SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(SatExp_t,port))};
51 #define SatExp_Len 3
52
53
54 /* Prototypes of local callbacks */
55 int grasbw_cbBWHandshake(gras_msg_t *msg);
56 int grasbw_cbBWRequest(gras_msg_t *msg);
57
58 int grasbw_cbSatStart(gras_msg_t *msg);
59 int grasbw_cbSatBegin(gras_msg_t *msg);
60
61 /**** code ****/
62 gras_error_t grasbw_register_messages(void) {
63   gras_error_t errcode;
64
65   
66   if ( /* Bandwidth */
67       (errcode=gras_msgtype_register(GRASMSG_BW_REQUEST,"BW request",2,
68                                msgHostDesc,msgHostLen,
69                                BwExp_Desc,BwExp_Len)) ||
70       (errcode=gras_msgtype_register(GRASMSG_BW_RESULT, "BW result",2,
71                                msgErrorDesc,msgErrorLen,
72                                msgResultDesc,msgResultLen /* first=seconds, second=bw */)) ||
73       
74       (errcode=gras_msgtype_register(GRASMSG_BW_HANDSHAKE, "BW handshake",1,
75                                BwExp_Desc,BwExp_Len))  || 
76       (errcode=gras_msgtype_register(GRASMSG_BW_HANDSHAKED, "BW handshake ACK",1,
77                                BwExp_Desc,BwExp_Len)) ||
78       
79       /* Saturation */
80       (errcode=gras_msgtype_register(GRASMSG_SAT_START,"SAT_START",2,
81                                msgHostDesc,msgHostLen,
82                                SatExp_Desc,SatExp_Len)) ||
83       (errcode=gras_msgtype_register(GRASMSG_SAT_STARTED, "SAT_STARTED",1,
84                                msgErrorDesc,msgErrorLen)) ||
85       
86       (errcode=gras_msgtype_register(GRASMSG_SAT_BEGIN,"SAT_BEGIN",1,
87                                SatExp_Desc,SatExp_Len)) ||
88       (errcode=gras_msgtype_register(GRASMSG_SAT_BEGUN, "SAT_BEGUN",2,
89                                msgErrorDesc,msgErrorLen,
90                                SatExp_Desc,SatExp_Len)) ||
91       
92       (errcode=gras_msgtype_register(GRASMSG_SAT_END,"SAT_END",0)) ||
93       (errcode=gras_msgtype_register(GRASMSG_SAT_ENDED, "SAT_ENDED",1,
94                                msgErrorDesc,msgErrorLen)) ||
95       
96       (errcode=gras_msgtype_register(GRASMSG_SAT_STOP,"SAT_STOP",0)) ||
97       (errcode=gras_msgtype_register(GRASMSG_SAT_STOPPED, "SAT_STOPPED",1,
98                                msgErrorDesc,msgErrorLen)) ) 
99     {
100
101       fprintf(stderr,"GRASBW: Unable register the messages (got error %s)\n",
102               gras_error_name(errcode));
103       return errcode;
104     }
105
106   if ((errcode=gras_cb_register(GRASMSG_BW_HANDSHAKE,-1,&grasbw_cbBWHandshake)) ||
107       (errcode=gras_cb_register(GRASMSG_BW_REQUEST,-1,&grasbw_cbBWRequest))  ||
108
109       (errcode=gras_cb_register(GRASMSG_SAT_START,-1,&grasbw_cbSatStart))  ||
110       (errcode=gras_cb_register(GRASMSG_SAT_BEGIN,-1,&grasbw_cbSatBegin)) ) {
111
112     fprintf(stderr,"GRASBW: Unable register the callbacks (got error %s)\n",
113             gras_error_name(errcode));
114     return errcode;
115   }
116
117   return no_error;
118 }
119
120 /* ***************************************************************************
121  * Bandwidth tests
122  * ***************************************************************************/
123 /* Function to do a test from local to given host */
124 gras_error_t grasbw_test(const char*to_name,unsigned int to_port,
125                         unsigned int bufSize,unsigned int expSize,unsigned int msgSize,
126                         /*OUT*/ double *sec, double *bw) {
127   gras_rawsock_t *rawIn,*rawOut;
128   gras_sock_t *sock;
129   gras_error_t errcode;
130   BwExp_t *request;
131   gras_msg_t *answer;
132   
133   if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
134     fprintf(stderr,"grasbw_test(): Error %s encountered while contacting peer\n",
135             gras_error_name(errcode));
136     return errcode;
137   }
138   if ((errcode=gras_rawsock_server_open(6666,8000,bufSize,&rawIn))) { 
139     fprintf(stderr,"grasbw_test(): Error %s encountered while opening a raw socket\n",
140             gras_error_name(errcode));
141     return errcode;
142   }
143
144   if (!(request=(BwExp_t *)malloc(sizeof(BwExp_t)))) {
145     fprintf(stderr,"grasbw_test(): Malloc error\n");
146     gras_sock_close(sock);
147     return malloc_error;    
148   }
149   request->bufSize=bufSize;
150   request->expSize=expSize;
151   request->msgSize=msgSize;
152   request->port=gras_rawsock_get_peer_port(rawIn);
153
154   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_BW_HANDSHAKE, 1, 
155                               request,1))) {
156     fprintf(stderr,"grasbw_test(): Error %s encountered while sending the request.\n",
157             gras_error_name(errcode));
158     gras_sock_close(sock);
159     return errcode;
160   }
161   if ((errcode=gras_msg_wait(60,GRASMSG_BW_HANDSHAKED,&answer))) {
162     fprintf(stderr,"grasbw_test(): Error %s encountered while waiting for the answer.\n",
163             gras_error_name(errcode));
164     gras_sock_close(sock);
165     return errcode;
166   }
167   if((errcode=gras_rawsock_client_open(to_name,gras_msg_ctn(answer,0,0,BwExp_t).port,
168                                     bufSize,&rawOut))) {
169     fprintf(stderr,"grasbw_test(): Error %s encountered while opening the raw socket to %s:%d\n",
170             gras_error_name(errcode),to_name,gras_msg_ctn(answer,0,0,BwExp_t).port);
171     return errcode;
172   }
173
174   *sec=gras_time();
175   if ((errcode=gras_rawsock_send(rawOut,expSize,msgSize)) ||
176       (errcode=gras_rawsock_recv(rawIn,1,1,120))) {
177     fprintf(stderr,"grasbw_test(): Error %s encountered while sending the experiment.\n",
178             gras_error_name(errcode));
179     gras_rawsock_close(rawOut);
180     gras_rawsock_close(rawIn);
181     return errcode;
182   }
183   *sec = gras_time() - *sec;
184   *bw = ((double)expSize /* 8.0*/) / *sec / (1024.0 *1024.0);
185
186   gras_rawsock_close(rawIn);
187   gras_rawsock_close(rawOut);
188   gras_sock_close(sock);
189   gras_msg_free(answer);
190   return no_error;
191 }
192
193 /* Callback to the GRASMSG_BW_HANDSHAKE message: 
194    opens a server raw socket,
195    indicate its port in an answer GRASMSG_BW_HANDSHAKED message,
196    receive the corresponding data on the raw socket, 
197    close the raw socket
198 */
199 int grasbw_cbBWHandshake(gras_msg_t *msg) {
200   gras_rawsock_t *rawIn,*rawOut;
201   BwExp_t *ans;
202   gras_error_t errcode;
203   
204   if ((errcode=gras_rawsock_server_open(6666,8000,gras_msg_ctn(msg,0,0,BwExp_t).bufSize,&rawIn))) { 
205     fprintf(stderr,"grasbw_cbHandshake(): Error %s encountered while opening a raw socket\n",
206             gras_error_name(errcode));
207     return 1;
208   }
209   if ((errcode=gras_rawsock_client_open(gras_sock_get_peer_name(msg->sock),gras_msg_ctn(msg,0,0,BwExp_t).port,
210                                      gras_msg_ctn(msg,0,0,BwExp_t).bufSize,&rawOut))) { 
211     fprintf(stderr,"grasbw_cbHandshake(): Error %s encountered while opening a raw socket\n",
212             gras_error_name(errcode));
213     return 1;
214   }
215   if (!(ans=(BwExp_t *)malloc(sizeof(BwExp_t)))) {
216     fprintf(stderr,"grasbw_cbHandshake(): Malloc error.\n");
217     gras_rawsock_close(rawIn);
218     gras_rawsock_close(rawOut);
219     return 1;
220   }
221   ans->bufSize=gras_msg_ctn(msg,0,0,BwExp_t).bufSize;
222   ans->expSize=gras_msg_ctn(msg,0,0,BwExp_t).expSize;
223   ans->msgSize=gras_msg_ctn(msg,0,0,BwExp_t).msgSize;
224   ans->port=gras_rawsock_get_peer_port(rawIn);
225   //  fprintf(stderr,"grasbw_cbHandshake. bufSize=%d expSize=%d msgSize=%d port=%d\n",
226   //      ans->bufSize,ans->expSize,ans->msgSize,ans->port);
227
228   if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_BW_HANDSHAKED, 1,
229                               ans, 1))) {
230     fprintf(stderr,"grasbw_cbHandshake(): Error %s encountered while sending the answer.\n",
231             gras_error_name(errcode));
232     gras_rawsock_close(rawIn);
233     gras_rawsock_close(rawOut);
234     return 1;
235   }
236     
237   if ((errcode=gras_rawsock_recv(rawIn,
238                                gras_msg_ctn(msg,0,0,BwExp_t).expSize,
239                                gras_msg_ctn(msg,0,0,BwExp_t).msgSize,
240                                120)) ||
241       (errcode=gras_rawsock_send(rawOut,1,1))) {
242     fprintf(stderr,"grasbw_cbHandshake(): Error %s encountered while receiving the experiment.\n",
243             gras_error_name(errcode));
244     gras_rawsock_close(rawIn);
245     gras_rawsock_close(rawOut);
246     return 1;
247   }
248   gras_msg_free(msg);
249   gras_rawsock_close(rawIn);
250   gras_rawsock_close(rawOut);
251   return 1;
252 }
253
254 /* function to request a BW test between to external hosts */
255 gras_error_t grasbw_request(const char* from_name,unsigned int from_port,
256                            const char* to_name,unsigned int to_port,
257                            unsigned int bufSize,unsigned int expSize,unsigned int msgSize,
258                            /*OUT*/ double *sec, double*bw) {
259   
260   gras_sock_t *sock;
261   gras_msg_t *answer;
262   gras_error_t errcode;
263   /* The request */
264   BwExp_t *request;
265   msgHost_t *target;
266
267   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
268     fprintf(stderr,"grasbw_request(): Error %s encountered while contacting the actuator\n",
269             gras_error_name(errcode));
270     return errcode;
271   }
272   if (!(request=(BwExp_t *)malloc(sizeof(BwExp_t))) ||
273       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
274     fprintf(stderr,"grasbw_test(): Malloc error\n");
275     gras_sock_close(sock);
276     return malloc_error;    
277   }
278
279   request->bufSize=bufSize;
280   request->expSize=expSize;
281   request->msgSize=msgSize;
282   strcpy(target->host,to_name);
283   target->port=to_port;
284   
285   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_BW_REQUEST, 2, 
286                               target,1,
287                               request,1))) {
288     fprintf(stderr,"grasbw_request(): Error %s encountered while sending the request.\n",
289             gras_error_name(errcode));
290     gras_sock_close(sock);
291     return errcode;
292   }
293   if ((errcode=gras_msg_wait(240,GRASMSG_BW_RESULT,&answer))) {
294     fprintf(stderr,"grasbw_request(): Error %s encountered while waiting for the answer.\n",
295             gras_error_name(errcode));
296     gras_sock_close(sock);
297     return errcode;
298   }
299
300   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
301     fprintf(stderr,"grasbw_request(): Peer reported error %s (%s).\n",
302             gras_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
303     gras_msg_free(answer);
304     gras_sock_close(sock);
305     return errcode;
306   }
307
308   //  fprintf(stderr,"sec=%p",gras_msg_ctn(answer,1,0,msgResult_t));
309   *sec=gras_msg_ctn(answer,1,0,msgResult_t).value;
310   *bw=gras_msg_ctn(answer,1,1,msgResult_t).value;
311
312   gras_msg_free(answer);
313   gras_sock_close(sock);
314   return no_error;
315 }
316
317 int grasbw_cbBWRequest(gras_msg_t *msg) {
318   /* specification of the test to run */
319   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
320   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
321
322   unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
323   unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
324   unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
325   /* our answer */
326   msgError_t *error;
327   msgResult_t *res;
328
329   if (!(error=(msgError_t *)malloc(sizeof(msgError_t))) ||
330       !(res=(msgResult_t *)malloc(sizeof(msgResult_t) * 2))) {
331     fprintf(stderr,"%s:%d:grasbw_cbRequest: Malloc error\n",__FILE__,__LINE__);
332     return malloc_error;    
333   }
334
335   if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
336                                   &(res[0].value),&(res[1].value) ))) {
337     fprintf(stderr,
338             "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
339             __FILE__,__LINE__,gras_error_name(error->errcode));
340     strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
341     gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
342                    error,1,
343                    res,2);
344     return 1;
345   }
346   res[0].timestamp = (unsigned int) gras_time();
347   res[1].timestamp = (unsigned int) gras_time();
348   gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
349                  error,1,
350                  res,2);
351   gras_msg_free(msg);
352   return 1;
353 }
354
355 /* ***************************************************************************
356  * Link saturation
357  * ***************************************************************************/
358
359 gras_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
360                                   const char* to_name,unsigned int to_port,
361                                   unsigned int msgSize, unsigned int timeout) {
362   gras_sock_t *sock;
363   gras_error_t errcode;
364   /* The request */
365   SatExp_t *request;
366   msgHost_t *target;
367   /* answer */
368   gras_msg_t *answer;
369
370   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
371     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
372             __FILE__,__LINE__,gras_error_name(errcode));
373     return errcode;
374   }
375   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
376       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
377     fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
378     gras_sock_close(sock);
379     return malloc_error;    
380   }
381
382   request->timeout=timeout;
383   request->msgSize=msgSize;
384
385   strcpy(target->host,to_name);
386   target->port=to_port;
387
388   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2, 
389                               target,1,
390                               request,1))) {
391     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
392             __FILE__,__LINE__,gras_error_name(errcode));
393     gras_sock_close(sock);
394     return errcode;
395   }
396   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
397     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
398             __FILE__,__LINE__,gras_error_name(errcode));
399     gras_sock_close(sock);
400     return errcode;
401   }
402
403   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
404     fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
405             __FILE__,__LINE__,gras_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
406     gras_msg_free(answer);
407     gras_sock_close(sock);
408     return errcode;
409   }
410
411   gras_msg_free(answer);
412   gras_sock_close(sock);
413   return no_error;
414 }
415
416 int grasbw_cbSatStart(gras_msg_t *msg) {
417   gras_rawsock_t *raw;
418   gras_sock_t *sock;
419   gras_error_t errcode;
420   double start; /* time to timeout */
421
422   /* specification of the test to run */
423   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
424   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
425
426   unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
427   unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
428   unsigned int raw_port;
429
430   /* The request */
431   SatExp_t *request;
432   /* answer */
433   gras_msg_t *answer;
434
435   /*
436   fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
437   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
438           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
439           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
440   */
441
442   /* Negociate the saturation with the peer */
443   if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
444     fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
445             gras_error_name(errcode));
446     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
447                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
448                      errcode,"Cannot contact peer.\n");
449     return 1;
450   }
451   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
452     fprintf(stderr,"cbSatStart(): Malloc error\n");
453     gras_sock_close(sock);
454     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
455                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
456                      malloc_error,"Cannot build request.\n");
457     return 1;    
458   }
459
460   request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
461   request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
462
463   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1, 
464                               request,1))) {
465     fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
466             gras_error_name(errcode));
467     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
468                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
469                      errcode,"Cannot send request.\n");
470     gras_sock_close(sock);
471     return 1;
472   }
473
474   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
475     fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
476             gras_error_name(errcode));
477     gras_sock_close(sock);
478
479     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
480                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
481                      errcode,
482                      "Cannot receive the ACK.\n");
483     return 1;
484   }
485
486   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
487     fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
488             gras_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
489
490     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
491                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
492                      errcode,
493                      "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
494     gras_msg_free(answer);
495     gras_sock_close(sock);
496     return 1;
497   }
498
499   raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
500
501   if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
502     fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
503             gras_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
504
505     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
506                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
507                      errcode,"Cannot open raw socket.\n");
508     gras_sock_close(sock);
509     return 1;
510   }
511
512   /* send a train of data before repporting that XP is started */
513   if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
514     fprintf(stderr,"cbSatStart: Failure %s during raw send\n",gras_error_name(errcode));
515     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
516                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
517                      errcode,"Cannot raw send.\n");
518     gras_sock_close(sock);
519     gras_rawsock_close(raw);
520     return 1;
521   }
522   
523   grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
524                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
525                    no_error,"Saturation started");
526   gras_msg_free(answer);
527   gras_msg_free(msg);
528   
529   /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
530   start=gras_time();
531   while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error && 
532          gras_time()-start < timeout) {
533     if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
534       fprintf(stderr,"cbSatStart: Failure %s during raw send\n",gras_error_name(errcode));
535       /* our error message do not interess anyone. SAT_STOP will do nothing. */
536       gras_sock_close(sock);
537       gras_rawsock_close(raw);
538       return 1;
539     } 
540   }
541   if (gras_time()-start > timeout) {
542     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
543     gras_sock_close(sock);
544     gras_rawsock_close(raw);
545     return 1;
546   }
547
548   /* Handle the SAT_STOP which broke the previous while */
549   
550   if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
551     fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
552
553     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
554                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
555                      errcode,"Sending SAT_END to peer failed.\n");
556     gras_sock_close(sock);
557     gras_rawsock_close(raw);
558     return 1;
559   }
560   
561   if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
562     fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
563
564     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
565                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
566                      errcode,"Receiving SAT_ENDED from peer failed.\n");
567     gras_sock_close(sock);
568     gras_rawsock_close(raw);
569     return 1;
570   }
571   grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
572                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
573                    no_error,"");
574
575   gras_sock_close(sock);
576   gras_rawsock_close(raw);
577   gras_msg_free(answer);
578   gras_msg_free(msg);
579
580   return 1;  
581 }
582
583 int grasbw_cbSatBegin(gras_msg_t *msg) {
584   gras_rawsock_t *raw;
585   gras_error_t errcode;
586   double start; /* timer */
587   /* request */
588   unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
589   unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
590   /* answer */
591   SatExp_t *request;
592   msgError_t *error;
593
594   if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
595       !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
596     fprintf(stderr,"cbSatBegin(): Malloc error\n");
597     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
598                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
599                      malloc_error,"Malloc error");
600     return 1;
601   }
602
603   if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) { 
604     fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
605             gras_error_name(errcode));
606     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
607                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
608                      errcode,"Cannot open raw socket");
609     return 1;
610   }
611   request->port=gras_rawsock_get_peer_port(raw);
612   request->msgSize=msgSize;
613   error->errcode=no_error;
614   error->errmsg[0]='\0';
615   if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
616                               error,1,
617                               request,1))) {
618     fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
619             gras_error_name(errcode));
620     return 1;
621   }
622   gras_msg_free(msg);
623
624   start=gras_time();
625   while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
626          gras_time() - start < timeout) {
627     errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
628     if (errcode != timeout_error && errcode != no_error) {
629       fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",gras_error_name(errcode));
630       /* our error message do not interess anyone. SAT_END will do nothing. */
631       /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
632       return 1;
633     } 
634   }
635   if (gras_time()-start > timeout) {
636     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
637     gras_rawsock_close(raw);
638     return 1;
639   }
640
641   grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
642                    "cbSatBegin: Cannot send SAT_ENDED.\n",
643                    no_error,"");
644   gras_rawsock_close(raw);
645   gras_msg_free(msg);
646   return 1;
647 }
648
649 gras_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
650                                  const char* to_name,unsigned int to_port) {
651   gras_error_t errcode;
652   gras_sock_t *sock;
653   gras_msg_t *answer;
654
655   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
656     fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
657             gras_error_name(errcode));
658     return errcode;
659   }
660
661   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
662     fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
663             gras_error_name(errcode));
664     gras_sock_close(sock);
665     return errcode;
666   }
667
668   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
669     fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
670             gras_error_name(errcode));
671     gras_sock_close(sock);
672     return errcode;
673   }
674
675   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
676     fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
677             gras_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
678     gras_msg_free(answer);
679     gras_sock_close(sock);
680     return errcode;
681   }
682
683   gras_msg_free(answer);
684   gras_sock_close(sock);
685
686   return no_error;
687 }