Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Improve documentation and put the examples in another file, so that we can include...
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved.                */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
14
15 static short _amok_bw_initialized = 0;
16
17 /** @brief module initialization; all participating nodes must run this */
18 void amok_bw_init(void) {
19   gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
20
21   amok_base_init();
22
23   if (! _amok_bw_initialized) {
24         
25      /* Build the datatype descriptions */ 
26      bw_request_desc = gras_datadesc_struct("s_bw_request_t");
27      gras_datadesc_struct_append(bw_request_desc,"host",
28                                  gras_datadesc_by_name("xbt_host_t"));
29      gras_datadesc_struct_append(bw_request_desc,"buf_size",
30                                  gras_datadesc_by_name("unsigned long int"));
31      gras_datadesc_struct_append(bw_request_desc,"exp_size",
32                                  gras_datadesc_by_name("unsigned long int"));
33      gras_datadesc_struct_append(bw_request_desc,"msg_size",
34                                  gras_datadesc_by_name("unsigned long int"));
35      gras_datadesc_struct_close(bw_request_desc);
36      bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
37
38      bw_res_desc = gras_datadesc_struct("s_bw_res_t");
39      gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
40      gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
41      gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
42      gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
43      gras_datadesc_struct_close(bw_res_desc);
44      bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
45
46      sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
47      gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
48      gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
49      gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
50      gras_datadesc_struct_close(sat_request_desc);
51      sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
52      
53      /* Register the bandwidth messages */
54      gras_msgtype_declare("BW handshake",     bw_request_desc);
55      gras_msgtype_declare("BW handshake ACK", bw_request_desc);
56      gras_msgtype_declare("BW request",       bw_request_desc);
57      gras_msgtype_declare("BW result",        bw_res_desc);
58      
59      /* Register the saturation messages */
60      gras_msgtype_declare("SAT start",   sat_request_desc);
61      gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
62      gras_msgtype_declare("SAT begin",   sat_request_desc);
63      gras_msgtype_declare("SAT begun",   gras_datadesc_by_name("amok_remoterr_t"));
64      gras_msgtype_declare("SAT end",     NULL);
65      gras_msgtype_declare("SAT ended",   gras_datadesc_by_name("amok_remoterr_t"));
66      gras_msgtype_declare("SAT stop",    NULL);
67      gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
68   }
69    
70   /* Register the callbacks */
71   gras_cb_register(gras_msgtype_by_name("BW request"),
72                    &amok_bw_cb_bw_request);
73   gras_cb_register(gras_msgtype_by_name("BW handshake"),
74                    &amok_bw_cb_bw_handshake);
75
76   gras_cb_register(gras_msgtype_by_name("SAT start"),
77                    &amok_bw_cb_sat_start);
78   gras_cb_register(gras_msgtype_by_name("SAT begin"),
79                    &amok_bw_cb_sat_begin);
80   
81   _amok_bw_initialized =1;
82 }
83
84 /** @brief module finalization */
85 void amok_bw_exit(void) {
86   if (! _amok_bw_initialized)
87     return;
88    
89   gras_cb_unregister(gras_msgtype_by_name("BW request"),
90                      &amok_bw_cb_bw_request);
91   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
92                      &amok_bw_cb_bw_handshake);
93
94   gras_cb_unregister(gras_msgtype_by_name("SAT start"),
95                      &amok_bw_cb_sat_start);
96   gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
97                      &amok_bw_cb_sat_begin);
98
99   _amok_bw_initialized = 0;
100 }
101
102 /* ***************************************************************************
103  * Bandwidth tests
104  * ***************************************************************************/
105
106 /**
107  * \brief bandwidth measurement between localhost and \e peer
108  * 
109  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
110  * \arg buf_size: Size of the socket buffer
111  * \arg exp_size: Total size of data sent across the network
112  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
113  * \arg sec: where the result (in seconds) should be stored.
114  * \arg bw: observed Bandwidth (in kb/s) 
115  *
116  * Conduct a bandwidth test from the local process to the given peer.
117  * This call is blocking until the end of the experiment.
118  *
119  * Results are reported in last args, and sizes are in kb.
120  */
121 xbt_error_t amok_bw_test(gras_socket_t peer,
122                          unsigned long int buf_size,
123                          unsigned long int exp_size,
124                          unsigned long int msg_size,
125                          /*OUT*/ double *sec, double *bw) {
126
127   /* Measurement sockets for the experiments */
128   gras_socket_t measMasterIn,measIn,measOut;
129   int port;
130   xbt_error_t errcode;
131   bw_request_t request,request_ack;
132   
133   for (port = 5000, errcode = system_error;
134        errcode == system_error && port < 10000;
135        errcode = gras_socket_server_ext(++port,buf_size,1,&measMasterIn));
136   if (errcode != no_error) {
137     ERROR1("Error %s encountered while opening a measurement socket",
138            xbt_error_name(errcode));
139     return errcode;
140   }
141         
142   request=xbt_new0(s_bw_request_t,1);
143   request->buf_size=buf_size*1024;
144   request->exp_size=exp_size*1024;
145   request->msg_size=msg_size*1024;
146   request->host.name = NULL;
147   request->host.port = gras_socket_my_port(measMasterIn);
148   VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld kb= %ld b)", 
149         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
150         buf_size,request->buf_size);
151
152   if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
153     ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
154     return errcode;
155   }
156   TRY(gras_socket_meas_accept(measMasterIn,&measIn));
157
158   if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),
159                              NULL,&request_ack))) {
160     ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
161             xbt_error_name(errcode));
162     return errcode;
163   }
164   
165   /* FIXME: What if there is a remote error? */
166    
167   if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),
168                                      request_ack->host.port, 
169                                      request->buf_size,1,&measOut))) {
170     ERROR3("Error %s encountered while opening the measurement socket to %s:%d for BW test\n",
171             xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
172     return errcode;
173   }
174   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
175
176   *sec=gras_os_time();
177   TRY(gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size));
178   TRY(gras_socket_meas_recv(measIn,120,1,1));
179
180   /*catch
181     ERROR1("Error %s encountered while sending the BW experiment.",
182             xbt_error_name(errcode));
183     gras_socket_close(measOut);
184     gras_socket_close(measMasterIn);
185     gras_socket_close(measIn);
186   */
187
188   *sec = gras_os_time() - *sec;
189   *bw = ((double)exp_size) / *sec;
190
191   free(request_ack);
192   free(request);
193   if (measIn != measMasterIn)
194     gras_socket_close(measIn);
195   gras_socket_close(measMasterIn);
196   gras_socket_close(measOut);
197   return errcode;
198 }
199
200
201 /* Callback to the "BW handshake" message: 
202    opens a server measurement socket,
203    indicate its port in an "BW handshaked" message,
204    receive the corresponding data on the measurement socket, 
205    close the measurment socket
206
207    sizes are in byte (got converted from kb my expeditor)
208 */
209 int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
210                             void          *payload) {
211   gras_socket_t measMasterIn,measIn,measOut;
212   bw_request_t request=*(bw_request_t*)payload;
213   bw_request_t answer;
214   xbt_error_t errcode;
215   int port;
216   
217   VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
218         gras_socket_peer_name(expeditor),request->host.port,
219         request->buf_size,request->exp_size,request->msg_size);     
220
221   /* Build our answer */
222   answer = xbt_new0(s_bw_request_t,1);
223   
224   for (port = 6000, errcode = system_error;
225        errcode == system_error;
226        errcode = gras_socket_server_ext(++port,request->buf_size,1,&measMasterIn));
227   if (errcode != no_error) {
228     ERROR1("Error %s encountered while opening a measurement server socket", xbt_error_name(errcode));
229     /* FIXME: tell error to remote */
230     return 1;
231   }
232    
233   answer->buf_size=request->buf_size;
234   answer->exp_size=request->exp_size;
235   answer->msg_size=request->msg_size;
236   answer->host.port=gras_socket_my_port(measMasterIn);
237
238   /* Don't connect asap to leave time to other side to enter the accept() */
239   if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),
240                                       request->host.port,
241                                       request->buf_size,1,&measOut))) { 
242     ERROR3("Error '%s' encountered while opening a measurement socket back to %s:%d", 
243            xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
244     /* FIXME: tell error to remote */
245     return 1;
246   }
247
248
249   if ((errcode=gras_msg_send(expeditor,
250                              gras_msgtype_by_name("BW handshake ACK"),
251                              &answer))) {
252     ERROR1("Error %s encountered while sending the answer.",
253             xbt_error_name(errcode));
254     gras_socket_close(measMasterIn);
255     gras_socket_close(measOut);
256     /* FIXME: tell error to remote */
257     return 1;
258   }
259   TRY(gras_socket_meas_accept(measMasterIn,&measIn));
260   DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
261         answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
262
263   TRY(gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size));
264   TRY(gras_socket_meas_send(measOut,120,1,1));
265
266   /*catch
267     ERROR1("Error %s encountered while receiving the experiment.",
268             xbt_error_name(errcode));
269     gras_socket_close(measMasterIn);
270     gras_socket_close(measIn);
271     gras_socket_close(measOut);
272     * FIXME: tell error to remote ? *
273     return 1;
274     }*/
275
276   if (measIn != measMasterIn)
277     gras_socket_close(measMasterIn);
278   gras_socket_close(measIn);
279   gras_socket_close(measOut);
280   free(answer);
281   free(request);
282   DEBUG0("BW experiment done.");
283   return 1;
284 }
285
286 /**
287  * \brief request a bandwidth measurement between two remote hosts
288  *
289  * \arg from_name: Name of the first host 
290  * \arg from_port: port on which the first process is listening for messages
291  * \arg to_name: Name of the second host 
292  * \arg to_port: port on which the second process is listening (for messages, do not 
293  * give a measurement socket here. The needed measurement sockets will be created 
294  * automatically and negociated between the peers)
295  * \arg buf_size: Size of the socket buffer
296  * \arg exp_size: Total size of data sent across the network
297  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
298  * \arg sec: where the result (in seconds) should be stored.
299  * \arg bw: observed Bandwidth (in kb/s)
300  *
301  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
302  * This call is blocking until the end of the experiment.
303  *
304  * Results are reported in last args, and sizes are in kb.
305  */
306 xbt_error_t amok_bw_request(const char* from_name,unsigned int from_port,
307                             const char* to_name,unsigned int to_port,
308                             unsigned long int buf_size,
309                             unsigned long int exp_size,
310                             unsigned long int msg_size,
311                             /*OUT*/ double *sec, double*bw) {
312   
313   gras_socket_t sock;
314   xbt_error_t errcode;
315   /* The request */
316   bw_request_t request;
317   bw_res_t result;
318
319   request=xbt_new0(s_bw_request_t,1);
320   request->buf_size=buf_size;
321   request->exp_size=exp_size;
322   request->msg_size=msg_size;
323
324   request->host.name = (char*)to_name;
325   request->host.port = to_port;
326
327   TRY(gras_socket_client(from_name,from_port,&sock));
328   TRY(gras_msg_send(sock,gras_msgtype_by_name("BW request"),&request));
329   free(request);
330
331   TRY(gras_msg_wait(240,gras_msgtype_by_name("BW result"),NULL, &result));
332   
333   *sec=result->sec;
334   *bw =result->bw;
335
336   VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
337         from_name,from_port, to_name,to_port,
338         *sec,*bw);
339
340   gras_socket_close(sock);
341   free(result);
342   return no_error;
343 }
344
345 int amok_bw_cb_bw_request(gras_socket_t    expeditor,
346                           void            *payload) {
347                           
348   xbt_error_t errcode;                    
349   /* specification of the test to run, and our answer */
350   bw_request_t request = *(bw_request_t*)payload;
351   bw_res_t result = xbt_new0(s_bw_res,1);
352   gras_socket_t peer;
353
354   TRY(gras_socket_client(request->host.name,request->host.port,&peer));
355   TRY(amok_bw_test(peer,
356                    request->buf_size,request->exp_size,request->msg_size,
357                    &(result->sec),&(result->bw)));
358
359   TRY(gras_msg_send(expeditor,gras_msgtype_by_name("BW result"),&result));
360
361   gras_os_sleep(1);
362   gras_socket_close(peer);
363   free(request);
364   free(result);
365   
366   return 1;
367
368 #if 0
369   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
370   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
371
372   unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
373   unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
374   unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
375   /* our answer */
376   msgError_t *error;
377   msgResult_t *res;
378
379   if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
380                                   &(res[0].value),&(res[1].value) ))) {
381     fprintf(stderr,
382             "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
383             __FILE__,__LINE__,xbt_error_name(error->errcode));
384     strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
385     gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
386                    error,1,
387                    res,2);
388     return 1;
389   }
390   res[0].timestamp = (unsigned int) gras_time();
391   res[1].timestamp = (unsigned int) gras_time();
392   gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
393                  error,1,
394                  res,2);
395   gras_msg_free(msg);
396   return 1;
397 #endif
398 }
399
400 int amok_bw_cb_sat_start(gras_socket_t     expeditor,
401                          void             *payload) {
402    CRITICAL0("amok_bw_cb_sat_start; not implemented");
403    return 1;
404
405 int amok_bw_cb_sat_begin(gras_socket_t     expeditor,
406                          void             *payload) {
407    CRITICAL0("amok_bw_cb_sat_begin: not implemented");
408    return 1;
409 }
410 #if 0
411 /* ***************************************************************************
412  * Link saturation
413  * ***************************************************************************/
414
415 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
416                                   const char* to_name,unsigned int to_port,
417                                   unsigned int msgSize, unsigned int timeout) {
418   gras_sock_t *sock;
419   xbt_error_t errcode;
420   /* The request */
421   SatExp_t *request;
422   msgHost_t *target;
423   /* answer */
424   gras_msg_t *answer;
425
426   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
427     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
428             __FILE__,__LINE__,xbt_error_name(errcode));
429     return errcode;
430   }
431   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
432       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
433     fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
434     gras_sock_close(sock);
435     return malloc_error;    
436   }
437
438   request->timeout=timeout;
439   request->msgSize=msgSize;
440
441   strcpy(target->host,to_name);
442   target->port=to_port;
443
444   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2, 
445                               target,1,
446                               request,1))) {
447     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
448             __FILE__,__LINE__,xbt_error_name(errcode));
449     gras_sock_close(sock);
450     return errcode;
451   }
452   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
453     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
454             __FILE__,__LINE__,xbt_error_name(errcode));
455     gras_sock_close(sock);
456     return errcode;
457   }
458
459   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
460     fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
461             __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
462     gras_msg_free(answer);
463     gras_sock_close(sock);
464     return errcode;
465   }
466
467   gras_msg_free(answer);
468   gras_sock_close(sock);
469   return no_error;
470 }
471
472 int grasbw_cbSatStart(gras_msg_t *msg) {
473   gras_rawsock_t *raw;
474   gras_sock_t *sock;
475   xbt_error_t errcode;
476   double start; /* time to timeout */
477
478   /* specification of the test to run */
479   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
480   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
481
482   unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
483   unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
484   unsigned int raw_port;
485
486   /* The request */
487   SatExp_t *request;
488   /* answer */
489   gras_msg_t *answer;
490
491   /*
492   fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
493   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
494           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
495           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
496   */
497
498   /* Negociate the saturation with the peer */
499   if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
500     fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
501             xbt_error_name(errcode));
502     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
503                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
504                      errcode,"Cannot contact peer.\n");
505     return 1;
506   }
507   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
508     fprintf(stderr,"cbSatStart(): Malloc error\n");
509     gras_sock_close(sock);
510     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
511                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
512                      malloc_error,"Cannot build request.\n");
513     return 1;    
514   }
515
516   request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
517   request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
518
519   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1, 
520                               request,1))) {
521     fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
522             xbt_error_name(errcode));
523     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
524                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
525                      errcode,"Cannot send request.\n");
526     gras_sock_close(sock);
527     return 1;
528   }
529
530   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
531     fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
532             xbt_error_name(errcode));
533     gras_sock_close(sock);
534
535     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
536                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
537                      errcode,
538                      "Cannot receive the ACK.\n");
539     return 1;
540   }
541
542   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
543     fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
544             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
545
546     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
547                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
548                      errcode,
549                      "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
550     gras_msg_free(answer);
551     gras_sock_close(sock);
552     return 1;
553   }
554
555   raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
556
557   if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
558     fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
559             xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
560
561     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
562                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
563                      errcode,"Cannot open raw socket.\n");
564     gras_sock_close(sock);
565     return 1;
566   }
567
568   /* send a train of data before repporting that XP is started */
569   if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
570     fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
571     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
572                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
573                      errcode,"Cannot raw send.\n");
574     gras_sock_close(sock);
575     gras_rawsock_close(raw);
576     return 1;
577   }
578   
579   grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
580                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
581                    no_error,"Saturation started");
582   gras_msg_free(answer);
583   gras_msg_free(msg);
584   
585   /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
586   start=gras_time();
587   while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error && 
588          gras_time()-start < timeout) {
589     if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
590       fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
591       /* our error message do not interess anyone. SAT_STOP will do nothing. */
592       gras_sock_close(sock);
593       gras_rawsock_close(raw);
594       return 1;
595     } 
596   }
597   if (gras_time()-start > timeout) {
598     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
599     gras_sock_close(sock);
600     gras_rawsock_close(raw);
601     return 1;
602   }
603
604   /* Handle the SAT_STOP which broke the previous while */
605   
606   if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
607     fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
608
609     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
610                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
611                      errcode,"Sending SAT_END to peer failed.\n");
612     gras_sock_close(sock);
613     gras_rawsock_close(raw);
614     return 1;
615   }
616   
617   if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
618     fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
619
620     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
621                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
622                      errcode,"Receiving SAT_ENDED from peer failed.\n");
623     gras_sock_close(sock);
624     gras_rawsock_close(raw);
625     return 1;
626   }
627   grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
628                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
629                    no_error,"");
630
631   gras_sock_close(sock);
632   gras_rawsock_close(raw);
633   gras_msg_free(answer);
634   gras_msg_free(msg);
635
636   return 1;  
637 }
638
639 int grasbw_cbSatBegin(gras_msg_t *msg) {
640   gras_rawsock_t *raw;
641   xbt_error_t errcode;
642   double start; /* timer */
643   /* request */
644   unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
645   unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
646   /* answer */
647   SatExp_t *request;
648   msgError_t *error;
649
650   if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
651       !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
652     fprintf(stderr,"cbSatBegin(): Malloc error\n");
653     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
654                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
655                      malloc_error,"Malloc error");
656     return 1;
657   }
658
659   if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) { 
660     fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
661             xbt_error_name(errcode));
662     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
663                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
664                      errcode,"Cannot open raw socket");
665     return 1;
666   }
667   request->port=gras_rawsock_get_peer_port(raw);
668   request->msgSize=msgSize;
669   error->errcode=no_error;
670   error->errmsg[0]='\0';
671   if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
672                               error,1,
673                               request,1))) {
674     fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
675             xbt_error_name(errcode));
676     return 1;
677   }
678   gras_msg_free(msg);
679
680   start=gras_time();
681   while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
682          gras_time() - start < timeout) {
683     errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
684     if (errcode != timeout_error && errcode != no_error) {
685       fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
686       /* our error message do not interess anyone. SAT_END will do nothing. */
687       /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
688       return 1;
689     } 
690   }
691   if (gras_time()-start > timeout) {
692     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
693     gras_rawsock_close(raw);
694     return 1;
695   }
696
697   grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
698                    "cbSatBegin: Cannot send SAT_ENDED.\n",
699                    no_error,"");
700   gras_rawsock_close(raw);
701   gras_msg_free(msg);
702   return 1;
703 }
704
705 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
706                                  const char* to_name,unsigned int to_port) {
707   xbt_error_t errcode;
708   gras_sock_t *sock;
709   gras_msg_t *answer;
710
711   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
712     fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
713             xbt_error_name(errcode));
714     return errcode;
715   }
716
717   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
718     fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
719             xbt_error_name(errcode));
720     gras_sock_close(sock);
721     return errcode;
722   }
723
724   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
725     fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
726             xbt_error_name(errcode));
727     gras_sock_close(sock);
728     return errcode;
729   }
730
731   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
732     fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
733             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
734     gras_msg_free(answer);
735     gras_sock_close(sock);
736     return errcode;
737   }
738
739   gras_msg_free(answer);
740   gras_sock_close(sock);
741
742   return no_error;
743 }
744 #endif