Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
rename raw sock to meas ones; do declare msgtypes on all hosts in SG, not only the...
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003, 2004 Martin Quinson. All rights reserved.            */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
14
15 static short _amok_bw_initialized = 0;
16
17 /**** code ****/
18 void amok_bw_init(void) {
19   gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
20
21   amok_base_init();
22
23   if (! _amok_bw_initialized) {
24         
25      /* Build the datatype descriptions */ 
26      bw_request_desc = gras_datadesc_struct("s_bw_request_t");
27      gras_datadesc_struct_append(bw_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
28      gras_datadesc_struct_append(bw_request_desc,"buf_size",gras_datadesc_by_name("unsigned int"));
29      gras_datadesc_struct_append(bw_request_desc,"exp_size",gras_datadesc_by_name("unsigned int"));
30      gras_datadesc_struct_append(bw_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
31      gras_datadesc_struct_close(bw_request_desc);
32      bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
33
34      bw_res_desc = gras_datadesc_struct("s_bw_res_t");
35      gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
36      gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
37      gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
38      gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
39      gras_datadesc_struct_close(bw_res_desc);
40      bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
41
42      sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
43      gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
44      gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
45      gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
46      gras_datadesc_struct_close(sat_request_desc);
47      sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
48      
49      /* Register the bandwidth messages */
50      gras_msgtype_declare("BW request",       bw_request_desc);
51      gras_msgtype_declare("BW result",        bw_res_desc);
52      gras_msgtype_declare("BW handshake",     bw_request_desc);
53      gras_msgtype_declare("BW handshake ACK", bw_request_desc);
54      
55      /* Register the saturation messages */
56      gras_msgtype_declare("SAT start",   sat_request_desc);
57      gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
58      gras_msgtype_declare("SAT begin",   sat_request_desc);
59      gras_msgtype_declare("SAT begun",   gras_datadesc_by_name("amok_remoterr_t"));
60      gras_msgtype_declare("SAT end",     NULL);
61      gras_msgtype_declare("SAT ended",   gras_datadesc_by_name("amok_remoterr_t"));
62      gras_msgtype_declare("SAT stop",    NULL);
63      gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
64   }
65    
66   /* Register the callbacks */
67   gras_cb_register(gras_msgtype_by_name("BW request"),
68                    &amok_bw_cb_bw_request);
69   gras_cb_register(gras_msgtype_by_name("BW handshake"),
70                    &amok_bw_cb_bw_handshake);
71
72   gras_cb_register(gras_msgtype_by_name("SAT start"),
73                    &amok_bw_cb_sat_start);
74   gras_cb_register(gras_msgtype_by_name("SAT begin"),
75                    &amok_bw_cb_sat_begin);
76   
77   _amok_bw_initialized =1;
78 }
79
80 void amok_bw_exit(void) {
81   if (! _amok_bw_initialized)
82     return;
83    
84   gras_cb_unregister(gras_msgtype_by_name("BW request"),
85                      &amok_bw_cb_bw_request);
86   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
87                      &amok_bw_cb_bw_handshake);
88
89   gras_cb_unregister(gras_msgtype_by_name("SAT start"),
90                      &amok_bw_cb_sat_start);
91   gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
92                      &amok_bw_cb_sat_begin);
93
94   _amok_bw_initialized = 0;
95 }
96
97    
98
99 /* ***************************************************************************
100  * Bandwidth tests
101  * ***************************************************************************/
102
103 /**
104  * amok_bw_test:
105  * 
106  * Conduct a test between the local host and @peer, and 
107  * report the result in last args
108  */
109 xbt_error_t amok_bw_test(gras_socket_t peer,
110                           unsigned int buf_size,unsigned int exp_size,unsigned int msg_size,
111                           /*OUT*/ double *sec, double *bw) {
112   gras_socket_t measIn,measOut; /* Measurement sockets for the experiments */
113   gras_socket_t sock_dummy; /* ignored arg to msg_wait */
114   int port;
115   xbt_error_t errcode;
116   bw_request_t request,request_ack;
117   
118   for (port = 5000, errcode = system_error;
119        errcode == system_error;
120        errcode = gras_socket_server_ext(++port,buf_size,1,&measIn));
121   if (errcode != no_error) {
122     ERROR1("Error %s encountered while opening a measurement socket",
123            xbt_error_name(errcode));
124     return errcode;
125   }
126         
127   request=xbt_new0(s_bw_request_t,1);
128   request->buf_size=buf_size;
129   request->exp_size=exp_size;
130   request->msg_size=msg_size;
131   request->host.name = NULL;
132   request->host.port = gras_socket_my_port(measIn);
133   INFO3("Handshaking with %s:%d to connect it back on my %d", 
134         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port);
135
136   if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
137     ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
138     return errcode;
139   }
140   if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),&sock_dummy,&request_ack))) {
141     ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
142             xbt_error_name(errcode));
143     return errcode;
144   }
145    
146   /* FIXME: What if there is a remote error? */
147    
148   if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),request_ack->host.port, buf_size,1,&measOut))) {
149     ERROR3("Error %s encountered while opening the measurement socket to %s:%d for BW test\n",
150             xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
151     return errcode;
152   }
153   free(request_ack);
154   INFO0("Got ACK");
155
156   *sec=gras_os_time();
157   if ((errcode=gras_socket_meas_send(measOut,120,exp_size,msg_size)) ||
158       (errcode=gras_socket_meas_recv(measIn,120,1,1))) {
159     ERROR1("Error %s encountered while sending the BW experiment.",
160             xbt_error_name(errcode));
161     gras_socket_close(measOut);
162     gras_socket_close(measIn);
163     return errcode;
164   }
165   *sec = gras_os_time() - *sec;
166   *bw = ((double)exp_size /* 8.0*/) / *sec / (1024.0 *1024.0);
167 INFO0("DOOONE");
168    
169   gras_socket_close(measIn);
170   gras_socket_close(measOut);
171   return no_error;
172 }
173
174
175 /* Callback to the "BW handshake" message: 
176    opens a server measurement socket,
177    indicate its port in an "BW handshaked" message,
178    receive the corresponding data on the measurement socket, 
179    close the measurment socket
180 */
181 int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
182                             void          *payload) {
183   gras_socket_t measIn,measOut;
184   bw_request_t request=*(bw_request_t*)payload;
185   bw_request_t answer;
186   xbt_error_t errcode;
187   int port;
188   
189   INFO2("Handshaked to connect to %s:%d",
190         gras_socket_peer_name(expeditor),request->host.port);
191      
192   answer = xbt_new0(s_bw_request_t,1);
193   
194   for (port = 6000, errcode = system_error;
195        errcode == system_error;
196        errcode = gras_socket_server_ext(++port,request->buf_size,1,&measIn));
197   if (errcode != no_error) {
198     ERROR1("Error %s encountered while opening a measurement server socket", xbt_error_name(errcode));
199     /* FIXME: tell error to remote */
200     return 1;
201   }
202
203   if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),request->host.port,
204                                       request->buf_size,1,&measOut))) { 
205     ERROR3("Error '%s' encountered while opening a measurement socket back to %s:%d", 
206            xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
207     /* FIXME: tell error to remote */
208     return 1;
209   }
210    
211   answer->buf_size=request->buf_size;
212   answer->exp_size=request->exp_size;
213   answer->msg_size=request->msg_size;
214   answer->host.port=gras_socket_my_port(measIn);
215
216   if ((errcode=gras_msg_send(expeditor,gras_msgtype_by_name("BW handshake ACK"),&answer))) {
217     ERROR1("Error %s encountered while sending the answer.",
218             xbt_error_name(errcode));
219     gras_socket_close(measIn);
220     gras_socket_close(measOut);
221     /* FIXME: tell error to remote */
222     return 1;
223   }
224   INFO4("BW handshake answered. buf_size=%d exp_size=%d msg_size=%d port=%d",
225         answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
226     
227   if ((errcode=gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size)) ||
228       (errcode=gras_socket_meas_send(measOut,120,1,1))) {
229     ERROR1("Error %s encountered while receiving the experiment.",
230             xbt_error_name(errcode));
231     gras_socket_close(measIn);
232     gras_socket_close(measOut);
233     /* FIXME: tell error to remote ? */
234     return 1;
235   }
236   gras_socket_close(measIn);
237   gras_socket_close(measOut);
238   return 1;
239 }
240
241 int amok_bw_cb_bw_request(gras_socket_t    expeditor,
242                           void            *payload) {
243    CRITICAL0("Not implemented");
244    return 1;
245 }
246
247 int amok_bw_cb_sat_start(gras_socket_t     expeditor,
248                          void             *payload) {
249    CRITICAL0("Not implemented");
250    return 1;
251
252 int amok_bw_cb_sat_begin(gras_socket_t     expeditor,
253                          void             *payload) {
254    CRITICAL0("Not implemented");
255    return 1;
256 }
257
258 #if 0
259 /* function to request a BW test between two external hosts */
260 xbt_error_t grasbw_request(const char* from_name,unsigned int from_port,
261                            const char* to_name,unsigned int to_port,
262                            unsigned int bufSize,unsigned int expSize,unsigned int msgSize,
263                            /*OUT*/ double *sec, double*bw) {
264   
265   gras_sock_t *sock;
266   gras_msg_t *answer;
267   xbt_error_t errcode;
268   /* The request */
269   BwExp_t *request;
270   msgHost_t *target;
271
272   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
273     fprintf(stderr,"grasbw_request(): Error %s encountered while contacting the actuator\n",
274             xbt_error_name(errcode));
275     return errcode;
276   }
277   if (!(request=(BwExp_t *)malloc(sizeof(BwExp_t))) ||
278       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
279     fprintf(stderr,"grasbw_test(): Malloc error\n");
280     gras_sock_close(sock);
281     return malloc_error;    
282   }
283
284   request->bufSize=bufSize;
285   request->expSize=expSize;
286   request->msgSize=msgSize;
287   strcpy(target->host,to_name);
288   target->port=to_port;
289   
290   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_BW_REQUEST, 2, 
291                               target,1,
292                               request,1))) {
293     fprintf(stderr,"grasbw_request(): Error %s encountered while sending the request.\n",
294             xbt_error_name(errcode));
295     gras_sock_close(sock);
296     return errcode;
297   }
298   if ((errcode=gras_msg_wait(240,GRASMSG_BW_RESULT,&answer))) {
299     fprintf(stderr,"grasbw_request(): Error %s encountered while waiting for the answer.\n",
300             xbt_error_name(errcode));
301     gras_sock_close(sock);
302     return errcode;
303   }
304
305   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
306     fprintf(stderr,"grasbw_request(): Peer reported error %s (%s).\n",
307             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
308     gras_msg_free(answer);
309     gras_sock_close(sock);
310     return errcode;
311   }
312
313   /*  fprintf(stderr,"sec=%p",gras_msg_ctn(answer,1,0,msgResult_t)); */
314   *sec=gras_msg_ctn(answer,1,0,msgResult_t).value;
315   *bw=gras_msg_ctn(answer,1,1,msgResult_t).value;
316
317   gras_msg_free(answer);
318   gras_sock_close(sock);
319   return no_error;
320 }
321
322 int grasbw_cbBWRequest(gras_msg_t *msg) {
323   /* specification of the test to run */
324   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
325   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
326
327   unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
328   unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
329   unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
330   /* our answer */
331   msgError_t *error;
332   msgResult_t *res;
333
334   if (!(error=(msgError_t *)malloc(sizeof(msgError_t))) ||
335       !(res=(msgResult_t *)malloc(sizeof(msgResult_t) * 2))) {
336     fprintf(stderr,"%s:%d:grasbw_cbRequest: Malloc error\n",__FILE__,__LINE__);
337     return malloc_error;    
338   }
339
340   if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
341                                   &(res[0].value),&(res[1].value) ))) {
342     fprintf(stderr,
343             "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
344             __FILE__,__LINE__,xbt_error_name(error->errcode));
345     strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
346     gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
347                    error,1,
348                    res,2);
349     return 1;
350   }
351   res[0].timestamp = (unsigned int) gras_time();
352   res[1].timestamp = (unsigned int) gras_time();
353   gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
354                  error,1,
355                  res,2);
356   gras_msg_free(msg);
357   return 1;
358 }
359
360 /* ***************************************************************************
361  * Link saturation
362  * ***************************************************************************/
363
364 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
365                                   const char* to_name,unsigned int to_port,
366                                   unsigned int msgSize, unsigned int timeout) {
367   gras_sock_t *sock;
368   xbt_error_t errcode;
369   /* The request */
370   SatExp_t *request;
371   msgHost_t *target;
372   /* answer */
373   gras_msg_t *answer;
374
375   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
376     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
377             __FILE__,__LINE__,xbt_error_name(errcode));
378     return errcode;
379   }
380   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
381       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
382     fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
383     gras_sock_close(sock);
384     return malloc_error;    
385   }
386
387   request->timeout=timeout;
388   request->msgSize=msgSize;
389
390   strcpy(target->host,to_name);
391   target->port=to_port;
392
393   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2, 
394                               target,1,
395                               request,1))) {
396     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
397             __FILE__,__LINE__,xbt_error_name(errcode));
398     gras_sock_close(sock);
399     return errcode;
400   }
401   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
402     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
403             __FILE__,__LINE__,xbt_error_name(errcode));
404     gras_sock_close(sock);
405     return errcode;
406   }
407
408   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
409     fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
410             __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
411     gras_msg_free(answer);
412     gras_sock_close(sock);
413     return errcode;
414   }
415
416   gras_msg_free(answer);
417   gras_sock_close(sock);
418   return no_error;
419 }
420
421 int grasbw_cbSatStart(gras_msg_t *msg) {
422   gras_rawsock_t *raw;
423   gras_sock_t *sock;
424   xbt_error_t errcode;
425   double start; /* time to timeout */
426
427   /* specification of the test to run */
428   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
429   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
430
431   unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
432   unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
433   unsigned int raw_port;
434
435   /* The request */
436   SatExp_t *request;
437   /* answer */
438   gras_msg_t *answer;
439
440   /*
441   fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
442   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
443           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
444           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
445   */
446
447   /* Negociate the saturation with the peer */
448   if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
449     fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
450             xbt_error_name(errcode));
451     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
452                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
453                      errcode,"Cannot contact peer.\n");
454     return 1;
455   }
456   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
457     fprintf(stderr,"cbSatStart(): Malloc error\n");
458     gras_sock_close(sock);
459     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
460                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
461                      malloc_error,"Cannot build request.\n");
462     return 1;    
463   }
464
465   request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
466   request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
467
468   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1, 
469                               request,1))) {
470     fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
471             xbt_error_name(errcode));
472     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
473                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
474                      errcode,"Cannot send request.\n");
475     gras_sock_close(sock);
476     return 1;
477   }
478
479   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
480     fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
481             xbt_error_name(errcode));
482     gras_sock_close(sock);
483
484     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
485                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
486                      errcode,
487                      "Cannot receive the ACK.\n");
488     return 1;
489   }
490
491   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
492     fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
493             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
494
495     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
496                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
497                      errcode,
498                      "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
499     gras_msg_free(answer);
500     gras_sock_close(sock);
501     return 1;
502   }
503
504   raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
505
506   if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
507     fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
508             xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
509
510     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
511                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
512                      errcode,"Cannot open raw socket.\n");
513     gras_sock_close(sock);
514     return 1;
515   }
516
517   /* send a train of data before repporting that XP is started */
518   if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
519     fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
520     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
521                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
522                      errcode,"Cannot raw send.\n");
523     gras_sock_close(sock);
524     gras_rawsock_close(raw);
525     return 1;
526   }
527   
528   grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
529                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
530                    no_error,"Saturation started");
531   gras_msg_free(answer);
532   gras_msg_free(msg);
533   
534   /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
535   start=gras_time();
536   while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error && 
537          gras_time()-start < timeout) {
538     if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
539       fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
540       /* our error message do not interess anyone. SAT_STOP will do nothing. */
541       gras_sock_close(sock);
542       gras_rawsock_close(raw);
543       return 1;
544     } 
545   }
546   if (gras_time()-start > timeout) {
547     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
548     gras_sock_close(sock);
549     gras_rawsock_close(raw);
550     return 1;
551   }
552
553   /* Handle the SAT_STOP which broke the previous while */
554   
555   if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
556     fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
557
558     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
559                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
560                      errcode,"Sending SAT_END to peer failed.\n");
561     gras_sock_close(sock);
562     gras_rawsock_close(raw);
563     return 1;
564   }
565   
566   if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
567     fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
568
569     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
570                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
571                      errcode,"Receiving SAT_ENDED from peer failed.\n");
572     gras_sock_close(sock);
573     gras_rawsock_close(raw);
574     return 1;
575   }
576   grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
577                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
578                    no_error,"");
579
580   gras_sock_close(sock);
581   gras_rawsock_close(raw);
582   gras_msg_free(answer);
583   gras_msg_free(msg);
584
585   return 1;  
586 }
587
588 int grasbw_cbSatBegin(gras_msg_t *msg) {
589   gras_rawsock_t *raw;
590   xbt_error_t errcode;
591   double start; /* timer */
592   /* request */
593   unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
594   unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
595   /* answer */
596   SatExp_t *request;
597   msgError_t *error;
598
599   if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
600       !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
601     fprintf(stderr,"cbSatBegin(): Malloc error\n");
602     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
603                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
604                      malloc_error,"Malloc error");
605     return 1;
606   }
607
608   if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) { 
609     fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
610             xbt_error_name(errcode));
611     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
612                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
613                      errcode,"Cannot open raw socket");
614     return 1;
615   }
616   request->port=gras_rawsock_get_peer_port(raw);
617   request->msgSize=msgSize;
618   error->errcode=no_error;
619   error->errmsg[0]='\0';
620   if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
621                               error,1,
622                               request,1))) {
623     fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
624             xbt_error_name(errcode));
625     return 1;
626   }
627   gras_msg_free(msg);
628
629   start=gras_time();
630   while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
631          gras_time() - start < timeout) {
632     errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
633     if (errcode != timeout_error && errcode != no_error) {
634       fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
635       /* our error message do not interess anyone. SAT_END will do nothing. */
636       /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
637       return 1;
638     } 
639   }
640   if (gras_time()-start > timeout) {
641     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
642     gras_rawsock_close(raw);
643     return 1;
644   }
645
646   grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
647                    "cbSatBegin: Cannot send SAT_ENDED.\n",
648                    no_error,"");
649   gras_rawsock_close(raw);
650   gras_msg_free(msg);
651   return 1;
652 }
653
654 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
655                                  const char* to_name,unsigned int to_port) {
656   xbt_error_t errcode;
657   gras_sock_t *sock;
658   gras_msg_t *answer;
659
660   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
661     fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
662             xbt_error_name(errcode));
663     return errcode;
664   }
665
666   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
667     fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
668             xbt_error_name(errcode));
669     gras_sock_close(sock);
670     return errcode;
671   }
672
673   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
674     fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
675             xbt_error_name(errcode));
676     gras_sock_close(sock);
677     return errcode;
678   }
679
680   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
681     fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
682             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
683     gras_msg_free(answer);
684     gras_sock_close(sock);
685     return errcode;
686   }
687
688   gras_msg_free(answer);
689   gras_sock_close(sock);
690
691   return no_error;
692 }
693 #endif