Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
55bd596073e9e75ef1b8ffdce5835e0e2c856f9e
[simgrid.git] / src / gras / Transport / transport_plugin_tcp.c
1 /* $Id$ */
2
3 /* tcp trp (transport) - send/receive a bunch of bytes from a tcp socket    */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <unistd.h>       /* close() pipe() read() write() */
11 #include <signal.h>       /* close() pipe() read() write() */
12 #include <netinet/in.h>   /* sometimes required for #include <arpa/inet.h> */
13 #include <netinet/tcp.h>  /* TCP_NODELAY */
14 #include <arpa/inet.h>    /* inet_ntoa() */
15 #include <netdb.h>        /* getprotobyname() */
16 #include <sys/time.h>     /* struct timeval */
17 #include <errno.h>        /* errno */
18 #include <sys/wait.h>     /* waitpid() */
19 #include <sys/socket.h>   /* getpeername() socket() */
20 #include <stdlib.h>
21 #include <string.h>       /* memset */
22
23 #include "transport_private.h"
24
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport");
26
27 /***
28  *** Prototypes 
29  ***/
30 xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
31                                         gras_socket_t sock);
32 xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
33                                         gras_socket_t sock);
34 xbt_error_t gras_trp_tcp_socket_accept(gras_socket_t  sock,
35                                         gras_socket_t *dst);
36
37 void         gras_trp_tcp_socket_close(gras_socket_t sd);
38   
39 xbt_error_t gras_trp_tcp_chunk_send(gras_socket_t sd,
40                                      const char *data,
41                                      long int size);
42
43 xbt_error_t gras_trp_tcp_chunk_recv(gras_socket_t sd,
44                                      char *data,
45                                      long int size);
46
47 void gras_trp_tcp_exit(gras_trp_plugin_t *plug);
48
49
50 static int TcpProtoNumber(void);
51 /***
52  *** Specific plugin part
53  ***/
54
55 typedef struct {
56   fd_set msg_socks;
57   fd_set raw_socks;
58 } gras_trp_tcp_plug_data_t;
59
60 /***
61  *** Specific socket part
62  ***/
63
64 typedef struct {
65   int buffsize;
66 } gras_trp_tcp_sock_data_t;
67
68
69 /***
70  *** Code
71  ***/
72 xbt_error_t gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
73
74   gras_trp_tcp_plug_data_t *data = xbt_new(gras_trp_tcp_plug_data_t,1);
75
76   FD_ZERO(&(data->msg_socks));
77   FD_ZERO(&(data->raw_socks));
78
79   plug->socket_client = gras_trp_tcp_socket_client;
80   plug->socket_server = gras_trp_tcp_socket_server;
81   plug->socket_accept = gras_trp_tcp_socket_accept;
82   plug->socket_close  = gras_trp_tcp_socket_close;
83
84   plug->chunk_send    = gras_trp_tcp_chunk_send;
85   plug->chunk_recv    = gras_trp_tcp_chunk_recv;
86
87   plug->flush = NULL; /* nothing's cached */
88
89   plug->data = (void*)data;
90   plug->exit = gras_trp_tcp_exit;
91
92   return no_error;
93 }
94
95 void gras_trp_tcp_exit(gras_trp_plugin_t *plug) {
96   DEBUG1("Exit plugin TCP (free %p)", plug->data);
97   xbt_free(plug->data);
98 }
99
100 xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
101                                         gras_socket_t sock){
102   
103   struct sockaddr_in addr;
104   struct hostent *he;
105   struct in_addr *haddr;
106   int size = sock->bufSize * 1024; 
107
108   sock->incoming = 1; /* TCP sockets are duplex'ed */
109
110   sock->sd = socket (AF_INET, SOCK_STREAM, 0);
111   
112   if (sock->sd < 0) {
113     RAISE1(system_error,
114            "Failed to create socket: %s",
115            strerror (errno));
116   }
117
118   if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
119       setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
120      WARN1("setsockopt failed, cannot set buffer size: %s",
121            strerror(errno));
122   }
123   
124   he = gethostbyname (sock->peer_name);
125   if (he == NULL) {
126     RAISE2(system_error,
127            "Failed to lookup hostname %s: %s",
128            sock->peer_name, strerror (errno));
129   }
130   
131   haddr = ((struct in_addr *) (he->h_addr_list)[0]);
132   
133   memset(&addr, 0, sizeof(struct sockaddr_in));
134   memcpy (&addr.sin_addr, haddr, sizeof(struct in_addr));
135   addr.sin_family = AF_INET;
136   addr.sin_port = htons (sock->peer_port);
137
138   if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) {
139     close(sock->sd);
140     RAISE3(system_error,
141            "Failed to connect socket to %s:%d (%s)",
142            sock->peer_name, sock->peer_port, strerror (errno));
143   }
144   
145   return no_error;
146 }
147
148 /**
149  * gras_trp_tcp_socket_server:
150  *
151  * Open a socket used to receive messages.
152  */
153 xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
154                                         /* OUT */ gras_socket_t sock){
155   int size = sock->bufSize * 1024; 
156   int on = 1;
157   struct sockaddr_in server;
158
159   gras_trp_tcp_plug_data_t *tcp=(gras_trp_tcp_plug_data_t*)self->data;
160  
161   sock->outgoing  = 1; /* TCP => duplex mode */
162
163   server.sin_port = htons((u_short)sock->port);
164   server.sin_addr.s_addr = INADDR_ANY;
165   server.sin_family = AF_INET;
166   if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
167     RAISE1(system_error,"Socket allocation failed: %s", strerror(errno));
168   }
169
170   if (setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on))) {
171      RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
172             strerror(errno));
173   }
174    
175   if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
176       setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
177      WARN1("setsockopt failed, cannot set buffer size: %s",
178            strerror(errno));
179   }
180         
181   if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
182     close(sock->sd);
183     RAISE2(system_error,"Cannot bind to port %d: %s",sock->port, strerror(errno));
184   }
185
186   if (listen(sock->sd, 5) < 0) {
187     close(sock->sd);
188     RAISE2(system_error,"Cannot listen to port %d: %s",sock->port,strerror(errno));
189   }
190
191   if (sock->raw)
192     FD_SET(sock->sd, &(tcp->raw_socks));
193   else
194     FD_SET(sock->sd, &(tcp->msg_socks));
195
196   DEBUG2("Openned a server socket on port %d (sock %d)",sock->port,sock->sd);
197   
198   return no_error;
199 }
200
201 xbt_error_t
202 gras_trp_tcp_socket_accept(gras_socket_t  sock,
203                            gras_socket_t *dst) {
204   gras_socket_t res;
205   
206   struct sockaddr_in peer_in;
207   socklen_t peer_in_len = sizeof(peer_in);
208
209   int sd;
210   int tmp_errno;
211   int size;
212                         
213   gras_trp_socket_new(1,&res);
214
215   sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len);
216   tmp_errno = errno;
217
218   if(sd == -1) {
219     gras_socket_close(sock);
220     RAISE1(system_error,
221            "Accept failed (%s). Droping server socket.", strerror(tmp_errno));
222   } else {
223     int i = 1;
224     socklen_t s = sizeof(int);
225   
226     if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
227         || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) {
228        RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
229               strerror(errno));
230     }
231  
232     (*dst)->bufSize = sock->bufSize;
233     size = sock->bufSize * 1024;
234     if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size))
235        || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
236        WARN1("setsockopt failed, cannot set buffer size: %s",
237              strerror(errno));
238     }
239      
240     res->plugin    = sock->plugin;
241     res->incoming  = sock->incoming;
242     res->outgoing  = sock->outgoing;
243     res->accepting = 0;
244     res->sd        = sd;
245     res->port      = -1;
246     res->peer_port = peer_in.sin_port;
247
248     /* FIXME: Lock to protect inet_ntoa */
249     if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) {
250       res->peer_name = (char*)strdup("unknown");
251     } else {
252       struct in_addr addrAsInAddr;
253       char *tmp;
254  
255       addrAsInAddr.s_addr = peer_in.sin_addr.s_addr;
256       
257       tmp = inet_ntoa(addrAsInAddr);
258       if (tmp != NULL) {
259         res->peer_name = (char*)strdup(tmp);
260       } else {
261         res->peer_name = (char*)strdup("unknown");
262       }
263     }
264
265     VERB3("accepted socket %d to %s:%d", sd, res->peer_name,res->peer_port);
266     
267     *dst = res;
268
269     return no_error;
270   }
271 }
272
273 void gras_trp_tcp_socket_close(gras_socket_t sock){
274   gras_trp_tcp_plug_data_t *tcp;
275   
276   if (!sock) return; /* close only once */
277   tcp=sock->plugin->data;
278
279   DEBUG1("close tcp connection %d", sock->sd);
280
281   /* FIXME: no pipe in GRAS so far  
282   if(!FD_ISSET(sd, &connectedPipes)) {
283     if(shutdown(sd, 2) < 0) {
284       GetNWSLock(&lock);
285       tmp_errno = errno;
286       ReleaseNWSLock(&lock);
287       
288       / * The other side may have beaten us to the reset. * /
289       if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) {
290         WARN1("CloseSocket: shutdown error %d\n", tmp_errno);
291       }
292     }
293   } */
294
295   /* forget about the socket */
296   if (sock->raw)
297     FD_CLR(sock->sd, &(tcp->raw_socks));
298   else
299     FD_CLR(sock->sd, &(tcp->msg_socks));
300
301   /* close the socket */
302   if(close(sock->sd) < 0) {
303     WARN3("error while closing tcp socket %d: %d (%s)\n", 
304              sock->sd, errno, strerror(errno));
305   }
306 }
307
308 /**
309  * gras_trp_tcp_chunk_send:
310  *
311  * Send data on a TCP socket
312  */
313 xbt_error_t 
314 gras_trp_tcp_chunk_send(gras_socket_t sock,
315                         const char *data,
316                         long int size) {
317   
318   /* TCP sockets are in duplex mode, don't check direction */
319   xbt_assert0(size >= 0, "Cannot send a negative amount of data");
320
321   while (size) {
322     int status = 0;
323     
324     status = write(sock->sd, data, (size_t)size);
325     DEBUG3("write(%d, %p, %ld);", sock->sd, data, size);
326     
327     if (status == -1) {
328       RAISE4(system_error,"write(%d,%p,%ld) failed: %s",
329              sock->sd, data, size,
330              strerror(errno));
331     }
332     
333     if (status) {
334       size  -= status;
335       data  += status;
336     } else {
337       RAISE0(system_error,"file descriptor closed");
338     }
339   }
340
341   return no_error;
342 }
343 /**
344  * gras_trp_tcp_chunk_recv:
345  *
346  * Receive data on a TCP socket.
347  */
348 xbt_error_t 
349 gras_trp_tcp_chunk_recv(gras_socket_t sock,
350                         char *data,
351                         long int size) {
352
353   /* TCP sockets are in duplex mode, don't check direction */
354   xbt_assert0(sock, "Cannot recv on an NULL socket");
355   xbt_assert0(size >= 0, "Cannot receive a negative amount of data");
356   
357   while (size) {
358     int status = 0;
359     
360     status = read(sock->sd, data, (size_t)size);
361     DEBUG3("read(%d, %p, %ld);", sock->sd, data, size);
362     
363     if (status == -1) {
364       RAISE4(system_error,"read(%d,%p,%d) failed: %s",
365              sock->sd, data, (int)size,
366              strerror(errno));
367     }
368     
369     if (status) {
370       size  -= status;
371       data  += status;
372     } else {
373       RAISE0(system_error,"file descriptor closed");
374     }
375   }
376   
377   return no_error;
378 }
379
380
381 /*
382  * Returns the tcp protocol number from the network protocol data base.
383  *
384  * getprotobyname() is not thread safe. We need to lock it.
385  */
386 static int TcpProtoNumber(void) {
387   struct protoent *fetchedEntry;
388   static int returnValue = 0;
389   
390   if(returnValue == 0) {
391     fetchedEntry = getprotobyname("tcp");
392     xbt_assert0(fetchedEntry, "getprotobyname(tcp) gave NULL");
393     returnValue = fetchedEntry->p_proto;
394   }
395   
396   return returnValue;
397 }
398
399 /* Data exchange over raw sockets. Placing this in there is a kind of crude hack.
400    It means that the only possible raw are TCP where we may want to do UDP for them. 
401    But I fail to find a good internal organization for now. We may want to split 
402    raw and regular sockets more efficiently.
403 */
404 xbt_error_t gras_socket_raw_exchange(gras_socket_t peer,
405                                       int sender,
406                                       unsigned int timeout,
407                                       unsigned long int exp_size,
408                                       unsigned long int msg_size) {
409    char *chunk;
410    int res_last, msg_sofar, exp_sofar;
411    
412    fd_set rd_set;
413 /*    int rv; */
414    
415    struct timeval timeOut;
416    
417    chunk = xbt_malloc(msg_size);
418
419    for   (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
420       for(msg_sofar=0; msg_sofar < msg_size; msg_sofar += res_last) {
421          
422          if(sender) {
423             res_last = send(peer->sd, chunk, msg_size - msg_sofar, 0);
424          } else {
425             res_last = 0;
426             FD_ZERO(&rd_set);
427             FD_SET(peer->sd,&rd_set);
428             timeOut.tv_sec = timeout;
429             timeOut.tv_usec = 0;
430                 
431             if (0 < select(peer->sd+1,&rd_set,NULL,NULL,&timeOut))
432               res_last = recv(peer->sd, chunk, msg_size-msg_sofar, 0);
433             
434          }
435          if (res_last == 0) {
436            /* No progress done, bail out */
437            xbt_free(chunk);
438            RAISE0(unknown_error,"Not exchanged a single byte, bailing out");
439          }
440       }
441    }
442    
443    xbt_free(chunk);
444    return no_error;
445 }