Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
21228960056b767bb1bb1ede77029dcd9b91bfc7
[simgrid.git] / src / gras / Transport / transport_plugin_tcp.c
1 /* $Id$ */
2
3 /* tcp trp (transport) - send/receive a bunch of bytes from a tcp socket    */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "portable.h"
11
12 #if 0
13 #  include <signal.h>       /* close() pipe() read() write() */
14 #  include <sys/wait.h>     /* waitpid() */
15 #endif
16
17
18 #include "transport_private.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport");
21
22 /***
23  *** Prototypes 
24  ***/
25 xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t self,
26                                         gras_socket_t sock);
27 xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t self,
28                                         gras_socket_t sock);
29 xbt_error_t gras_trp_tcp_socket_accept(gras_socket_t  sock,
30                                         gras_socket_t *dst);
31
32 void         gras_trp_tcp_socket_close(gras_socket_t sd);
33   
34 xbt_error_t gras_trp_tcp_chunk_send(gras_socket_t sd,
35                                     const char *data,
36                                     unsigned long int size);
37
38 xbt_error_t gras_trp_tcp_chunk_recv(gras_socket_t sd,
39                                     char *data,
40                                     unsigned long int size);
41
42 void gras_trp_tcp_exit(gras_trp_plugin_t plug);
43
44
45 static int TcpProtoNumber(void);
46 /***
47  *** Specific plugin part
48  ***/
49
50 typedef struct {
51   fd_set msg_socks;
52   fd_set meas_socks;
53 } gras_trp_tcp_plug_data_t;
54
55 /***
56  *** Specific socket part
57  ***/
58
59 typedef struct {
60   int buffsize;
61 } gras_trp_tcp_sock_data_t;
62
63
64 /***
65  *** Code
66  ***/
67 xbt_error_t gras_trp_tcp_setup(gras_trp_plugin_t plug) {
68
69   gras_trp_tcp_plug_data_t *data = xbt_new(gras_trp_tcp_plug_data_t,1);
70
71   FD_ZERO(&(data->msg_socks));
72   FD_ZERO(&(data->meas_socks));
73
74   plug->socket_client = gras_trp_tcp_socket_client;
75   plug->socket_server = gras_trp_tcp_socket_server;
76   plug->socket_accept = gras_trp_tcp_socket_accept;
77   plug->socket_close  = gras_trp_tcp_socket_close;
78
79   plug->chunk_send    = gras_trp_tcp_chunk_send;
80   plug->chunk_recv    = gras_trp_tcp_chunk_recv;
81
82   plug->flush = NULL; /* nothing's cached */
83
84   plug->data = (void*)data;
85   plug->exit = gras_trp_tcp_exit;
86    
87   return no_error;
88 }
89
90 void gras_trp_tcp_exit(gras_trp_plugin_t plug) {
91   DEBUG1("Exit plugin TCP (free %p)", plug->data);
92   free(plug->data);
93 }
94
95 xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t self,
96                                         gras_socket_t sock){
97   
98   struct sockaddr_in addr;
99   struct hostent *he;
100   struct in_addr *haddr;
101   int size = sock->bufSize * 1024; 
102
103   sock->incoming = 1; /* TCP sockets are duplex'ed */
104
105   sock->sd = socket (AF_INET, SOCK_STREAM, 0);
106   
107   if (sock->sd < 0) {
108     RAISE1(system_error,
109            "Failed to create socket: %s",
110            sock_errstr);
111   }
112
113   if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
114       setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
115      WARN1("setsockopt failed, cannot set buffer size: %s",sock_errstr);
116   }
117   
118   he = gethostbyname (sock->peer_name);
119   if (he == NULL) {
120     RAISE2(system_error,
121            "Failed to lookup hostname %s: %s",
122            sock->peer_name, sock_errstr);
123   }
124   
125   haddr = ((struct in_addr *) (he->h_addr_list)[0]);
126   
127   memset(&addr, 0, sizeof(struct sockaddr_in));
128   memcpy (&addr.sin_addr, haddr, sizeof(struct in_addr));
129   addr.sin_family = AF_INET;
130   addr.sin_port = htons (sock->peer_port);
131
132   if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) {
133     tcp_close(sock->sd);
134     RAISE3(system_error,
135            "Failed to connect socket to %s:%d (%s)",
136            sock->peer_name, sock->peer_port, sock_errstr);
137   }
138   VERB4("Connect to %s:%d (sd=%d, port %d here)",sock->peer_name, sock->peer_port, sock->sd, sock->port);
139    
140   return no_error;
141 }
142
143 /**
144  * gras_trp_tcp_socket_server:
145  *
146  * Open a socket used to receive messages.
147  */
148 xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t self,
149                                         /* OUT */ gras_socket_t sock){
150   int size = sock->bufSize * 1024; 
151   int on = 1;
152   struct sockaddr_in server;
153
154   gras_trp_tcp_plug_data_t *tcp=(gras_trp_tcp_plug_data_t*)self->data;
155  
156   sock->outgoing  = 1; /* TCP => duplex mode */
157
158   server.sin_port = htons((u_short)sock->port);
159   server.sin_addr.s_addr = INADDR_ANY;
160   server.sin_family = AF_INET;
161   if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
162     RAISE1(system_error,"Socket allocation failed: %s", sock_errstr);
163   }
164
165   if (setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on))) {
166      RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
167             sock_errstr);
168   }
169    
170   if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
171       setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
172      WARN1("setsockopt failed, cannot set buffer size: %s",
173            sock_errstr);
174   }
175         
176   if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
177     tcp_close(sock->sd);
178     RAISE2(system_error,"Cannot bind to port %d: %s",sock->port, sock_errstr);
179   }
180
181   DEBUG2("Listen on port %d (sd=%d)",sock->port, sock->sd);
182   if (listen(sock->sd, 5) < 0) {
183     tcp_close(sock->sd);
184     RAISE2(system_error,"Cannot listen on port %d: %s",sock->port,sock_errstr);
185   }
186
187   if (sock->meas)
188     FD_SET(sock->sd, &(tcp->meas_socks));
189   else
190     FD_SET(sock->sd, &(tcp->msg_socks));
191
192   VERB2("Openned a server socket on port %d (sd=%d)",sock->port,sock->sd);
193   
194   return no_error;
195 }
196
197 xbt_error_t
198 gras_trp_tcp_socket_accept(gras_socket_t  sock,
199                            gras_socket_t *dst) {
200   gras_socket_t res;
201   
202   struct sockaddr_in peer_in;
203   socklen_t peer_in_len = sizeof(peer_in);
204
205   int sd;
206   int tmp_errno;
207   int size;
208                 
209   XBT_IN;
210   gras_trp_socket_new(1,&res);
211
212   sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len);
213   tmp_errno = errno;
214
215   if (sd == -1) {
216     gras_socket_close(sock);
217     RAISE1(system_error,
218            "Accept failed (%s). Droping server socket.", sock_errstr);
219   } else {
220     int i = 1;
221     socklen_t s = sizeof(int);
222   
223     if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
224         || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) {
225        RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
226               sock_errstr);
227     }
228
229     res->bufSize = sock->bufSize;
230     size = sock->bufSize * 1024;
231     if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size))
232        || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
233        WARN1("setsockopt failed, cannot set buffer size: %s",
234              sock_errstr);
235     }
236      
237     res->plugin    = sock->plugin;
238     res->incoming  = sock->incoming;
239     res->outgoing  = sock->outgoing;
240     res->accepting = 0;
241     res->sd        = sd;
242     res->port      = -1;
243     res->peer_port = peer_in.sin_port;
244
245     /* FIXME: Lock to protect inet_ntoa */
246     if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) {
247       res->peer_name = (char*)strdup("unknown");
248     } else {
249       struct in_addr addrAsInAddr;
250       char *tmp;
251  
252       addrAsInAddr.s_addr = peer_in.sin_addr.s_addr;
253       
254       tmp = inet_ntoa(addrAsInAddr);
255       if (tmp != NULL) {
256         res->peer_name = (char*)strdup(tmp);
257       } else {
258         res->peer_name = (char*)strdup("unknown");
259       }
260     }
261
262     VERB3("Accepted from %s:%d (sd=%d)", res->peer_name,res->peer_port,sd);
263     
264     *dst = res;
265
266     XBT_OUT;
267     return no_error;
268   }
269 }
270
271 void gras_trp_tcp_socket_close(gras_socket_t sock){
272   gras_trp_tcp_plug_data_t *tcp;
273   
274   if (!sock) return; /* close only once */
275   tcp=sock->plugin->data;
276
277   VERB1("close tcp connection %d", sock->sd);
278
279   /* FIXME: no pipe in GRAS so far  
280   if(!FD_ISSET(sd, &connectedPipes)) {
281     if(shutdown(sd, 2) < 0) {
282       GetNWSLock(&lock);
283       tmp_errno = errno;
284       ReleaseNWSLock(&lock);
285       
286       / * The other side may have beaten us to the reset. * /
287       if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) {
288         WARN1("CloseSocket: shutdown error %d\n", tmp_errno);
289       }
290     }
291   } */
292
293 #ifndef HAVE_WINSOCK_H
294   /* forget about the socket 
295      ... but not when using winsock since accept'ed socket can not fit 
296      into the fd_set*/
297   if (sock->meas){
298     FD_CLR(sock->sd, &(tcp->meas_socks));
299   } else {
300     FD_CLR(sock->sd, &(tcp->msg_socks));
301   }
302 #endif
303    
304   /* close the socket */
305   if(tcp_close(sock->sd) < 0) {
306     WARN3("error while closing tcp socket %d: %d (%s)\n", 
307              sock->sd, sock_errno, sock_errstr);
308   }
309
310 }
311
312 /**
313  * gras_trp_tcp_chunk_send:
314  *
315  * Send data on a TCP socket
316  */
317 xbt_error_t 
318 gras_trp_tcp_chunk_send(gras_socket_t sock,
319                         const char *data,
320                         unsigned long int size) {
321   
322   /* TCP sockets are in duplex mode, don't check direction */
323   xbt_assert0(size >= 0, "Cannot send a negative amount of data");
324
325   while (size) {
326     int status = 0;
327     
328     status = tcp_write(sock->sd, data, (size_t)size);
329     DEBUG3("write(%d, %p, %ld);", sock->sd, data, size);
330     
331     if (status < 0) {
332       RAISE4(system_error,"write(%d,%p,%ld) failed: %s",
333              sock->sd, data, size,
334              sock_errstr);
335     }
336     
337     if (status) {
338       size  -= status;
339       data  += status;
340     } else {
341       RAISE1(system_error,"file descriptor closed (%s)",
342              sock_errstr);
343     }
344   }
345
346   return no_error;
347 }
348 /**
349  * gras_trp_tcp_chunk_recv:
350  *
351  * Receive data on a TCP socket.
352  */
353 xbt_error_t 
354 gras_trp_tcp_chunk_recv(gras_socket_t sock,
355                         char *data,
356                         unsigned long int size) {
357
358   /* TCP sockets are in duplex mode, don't check direction */
359   xbt_assert0(sock, "Cannot recv on an NULL socket");
360   xbt_assert0(size >= 0, "Cannot receive a negative amount of data");
361   
362   while (size) {
363     int status = 0;
364     
365     DEBUG3("read(%d, %p, %ld);", sock->sd, data, size);
366     status = tcp_read(sock->sd, data, (size_t)size);
367     
368     if (status < 0) {
369       RAISE4(system_error,"read(%d,%p,%d) failed: %s",
370              sock->sd, data, (int)size,
371              sock_errstr);
372     }
373     
374     if (status) {
375       size  -= status;
376       data  += status;
377     } else {
378       RAISE3(system_error,"file descriptor closed (nothing read(%d, %p, %ld) on the socket)",
379              sock->sd, data, size);
380     }
381   }
382   
383   return no_error;
384 }
385
386
387 /*
388  * Returns the tcp protocol number from the network protocol data base.
389  *
390  * getprotobyname() is not thread safe. We need to lock it.
391  */
392 static int TcpProtoNumber(void) {
393   struct protoent *fetchedEntry;
394   static int returnValue = 0;
395   
396   if(returnValue == 0) {
397     fetchedEntry = getprotobyname("tcp");
398     xbt_assert0(fetchedEntry, "getprotobyname(tcp) gave NULL");
399     returnValue = fetchedEntry->p_proto;
400   }
401   
402   return returnValue;
403 }
404
405 #ifdef HAVE_WINSOCK_H
406 #define RETSTR( x ) case x: return #x
407
408 const char *gras_wsa_err2string( int err ) {
409    switch( err ) {
410       RETSTR( WSAEINTR );
411       RETSTR( WSAEBADF );
412       RETSTR( WSAEACCES );
413       RETSTR( WSAEFAULT );
414       RETSTR( WSAEINVAL );
415       RETSTR( WSAEMFILE );
416       RETSTR( WSAEWOULDBLOCK );
417       RETSTR( WSAEINPROGRESS );
418       RETSTR( WSAEALREADY );
419       RETSTR( WSAENOTSOCK );
420       RETSTR( WSAEDESTADDRREQ );
421       RETSTR( WSAEMSGSIZE );
422       RETSTR( WSAEPROTOTYPE );
423       RETSTR( WSAENOPROTOOPT );
424       RETSTR( WSAEPROTONOSUPPORT );
425       RETSTR( WSAESOCKTNOSUPPORT );
426       RETSTR( WSAEOPNOTSUPP );
427       RETSTR( WSAEPFNOSUPPORT );
428       RETSTR( WSAEAFNOSUPPORT );
429       RETSTR( WSAEADDRINUSE );
430       RETSTR( WSAEADDRNOTAVAIL );
431       RETSTR( WSAENETDOWN );
432       RETSTR( WSAENETUNREACH );
433       RETSTR( WSAENETRESET );
434       RETSTR( WSAECONNABORTED );
435       RETSTR( WSAECONNRESET );
436       RETSTR( WSAENOBUFS );
437       RETSTR( WSAEISCONN );
438       RETSTR( WSAENOTCONN );
439       RETSTR( WSAESHUTDOWN );
440       RETSTR( WSAETOOMANYREFS );
441       RETSTR( WSAETIMEDOUT );
442       RETSTR( WSAECONNREFUSED );
443       RETSTR( WSAELOOP );
444       RETSTR( WSAENAMETOOLONG );
445       RETSTR( WSAEHOSTDOWN );
446       RETSTR( WSAEHOSTUNREACH );
447       RETSTR( WSAENOTEMPTY );
448       RETSTR( WSAEPROCLIM );
449       RETSTR( WSAEUSERS );
450       RETSTR( WSAEDQUOT );
451       RETSTR( WSAESTALE );
452       RETSTR( WSAEREMOTE );
453       RETSTR( WSASYSNOTREADY );
454       RETSTR( WSAVERNOTSUPPORTED );
455       RETSTR( WSANOTINITIALISED );
456       RETSTR( WSAEDISCON );
457       
458 #ifdef HAVE_WINSOCK2
459       RETSTR( WSAENOMORE );
460       RETSTR( WSAECANCELLED );
461       RETSTR( WSAEINVALIDPROCTABLE );
462       RETSTR( WSAEINVALIDPROVIDER );
463       RETSTR( WSASYSCALLFAILURE );
464       RETSTR( WSASERVICE_NOT_FOUND );
465       RETSTR( WSATYPE_NOT_FOUND );
466       RETSTR( WSA_E_NO_MORE );
467       RETSTR( WSA_E_CANCELLED );
468       RETSTR( WSAEREFUSED );
469 #endif /* HAVE_WINSOCK2 */
470
471       RETSTR( WSAHOST_NOT_FOUND );
472       RETSTR( WSATRY_AGAIN );
473       RETSTR( WSANO_RECOVERY );
474       RETSTR( WSANO_DATA );
475    }
476    return "unknown WSA error";
477 }
478 #endif /* HAVE_WINSOCK_H */