Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Plug a memleak when using the parser
[simgrid.git] / src / amok / Bandwidth / saturate.c
1 /* $Id$ */
2
3 /* amok_saturate - Link saturating facilities (for ALNeM's BW testing)      */
4
5 /* Copyright (c) 2003, 2004 Martin Quinson. All rights reserved.            */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
#include "amok/Bandwidth/bandwidth_private.h"

#include <stdlib.h>
11
12 XBT_LOG_EXTERNAL_CATEGORY(bw);
13 XBT_LOG_DEFAULT_CATEGORY(bw);
14
15 #if 0
16 /* ***************************************************************************
17  * Link saturation
18  * ***************************************************************************/
19
20 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
21                                    const char* to_name,unsigned int to_port,
22                                    unsigned int msgSize, unsigned int timeout) {
23   gras_sock_t *sock;
24   xbt_error_t errcode;
25   /* The request */
26   SatExp_t *request;
27   msgHost_t *target;
28   /* answer */
29   gras_msg_t *answer;
30
31   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
32     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
33             __FILE__,__LINE__,xbt_error_name(errcode));
34     return errcode;
35   }
36   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
37       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
38     fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
39     gras_sock_close(sock);
40     return malloc_error;    
41   }
42
43   request->timeout=timeout;
44   request->msgSize=msgSize;
45
46   strcpy(target->host,to_name);
47   target->port=to_port;
48
49   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2, 
50                               target,1,
51                               request,1))) {
52     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
53             __FILE__,__LINE__,xbt_error_name(errcode));
54     gras_sock_close(sock);
55     return errcode;
56   }
57   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
58     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
59             __FILE__,__LINE__,xbt_error_name(errcode));
60     gras_sock_close(sock);
61     return errcode;
62   }
63
64   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
65     fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
66             __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
67     gras_msg_free(answer);
68     gras_sock_close(sock);
69     return errcode;
70   }
71
72   gras_msg_free(answer);
73   gras_sock_close(sock);
74   return no_error;
75 }
76
77 int grasbw_cbSatStart(gras_msg_t *msg) {
78   gras_rawsock_t *raw;
79   gras_sock_t *sock;
80   xbt_error_t errcode;
81   double start; /* time to timeout */
82
83   /* specification of the test to run */
84   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
85   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
86
87   unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
88   unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
89   unsigned int raw_port;
90
91   /* The request */
92   SatExp_t *request;
93   /* answer */
94   gras_msg_t *answer;
95
96   /*
97   fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
98   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
99           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
100           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
101   */
102
103   /* Negociate the saturation with the peer */
104   if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
105     fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
106             xbt_error_name(errcode));
107     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
108                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
109                      errcode,"Cannot contact peer.\n");
110     return 1;
111   }
112   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
113     fprintf(stderr,"cbSatStart(): Malloc error\n");
114     gras_sock_close(sock);
115     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
116                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
117                      malloc_error,"Cannot build request.\n");
118     return 1;    
119   }
120
121   request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
122   request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
123
124   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1, 
125                               request,1))) {
126     fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
127             xbt_error_name(errcode));
128     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
129                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
130                      errcode,"Cannot send request.\n");
131     gras_sock_close(sock);
132     return 1;
133   }
134
135   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
136     fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
137             xbt_error_name(errcode));
138     gras_sock_close(sock);
139
140     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
141                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
142                      errcode,
143                      "Cannot receive the ACK.\n");
144     return 1;
145   }
146
147   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
148     fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
149             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
150
151     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
152                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
153                      errcode,
154                      "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
155     gras_msg_free(answer);
156     gras_sock_close(sock);
157     return 1;
158   }
159
160   raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
161
162   if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
163     fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
164             xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
165
166     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
167                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
168                      errcode,"Cannot open raw socket.\n");
169     gras_sock_close(sock);
170     return 1;
171   }
172
173   /* send a train of data before repporting that XP is started */
174   if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
175     fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
176     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
177                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
178                      errcode,"Cannot raw send.\n");
179     gras_sock_close(sock);
180     gras_rawsock_close(raw);
181     return 1;
182   }
183   
184   grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
185                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
186                    no_error,"Saturation started");
187   gras_msg_free(answer);
188   gras_msg_free(msg);
189   
190   /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
191   start=gras_time();
192   while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error && 
193          gras_time()-start < timeout) {
194     if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
195       fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
196       /* our error message do not interess anyone. SAT_STOP will do nothing. */
197       gras_sock_close(sock);
198       gras_rawsock_close(raw);
199       return 1;
200     } 
201   }
202   if (gras_time()-start > timeout) {
203     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
204     gras_sock_close(sock);
205     gras_rawsock_close(raw);
206     return 1;
207   }
208
209   /* Handle the SAT_STOP which broke the previous while */
210   
211   if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
212     fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
213
214     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
215                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
216                      errcode,"Sending SAT_END to peer failed.\n");
217     gras_sock_close(sock);
218     gras_rawsock_close(raw);
219     return 1;
220   }
221   
222   if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
223     fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
224
225     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
226                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
227                      errcode,"Receiving SAT_ENDED from peer failed.\n");
228     gras_sock_close(sock);
229     gras_rawsock_close(raw);
230     return 1;
231   }
232   grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
233                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
234                    no_error,"");
235
236   gras_sock_close(sock);
237   gras_rawsock_close(raw);
238   gras_msg_free(answer);
239   gras_msg_free(msg);
240
241   return 1;  
242 }
243
244 int grasbw_cbSatBegin(gras_msg_t *msg) {
245   gras_rawsock_t *raw;
246   xbt_error_t errcode;
247   double start; /* timer */
248   /* request */
249   unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
250   unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
251   /* answer */
252   SatExp_t *request;
253   msgError_t *error;
254
255   if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
256       !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
257     fprintf(stderr,"cbSatBegin(): Malloc error\n");
258     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
259                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
260                      malloc_error,"Malloc error");
261     return 1;
262   }
263
264   if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) { 
265     fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
266             xbt_error_name(errcode));
267     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
268                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
269                      errcode,"Cannot open raw socket");
270     return 1;
271   }
272   request->port=gras_rawsock_get_peer_port(raw);
273   request->msgSize=msgSize;
274   error->errcode=no_error;
275   error->errmsg[0]='\0';
276   if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
277                               error,1,
278                               request,1))) {
279     fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
280             xbt_error_name(errcode));
281     return 1;
282   }
283   gras_msg_free(msg);
284
285   start=gras_time();
286   while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
287          gras_time() - start < timeout) {
288     errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
289     if (errcode != timeout_error && errcode != no_error) {
290       fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
291       /* our error message do not interess anyone. SAT_END will do nothing. */
292       /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
293       return 1;
294     } 
295   }
296   if (gras_time()-start > timeout) {
297     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
298     gras_rawsock_close(raw);
299     return 1;
300   }
301
302   grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
303                    "cbSatBegin: Cannot send SAT_ENDED.\n",
304                    no_error,"");
305   gras_rawsock_close(raw);
306   gras_msg_free(msg);
307   return 1;
308 }
309
310 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
311                                  const char* to_name,unsigned int to_port) {
312   xbt_error_t errcode;
313   gras_sock_t *sock;
314   gras_msg_t *answer;
315
316   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
317     fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
318             xbt_error_name(errcode));
319     return errcode;
320   }
321
322   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
323     fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
324             xbt_error_name(errcode));
325     gras_sock_close(sock);
326     return errcode;
327   }
328
329   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
330     fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
331             xbt_error_name(errcode));
332     gras_sock_close(sock);
333     return errcode;
334   }
335
336   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
337     fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
338             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
339     gras_msg_free(answer);
340     gras_sock_close(sock);
341     return errcode;
342   }
343
344   gras_msg_free(answer);
345   gras_sock_close(sock);
346
347   return no_error;
348 }
349 #endif