Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
2f6f368b6abc69130cee447e2ceb94381be7bd2e
[simgrid.git] / src / gras / Transport / transport_plugin_tcp.c
1 /* $Id$ */
2
3 /* tcp trp (transport) - send/receive a bunch of bytes from a tcp socket    */
4
5 /* Authors: Martin Quinson                                                  */
6 /* Copyright (C) 2004 Martin Quinson.                                       */
7
8 /* This program is free software; you can redistribute it and/or modify it
9    under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include <unistd.h>       /* close() pipe() read() write() */
12 #include <signal.h>       /* close() pipe() read() write() */
13 #include <netinet/in.h>   /* sometimes required for #include <arpa/inet.h> */
14 #include <netinet/tcp.h>  /* TCP_NODELAY */
15 #include <arpa/inet.h>    /* inet_ntoa() */
16 #include <netdb.h>        /* getprotobyname() */
17 #include <sys/time.h>     /* struct timeval */
18 #include <errno.h>        /* errno */
19 #include <sys/wait.h>     /* waitpid() */
20 #include <sys/socket.h>   /* getpeername() socket() */
21 #include <stdlib.h>
22 #include <string.h>       /* memset */
23
24 #include "gras_private.h"
25 #include "transport_private.h"
26
27 GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport);
28
29 /***
30  *** Prototypes 
31  ***/
32 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
33                                         const char *host,
34                                         unsigned short port,
35                                         /* OUT */ gras_socket_t *sock);
36 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
37                                         unsigned short port,
38                                         /* OUT */ gras_socket_t *sock);
39 gras_error_t gras_trp_tcp_socket_accept(gras_socket_t  *sock,
40                                         gras_socket_t **dst);
41
42 void         gras_trp_tcp_socket_close(gras_socket_t *sd);
43   
44 gras_error_t gras_trp_tcp_chunk_send(gras_socket_t *sd,
45                                      char *data,
46                                      size_t size);
47
48 gras_error_t gras_trp_tcp_chunk_recv(gras_socket_t *sd,
49                                      char *data,
50                                      size_t size);
51
52 void         gras_trp_tcp_free_specific(void *s);
53
54
55 static int TcpProtoNumber(void);
56 /***
57  *** Specific plugin part
58  ***/
59
60 typedef struct {
61   fd_set msg_socks;
62   fd_set raw_socks;
63 } gras_trp_tcp_plug_data_t;
64
65 /***
66  *** Specific socket part
67  ***/
68
69 typedef struct {
70   int buffsize;
71 } gras_trp_tcp_sock_data_t;
72
73
74 /***
75  *** Code
76  ***/
77 gras_error_t
78 gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
79
80   gras_trp_tcp_plug_data_t *tcp = malloc(sizeof(gras_trp_tcp_plug_data_t));
81   if (!tcp)
82     RAISE_MALLOC;
83
84   FD_ZERO(&(tcp->msg_socks));
85   FD_ZERO(&(tcp->raw_socks));
86
87   plug->socket_client = gras_trp_tcp_socket_client;
88   plug->socket_server = gras_trp_tcp_socket_server;
89   plug->socket_accept = gras_trp_tcp_socket_accept;
90   plug->socket_close  = gras_trp_tcp_socket_close;
91
92   plug->chunk_send    = gras_trp_tcp_chunk_send;
93   plug->chunk_recv    = gras_trp_tcp_chunk_recv;
94
95   plug->data      = (void*)tcp;
96
97   return no_error;
98 }
99
100 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
101                                         const char *host,
102                                         unsigned short port,
103                                         /* OUT */ gras_socket_t *sock){
104   
105   struct sockaddr_in addr;
106   struct hostent *he;
107   struct in_addr *haddr;
108
109   sock->incoming = 1; /* TCP sockets are duplex'ed */
110
111   sock->sd = socket (AF_INET, SOCK_STREAM, 0);
112   
113   if (sock->sd < 0) {
114     RAISE1(system_error,
115            "Failed to create socket: %s",
116            strerror (errno));
117   }
118   
119   he = gethostbyname (host);
120   if (he == NULL) {
121     RAISE2(system_error,
122            "Failed to lookup hostname %s: %s",
123            host, strerror (errno));
124   }
125   
126   haddr = ((struct in_addr *) (he->h_addr_list)[0]);
127   
128   memset(&addr, 0, sizeof(struct sockaddr_in));
129   memcpy (&addr.sin_addr, haddr, sizeof(struct in_addr));
130   addr.sin_family = AF_INET;
131   addr.sin_port = htons (port);
132
133   if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) {
134     close(sock->sd);
135     RAISE3(system_error,
136            "Failed to connect socket to %s:%d (%s)",
137            host, port, strerror (errno));
138   }
139   
140   return no_error;
141 }
142
143 /**
144  * gras_trp_tcp_socket_server:
145  *
146  * Open a socket used to receive messages.
147  */
148 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
149                                         unsigned short port,
150                                         /* OUT */ gras_socket_t *sock){
151 //  int size = bufSize * 1024;
152   int on = 1;
153   struct sockaddr_in server;
154
155   gras_trp_tcp_plug_data_t *tcp=(gras_trp_tcp_plug_data_t*)self->data;
156  
157   sock->outgoing  = 1; /* TCP => duplex mode */
158
159   server.sin_port = htons((u_short)port);
160   server.sin_addr.s_addr = INADDR_ANY;
161   server.sin_family = AF_INET;
162   if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
163     RAISE1(system_error,"socket allocation failed: %s", strerror(errno));
164   }
165
166   (void)setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, 
167                    (char *)&on, sizeof(on));
168    /*
169   (void)setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size));
170   (void)setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size));
171     */
172   if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
173     close(sock->sd);
174     RAISE2(system_error,"Cannot bind to port %d: %s",port, strerror(errno));
175   }
176
177   if (listen(sock->sd, 5) < 0) {
178     close(sock->sd);
179     RAISE2(system_error,"Cannot listen to port %d: %s",port,strerror(errno));
180   }
181
182   if (sock->raw)
183     FD_SET(sock->sd, &(tcp->raw_socks));
184   else
185     FD_SET(sock->sd, &(tcp->msg_socks));
186
187   DEBUG2("Openned a server socket on port %d (sock %d)",port,sock->sd);
188   
189   return no_error;
190 }
191
192 gras_error_t
193 gras_trp_tcp_socket_accept(gras_socket_t  *sock,
194                            gras_socket_t **dst) {
195   gras_socket_t *res;
196   
197   struct sockaddr_in peer_in;
198   socklen_t peer_in_len = sizeof(peer_in);
199
200   int sd;
201   int tmp_errno;
202                                 
203   res=malloc(sizeof(gras_socket_t));
204   if (!res)
205     RAISE_MALLOC;
206
207   sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len);
208   tmp_errno = errno;
209
210   if(sd == -1) {
211     gras_socket_close(&sock);
212     RAISE1(system_error,
213            "Accept failed (%s). Droping server socket.", strerror(tmp_errno));
214   } else {
215     int i = 1;
216     socklen_t s = sizeof(int);
217   
218     if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
219         || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) {
220       WARN0("setsockopt failed, cannot condition the accepted socket");
221     }
222  
223      /* FIXME: bufSize removed until we can have optionsets 
224     i = ((gras_trp_tcp_sock_specific_t*)sock->specific)->buffsize;
225     if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&i, s)
226         || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&i, s)) {
227       WARNING0("setsockopt failed, cannot set buffsize");       
228     }
229       */
230      
231     res->plugin    = sock->plugin;
232     res->incoming  = sock->incoming;
233     res->outgoing  = sock->outgoing;
234     res->accepting = 0;
235     res->sd        = sd;
236     res->port      = -1;
237     res->peer_port = peer_in.sin_port;
238
239     /* FIXME: Lock to protect inet_ntoa */
240     if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) {
241       res->peer_name = strdup("unknown");
242     } else {
243       struct in_addr addrAsInAddr;
244       char *tmp;
245  
246       addrAsInAddr.s_addr = peer_in.sin_addr.s_addr;
247       
248       tmp = inet_ntoa(addrAsInAddr);
249       if (tmp != NULL) {
250         res->peer_name = strdup(tmp);
251       } else {
252         res->peer_name = strdup("unknown");
253       }
254     }
255
256     VERB3("accepted socket %d to %s:%d\n", sd, res->peer_name,res->peer_port);
257     
258     *dst = res;
259
260     return no_error;
261   }
262 }
263
264 void gras_trp_tcp_socket_close(gras_socket_t *sock){
265   gras_trp_tcp_plug_data_t *tcp;
266   
267   if (!sock) return; /* close only once */
268   tcp=sock->plugin->data;
269
270   DEBUG1("close tcp connection %d\n", sock->sd);
271
272   /* FIXME: no pipe in GRAS so far  
273   if(!FD_ISSET(sd, &connectedPipes)) {
274     if(shutdown(sd, 2) < 0) {
275       GetNWSLock(&lock);
276       tmp_errno = errno;
277       ReleaseNWSLock(&lock);
278       
279       / * The other side may have beaten us to the reset. * /
280       if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) {
281         WARN1("CloseSocket: shutdown error %d\n", tmp_errno);
282       }
283     }
284   } */
285
286   /* forget about the socket */
287   if (sock->raw)
288     FD_CLR(sock->sd, &(tcp->raw_socks));
289   else
290     FD_CLR(sock->sd, &(tcp->msg_socks));
291
292   /* close the socket */
293   if(close(sock->sd) < 0) {
294     WARN3("error while closing tcp socket %d: %d (%s)\n", 
295              sock->sd, errno, strerror(errno));
296   }
297 }
298
299 /**
300  * gras_trp_tcp_chunk_send:
301  *
302  * Send data on a TCP socket
303  */
304 gras_error_t 
305 gras_trp_tcp_chunk_send(gras_socket_t *sock,
306                     char *data,
307                     size_t size) {
308   
309   /* TCP sockets are in duplex mode, don't check direction */
310   gras_assert0(size >= 0, "Cannot send a negative amount of data");
311
312   while (size) {
313     int status = 0;
314     
315     status = write(sock->sd, data, (size_t)size);
316     DEBUG3("write(%d, %p, %ld);\n", sock->sd, data, size);
317     
318     if (status == -1) {
319       RAISE4(system_error,"write(%d,%p,%d) failed: %s",
320              sock->sd, data, (int)size,
321              strerror(errno));
322     }
323     
324     if (status) {
325       size  -= status;
326       data  += status;
327     } else {
328       RAISE0(system_error,"file descriptor closed");
329     }
330   }
331
332   return no_error;
333 }
334 /**
335  * gras_trp_tcp_chunk_recv:
336  *
337  * Receive data on a TCP socket.
338  */
339 gras_error_t 
340 gras_trp_tcp_chunk_recv(gras_socket_t *sock,
341                         char *data,
342                         size_t size) {
343
344   /* TCP sockets are in duplex mode, don't check direction */
345   gras_assert0(sock, "Cannot recv on an NULL socket");
346   gras_assert0(size >= 0, "Cannot receive a negative amount of data");
347   
348   while (size) {
349     int status = 0;
350     
351     status = read(sock->sd, data, (size_t)size);
352     DEBUG3("read(%d, %p, %ld);\n", sock->sd, data, size);
353     
354     if (status == -1) {
355       RAISE4(system_error,"read(%d,%p,%d) failed: %s",
356              sock->sd, data, (int)size,
357              strerror(errno));
358     }
359     
360     if (status) {
361       size  -= status;
362       data  += status;
363     } else {
364       RAISE0(system_error,"file descriptor closed");
365     }
366   }
367   
368   return no_error;
369 }
370
371
372 /*
373  * Returns the tcp protocol number from the network protocol data base.
374  *
375  * getprotobyname() is not thread safe. We need to lock it.
376  */
377 static int TcpProtoNumber(void) {
378   struct protoent *fetchedEntry;
379   static int returnValue = 0;
380   
381   if(returnValue == 0) {
382     fetchedEntry = getprotobyname("tcp");
383     gras_assert0(fetchedEntry, "getprotobyname(tcp) gave NULL");
384     returnValue = fetchedEntry->p_proto;
385   }
386   
387   return returnValue;
388 }