Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Tweak gras_trp_*_recv() prototype. This is now (sock, char*data,int size, int bufsize...
[simgrid.git] / src / gras / Transport / transport_plugin_tcp.c
1 /* $Id$ */
2
3 /* tcp trp (transport) - send/receive a bunch of bytes from a tcp socket    */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "portable.h"
11 #include "xbt/ex.h"
12 #if 0
13 #  include <signal.h>       /* close() pipe() read() write() */
14 #  include <sys/wait.h>     /* waitpid() */
15 #endif
16
17
18 #include "transport_private.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport");
21
22 /***
23  *** Prototypes 
24  ***/
25 void gras_trp_tcp_socket_client(gras_trp_plugin_t self, gras_socket_t sock);
26 void gras_trp_tcp_socket_server(gras_trp_plugin_t self, gras_socket_t sock);
27 gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t  sock);
28
29 void          gras_trp_tcp_socket_close(gras_socket_t sd);
30   
31 void gras_trp_tcp_chunk_send(gras_socket_t sd,
32                              const char *data,
33                              unsigned long int size);
34
35 void gras_trp_tcp_chunk_recv(gras_socket_t sd,
36                              char *data,
37                              unsigned long int size,
38                              unsigned long int bufsize);
39
40 void gras_trp_tcp_exit(gras_trp_plugin_t plug);
41
42
43 static int TcpProtoNumber(void);
44 /***
45  *** Specific plugin part
46  ***/
47
48 typedef struct {
49   fd_set msg_socks;
50   fd_set meas_socks;
51 } gras_trp_tcp_plug_data_t;
52
53 /***
54  *** Specific socket part
55  ***/
56
57 typedef struct {
58   int buffsize;
59 } gras_trp_tcp_sock_data_t;
60
61
62 /***
63  *** Code
64  ***/
65 void gras_trp_tcp_setup(gras_trp_plugin_t plug) {
66
67   gras_trp_tcp_plug_data_t *data = xbt_new(gras_trp_tcp_plug_data_t,1);
68
69   FD_ZERO(&(data->msg_socks));
70   FD_ZERO(&(data->meas_socks));
71
72   plug->socket_client = gras_trp_tcp_socket_client;
73   plug->socket_server = gras_trp_tcp_socket_server;
74   plug->socket_accept = gras_trp_tcp_socket_accept;
75   plug->socket_close  = gras_trp_tcp_socket_close;
76
77   plug->chunk_send    = gras_trp_tcp_chunk_send;
78   plug->chunk_recv    = gras_trp_tcp_chunk_recv;
79
80   plug->flush = NULL; /* nothing's cached */
81
82   plug->data = (void*)data;
83   plug->exit = gras_trp_tcp_exit;
84 }
85
86 void gras_trp_tcp_exit(gras_trp_plugin_t plug) {
87   DEBUG1("Exit plugin TCP (free %p)", plug->data);
88   free(plug->data);
89 }
90
91 void gras_trp_tcp_socket_client(gras_trp_plugin_t self, gras_socket_t sock){
92   
93   struct sockaddr_in addr;
94   struct hostent *he;
95   struct in_addr *haddr;
96   int size = sock->bufSize * 1024; 
97
98   sock->incoming = 1; /* TCP sockets are duplex'ed */
99
100   sock->sd = socket (AF_INET, SOCK_STREAM, 0);
101   
102   if (sock->sd < 0) {
103     THROW1(system_error,0, "Failed to create socket: %s", sock_errstr);
104   }
105
106   if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
107       setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
108      WARN1("setsockopt failed, cannot set buffer size: %s",sock_errstr);
109   }
110   
111   he = gethostbyname (sock->peer_name);
112   if (he == NULL) {
113     THROW2(system_error,0, "Failed to lookup hostname %s: %s",
114            sock->peer_name, sock_errstr);
115   }
116   
117   haddr = ((struct in_addr *) (he->h_addr_list)[0]);
118   
119   memset(&addr, 0, sizeof(struct sockaddr_in));
120   memcpy (&addr.sin_addr, haddr, sizeof(struct in_addr));
121   addr.sin_family = AF_INET;
122   addr.sin_port = htons (sock->peer_port);
123
124   if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) {
125     tcp_close(sock->sd);
126     THROW3(system_error,0,
127            "Failed to connect socket to %s:%d (%s)",
128            sock->peer_name, sock->peer_port, sock_errstr);
129   }
130   VERB4("Connect to %s:%d (sd=%d, port %d here)",sock->peer_name, sock->peer_port, sock->sd, sock->port);
131 }
132
133 /**
134  * gras_trp_tcp_socket_server:
135  *
136  * Open a socket used to receive messages.
137  */
138 void gras_trp_tcp_socket_server(gras_trp_plugin_t self, gras_socket_t sock){
139   int size = sock->bufSize * 1024; 
140   int on = 1;
141   struct sockaddr_in server;
142
143   gras_trp_tcp_plug_data_t *tcp=(gras_trp_tcp_plug_data_t*)self->data;
144  
145   sock->outgoing  = 1; /* TCP => duplex mode */
146
147   server.sin_port = htons((u_short)sock->port);
148   server.sin_addr.s_addr = INADDR_ANY;
149   server.sin_family = AF_INET;
150   if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
151     THROW1(system_error,0,"Socket allocation failed: %s", sock_errstr);
152
153   if (setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on)))
154      THROW1(system_error,0,"setsockopt failed, cannot condition the socket: %s",
155             sock_errstr);
156    
157   if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
158       setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
159      WARN1("setsockopt failed, cannot set buffer size: %s",
160            sock_errstr);
161   }
162         
163   if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
164     tcp_close(sock->sd);
165     THROW2(system_error,0,"Cannot bind to port %d: %s",sock->port, sock_errstr);
166   }
167
168   DEBUG2("Listen on port %d (sd=%d)",sock->port, sock->sd);
169   if (listen(sock->sd, 5) < 0) {
170     tcp_close(sock->sd);
171     THROW2(system_error,0,"Cannot listen on port %d: %s",sock->port,sock_errstr);
172   }
173
174   if (sock->meas)
175     FD_SET(sock->sd, &(tcp->meas_socks));
176   else
177     FD_SET(sock->sd, &(tcp->msg_socks));
178
179   VERB2("Openned a server socket on port %d (sd=%d)",sock->port,sock->sd);
180 }
181
182 gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t  sock) {
183   gras_socket_t res;
184   
185   struct sockaddr_in peer_in;
186   socklen_t peer_in_len = sizeof(peer_in);
187
188   int sd;
189   int tmp_errno;
190   int size;
191
192   int i = 1;
193   socklen_t s = sizeof(int);
194                 
195   XBT_IN;
196   gras_trp_socket_new(1,&res);
197
198   sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len);
199   tmp_errno = errno;
200
201   if (sd == -1) {
202     gras_socket_close(sock);
203     THROW1(system_error,0,
204            "Accept failed (%s). Droping server socket.", sock_errstr);
205   }
206   
207   if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
208       || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s))
209     THROW1(system_error,0,"setsockopt failed, cannot condition the socket: %s",
210            sock_errstr);
211
212   res->bufSize = sock->bufSize;
213   size = sock->bufSize * 1024;
214   if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size))
215       || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size)))
216     WARN1("setsockopt failed, cannot set buffer size: %s", sock_errstr);
217      
218   res->plugin    = sock->plugin;
219   res->incoming  = sock->incoming;
220   res->outgoing  = sock->outgoing;
221   res->accepting = 0;
222   res->sd        = sd;
223   res->port      = -1;
224   res->peer_port = peer_in.sin_port;
225
226   /* FIXME: Lock to protect inet_ntoa */
227   if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) {
228     res->peer_name = (char*)strdup("unknown");
229   } else {
230     struct in_addr addrAsInAddr;
231     char *tmp;
232     
233     addrAsInAddr.s_addr = peer_in.sin_addr.s_addr;
234     
235     tmp = inet_ntoa(addrAsInAddr);
236     if (tmp != NULL) {
237       res->peer_name = (char*)strdup(tmp);
238     } else {
239       res->peer_name = (char*)strdup("unknown");
240     }
241   }
242   
243   VERB3("Accepted from %s:%d (sd=%d)", res->peer_name,res->peer_port,sd);
244   
245   XBT_OUT;
246   return res;
247 }
248
249 void gras_trp_tcp_socket_close(gras_socket_t sock){
250   gras_trp_tcp_plug_data_t *tcp;
251   
252   if (!sock) return; /* close only once */
253   tcp=sock->plugin->data;
254
255   VERB1("close tcp connection %d", sock->sd);
256
257   /* FIXME: no pipe in GRAS so far  
258   if(!FD_ISSET(sd, &connectedPipes)) {
259     if(shutdown(sd, 2) < 0) {
260       GetNWSLock(&lock);
261       tmp_errno = errno;
262       ReleaseNWSLock(&lock);
263       
264       / * The other side may have beaten us to the reset. * /
265       if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) {
266         WARN1("CloseSocket: shutdown error %d\n", tmp_errno);
267       }
268     }
269   } */
270
271 #ifndef HAVE_WINSOCK_H
272   /* forget about the socket 
273      ... but not when using winsock since accept'ed socket can not fit 
274      into the fd_set*/
275   if (sock->meas){
276     FD_CLR(sock->sd, &(tcp->meas_socks));
277   } else {
278     FD_CLR(sock->sd, &(tcp->msg_socks));
279   }
280 #endif
281    
282   /* close the socket */
283   if(tcp_close(sock->sd) < 0) {
284     WARN3("error while closing tcp socket %d: %d (%s)\n", 
285              sock->sd, sock_errno, sock_errstr);
286   }
287
288 }
289
290 /**
291  * gras_trp_tcp_chunk_send:
292  *
293  * Send data on a TCP socket
294  */
295 void
296 gras_trp_tcp_chunk_send(gras_socket_t sock,
297                         const char *data,
298                         unsigned long int size) {
299   
300   /* TCP sockets are in duplex mode, don't check direction */
301   xbt_assert0(size >= 0, "Cannot send a negative amount of data");
302
303   while (size) {
304     int status = 0;
305     
306     status = tcp_write(sock->sd, data, (size_t)size);
307     DEBUG3("write(%d, %p, %ld);", sock->sd, data, size);
308     
309     if (status < 0) {
310       THROW4(system_error,0,"write(%d,%p,%ld) failed: %s",
311              sock->sd, data, size,
312              sock_errstr);
313     }
314     
315     if (status) {
316       size  -= status;
317       data  += status;
318     } else {
319       THROW1(system_error,0,"file descriptor closed (%s)",
320              sock_errstr);
321     }
322   }
323 }
324 /**
325  * gras_trp_tcp_chunk_recv:
326  *
327  * Receive data on a TCP socket.
328  */
329 void
330 gras_trp_tcp_chunk_recv(gras_socket_t sock,
331                         char *data,
332                         unsigned long int size,
333                         unsigned long int bufsize) {
334
335   /* TCP sockets are in duplex mode, don't check direction */
336   xbt_assert0(sock, "Cannot recv on an NULL socket");
337   xbt_assert0(size >= 0, "Cannot receive a negative amount of data");
338   xbt_assert0(bufsize>=size,"Not enough buffer size to receive that much data");
339   
340   while (size) {
341     int status = 0;
342     
343     DEBUG3("read(%d, %p, %ld);", sock->sd, data, size);
344     status = tcp_read(sock->sd, data, (size_t)bufsize);
345     
346     if (status < 0) {
347       THROW4(system_error,0,"read(%d,%p,%d) failed: %s",
348              sock->sd, data, (int)size,
349              sock_errstr);
350     }
351     
352     if (status) {
353       size    -= status;
354       bufsize -= status;
355       data    += status;
356     } else {
357       gras_socket_close(sock);
358       THROW0(system_error,0,"Socket closed by remote side");
359     }
360   }
361 }
362
363
364 /*
365  * Returns the tcp protocol number from the network protocol data base.
366  *
367  * getprotobyname() is not thread safe. We need to lock it.
368  */
369 static int TcpProtoNumber(void) {
370   struct protoent *fetchedEntry;
371   static int returnValue = 0;
372   
373   if(returnValue == 0) {
374     fetchedEntry = getprotobyname("tcp");
375     xbt_assert0(fetchedEntry, "getprotobyname(tcp) gave NULL");
376     returnValue = fetchedEntry->p_proto;
377   }
378   
379   return returnValue;
380 }
381
382 #ifdef HAVE_WINSOCK_H
383 #define RETSTR( x ) case x: return #x
384
385 const char *gras_wsa_err2string( int err ) {
386    switch( err ) {
387       RETSTR( WSAEINTR );
388       RETSTR( WSAEBADF );
389       RETSTR( WSAEACCES );
390       RETSTR( WSAEFAULT );
391       RETSTR( WSAEINVAL );
392       RETSTR( WSAEMFILE );
393       RETSTR( WSAEWOULDBLOCK );
394       RETSTR( WSAEINPROGRESS );
395       RETSTR( WSAEALREADY );
396       RETSTR( WSAENOTSOCK );
397       RETSTR( WSAEDESTADDRREQ );
398       RETSTR( WSAEMSGSIZE );
399       RETSTR( WSAEPROTOTYPE );
400       RETSTR( WSAENOPROTOOPT );
401       RETSTR( WSAEPROTONOSUPPORT );
402       RETSTR( WSAESOCKTNOSUPPORT );
403       RETSTR( WSAEOPNOTSUPP );
404       RETSTR( WSAEPFNOSUPPORT );
405       RETSTR( WSAEAFNOSUPPORT );
406       RETSTR( WSAEADDRINUSE );
407       RETSTR( WSAEADDRNOTAVAIL );
408       RETSTR( WSAENETDOWN );
409       RETSTR( WSAENETUNREACH );
410       RETSTR( WSAENETRESET );
411       RETSTR( WSAECONNABORTED );
412       RETSTR( WSAECONNRESET );
413       RETSTR( WSAENOBUFS );
414       RETSTR( WSAEISCONN );
415       RETSTR( WSAENOTCONN );
416       RETSTR( WSAESHUTDOWN );
417       RETSTR( WSAETOOMANYREFS );
418       RETSTR( WSAETIMEDOUT );
419       RETSTR( WSAECONNREFUSED );
420       RETSTR( WSAELOOP );
421       RETSTR( WSAENAMETOOLONG );
422       RETSTR( WSAEHOSTDOWN );
423       RETSTR( WSAEHOSTUNREACH );
424       RETSTR( WSAENOTEMPTY );
425       RETSTR( WSAEPROCLIM );
426       RETSTR( WSAEUSERS );
427       RETSTR( WSAEDQUOT );
428       RETSTR( WSAESTALE );
429       RETSTR( WSAEREMOTE );
430       RETSTR( WSASYSNOTREADY );
431       RETSTR( WSAVERNOTSUPPORTED );
432       RETSTR( WSANOTINITIALISED );
433       RETSTR( WSAEDISCON );
434       
435 #ifdef HAVE_WINSOCK2
436       RETSTR( WSAENOMORE );
437       RETSTR( WSAECANCELLED );
438       RETSTR( WSAEINVALIDPROCTABLE );
439       RETSTR( WSAEINVALIDPROVIDER );
440       RETSTR( WSASYSCALLFAILURE );
441       RETSTR( WSASERVICE_NOT_FOUND );
442       RETSTR( WSATYPE_NOT_FOUND );
443       RETSTR( WSA_E_NO_MORE );
444       RETSTR( WSA_E_CANCELLED );
445       RETSTR( WSAEREFUSED );
446 #endif /* HAVE_WINSOCK2 */
447
448       RETSTR( WSAHOST_NOT_FOUND );
449       RETSTR( WSATRY_AGAIN );
450       RETSTR( WSANO_RECOVERY );
451       RETSTR( WSANO_DATA );
452    }
453    return "unknown WSA error";
454 }
455 #endif /* HAVE_WINSOCK_H */