Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
don't use XBT_CIN, since I didn't manage to implement it correctly
[simgrid.git] / src / gras / Transport / transport_plugin_tcp.c
1 /* $Id$ */
2
3 /* tcp trp (transport) - send/receive a bunch of bytes from a tcp socket    */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "portable.h"
11
12 #if 0
13 #  include <signal.h>       /* close() pipe() read() write() */
14 #  include <sys/wait.h>     /* waitpid() */
15 #endif
16
17
18 #include "transport_private.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport");
21
22 /***
23  *** Prototypes 
24  ***/
25 xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
26                                         gras_socket_t sock);
27 xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
28                                         gras_socket_t sock);
29 xbt_error_t gras_trp_tcp_socket_accept(gras_socket_t  sock,
30                                         gras_socket_t *dst);
31
32 void         gras_trp_tcp_socket_close(gras_socket_t sd);
33   
34 xbt_error_t gras_trp_tcp_chunk_send(gras_socket_t sd,
35                                     const char *data,
36                                     unsigned long int size);
37
38 xbt_error_t gras_trp_tcp_chunk_recv(gras_socket_t sd,
39                                     char *data,
40                                     unsigned long int size);
41
42 void gras_trp_tcp_exit(gras_trp_plugin_t *plug);
43
44
45 static int TcpProtoNumber(void);
46 /***
47  *** Specific plugin part
48  ***/
49
50 typedef struct {
51   fd_set msg_socks;
52   fd_set meas_socks;
53 } gras_trp_tcp_plug_data_t;
54
55 /***
56  *** Specific socket part
57  ***/
58
59 typedef struct {
60   int buffsize;
61 } gras_trp_tcp_sock_data_t;
62
63
64 /***
65  *** Code
66  ***/
67 xbt_error_t gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
68
69   gras_trp_tcp_plug_data_t *data = xbt_new(gras_trp_tcp_plug_data_t,1);
70
71   FD_ZERO(&(data->msg_socks));
72   FD_ZERO(&(data->meas_socks));
73
74   plug->socket_client = gras_trp_tcp_socket_client;
75   plug->socket_server = gras_trp_tcp_socket_server;
76   plug->socket_accept = gras_trp_tcp_socket_accept;
77   plug->socket_close  = gras_trp_tcp_socket_close;
78
79   plug->chunk_send    = gras_trp_tcp_chunk_send;
80   plug->chunk_recv    = gras_trp_tcp_chunk_recv;
81
82   plug->flush = NULL; /* nothing's cached */
83
84   plug->data = (void*)data;
85   plug->exit = gras_trp_tcp_exit;
86    
87   return no_error;
88 }
89
90 void gras_trp_tcp_exit(gras_trp_plugin_t *plug) {
91   DEBUG1("Exit plugin TCP (free %p)", plug->data);
92   free(plug->data);
93 }
94
95 xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
96                                         gras_socket_t sock){
97   
98   struct sockaddr_in addr;
99   struct hostent *he;
100   struct in_addr *haddr;
101   int size = sock->bufSize * 1024; 
102
103   sock->incoming = 1; /* TCP sockets are duplex'ed */
104
105   sock->sd = socket (AF_INET, SOCK_STREAM, 0);
106   
107   if (sock->sd < 0) {
108     RAISE1(system_error,
109            "Failed to create socket: %s",
110            sock_errstr);
111   }
112
113   if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
114       setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
115      WARN1("setsockopt failed, cannot set buffer size: %s",sock_errstr);
116   }
117   
118   he = gethostbyname (sock->peer_name);
119   if (he == NULL) {
120     RAISE2(system_error,
121            "Failed to lookup hostname %s: %s",
122            sock->peer_name, sock_errstr);
123   }
124   
125   haddr = ((struct in_addr *) (he->h_addr_list)[0]);
126   
127   memset(&addr, 0, sizeof(struct sockaddr_in));
128   memcpy (&addr.sin_addr, haddr, sizeof(struct in_addr));
129   addr.sin_family = AF_INET;
130   addr.sin_port = htons (sock->peer_port);
131
132   DEBUG2("Connect to %s:%d",sock->peer_name, sock->peer_port);
133   if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) {
134     tcp_close(sock->sd);
135     RAISE3(system_error,
136            "Failed to connect socket to %s:%d (%s)",
137            sock->peer_name, sock->peer_port, sock_errstr);
138   }
139   
140   return no_error;
141 }
142
143 /**
144  * gras_trp_tcp_socket_server:
145  *
146  * Open a socket used to receive messages.
147  */
148 xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
149                                         /* OUT */ gras_socket_t sock){
150   int size = sock->bufSize * 1024; 
151   int on = 1;
152   struct sockaddr_in server;
153
154   gras_trp_tcp_plug_data_t *tcp=(gras_trp_tcp_plug_data_t*)self->data;
155  
156   sock->outgoing  = 1; /* TCP => duplex mode */
157
158   server.sin_port = htons((u_short)sock->port);
159   server.sin_addr.s_addr = INADDR_ANY;
160   server.sin_family = AF_INET;
161   if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
162     RAISE1(system_error,"Socket allocation failed: %s", sock_errstr);
163   }
164
165   if (setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on))) {
166      RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
167             sock_errstr);
168   }
169    
170   if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
171       setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
172      WARN1("setsockopt failed, cannot set buffer size: %s",
173            sock_errstr);
174   }
175         
176   if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
177     tcp_close(sock->sd);
178     RAISE2(system_error,"Cannot bind to port %d: %s",sock->port, sock_errstr);
179   }
180
181   DEBUG1("Listen on port %d",sock->port);
182   if (listen(sock->sd, 5) < 0) {
183     tcp_close(sock->sd);
184     RAISE2(system_error,"Cannot listen on port %d: %s",sock->port,sock_errstr);
185   }
186
187   if (sock->meas)
188     FD_SET(sock->sd, &(tcp->meas_socks));
189   else
190     FD_SET(sock->sd, &(tcp->msg_socks));
191
192   DEBUG2("Openned a server socket on port %d (sock %d)",sock->port,sock->sd);
193   
194   return no_error;
195 }
196
197 xbt_error_t
198 gras_trp_tcp_socket_accept(gras_socket_t  sock,
199                            gras_socket_t *dst) {
200   gras_socket_t res;
201   
202   struct sockaddr_in peer_in;
203   socklen_t peer_in_len = sizeof(peer_in);
204
205   int sd;
206   int tmp_errno;
207   int size;
208                 
209   XBT_IN;
210   gras_trp_socket_new(1,&res);
211
212   sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len);
213   tmp_errno = errno;
214
215   if (sd == -1) {
216     gras_socket_close(sock);
217     RAISE1(system_error,
218            "Accept failed (%s). Droping server socket.", sock_errstr);
219   } else {
220     int i = 1;
221     socklen_t s = sizeof(int);
222   
223     if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
224         || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) {
225        RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
226               sock_errstr);
227     }
228
229     res->bufSize = sock->bufSize;
230     size = sock->bufSize * 1024;
231     if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size))
232        || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
233        WARN1("setsockopt failed, cannot set buffer size: %s",
234              sock_errstr);
235     }
236      
237     res->plugin    = sock->plugin;
238     res->incoming  = sock->incoming;
239     res->outgoing  = sock->outgoing;
240     res->accepting = 0;
241     res->sd        = sd;
242     res->port      = -1;
243     res->peer_port = peer_in.sin_port;
244
245     /* FIXME: Lock to protect inet_ntoa */
246     if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) {
247       res->peer_name = (char*)strdup("unknown");
248     } else {
249       struct in_addr addrAsInAddr;
250       char *tmp;
251  
252       addrAsInAddr.s_addr = peer_in.sin_addr.s_addr;
253       
254       tmp = inet_ntoa(addrAsInAddr);
255       if (tmp != NULL) {
256         res->peer_name = (char*)strdup(tmp);
257       } else {
258         res->peer_name = (char*)strdup("unknown");
259       }
260     }
261
262     VERB3("Accepted socket %d to %s:%d", sd, res->peer_name,res->peer_port);
263     
264     *dst = res;
265
266     XBT_OUT;
267     return no_error;
268   }
269 }
270
271 void gras_trp_tcp_socket_close(gras_socket_t sock){
272   gras_trp_tcp_plug_data_t *tcp;
273   
274   if (!sock) return; /* close only once */
275   tcp=sock->plugin->data;
276
277   DEBUG1("close tcp connection %d", sock->sd);
278
279   /* FIXME: no pipe in GRAS so far  
280   if(!FD_ISSET(sd, &connectedPipes)) {
281     if(shutdown(sd, 2) < 0) {
282       GetNWSLock(&lock);
283       tmp_errno = errno;
284       ReleaseNWSLock(&lock);
285       
286       / * The other side may have beaten us to the reset. * /
287       if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) {
288         WARN1("CloseSocket: shutdown error %d\n", tmp_errno);
289       }
290     }
291   } */
292
293 #ifndef HAVE_WINSOCK_H
294   /* forget about the socket 
295      ... but not when using winsock since accept'ed socket can not fit 
296      into the fd_set*/
297   if (sock->meas){
298     FD_CLR(sock->sd, &(tcp->meas_socks));
299   } else {
300     FD_CLR(sock->sd, &(tcp->msg_socks));
301   }
302 #endif
303    
304   /* close the socket */
305   if(tcp_close(sock->sd) < 0) {
306     WARN3("error while closing tcp socket %d: %d (%s)\n", 
307              sock->sd, sock_errno, sock_errstr);
308   }
309
310 }
311
312 /**
313  * gras_trp_tcp_chunk_send:
314  *
315  * Send data on a TCP socket
316  */
317 xbt_error_t 
318 gras_trp_tcp_chunk_send(gras_socket_t sock,
319                         const char *data,
320                         unsigned long int size) {
321   
322   /* TCP sockets are in duplex mode, don't check direction */
323   xbt_assert0(size >= 0, "Cannot send a negative amount of data");
324
325   while (size) {
326     int status = 0;
327     
328     status = tcp_write(sock->sd, data, (size_t)size);
329     DEBUG3("write(%d, %p, %ld);", sock->sd, data, size);
330     
331     if (status < 0) {
332       RAISE4(system_error,"write(%d,%p,%ld) failed: %s",
333              sock->sd, data, size,
334              sock_errstr);
335     }
336     
337     if (status) {
338       size  -= status;
339       data  += status;
340     } else {
341       RAISE1(system_error,"file descriptor closed (%s)",
342              sock_errstr);
343     }
344   }
345
346   return no_error;
347 }
348 /**
349  * gras_trp_tcp_chunk_recv:
350  *
351  * Receive data on a TCP socket.
352  */
353 xbt_error_t 
354 gras_trp_tcp_chunk_recv(gras_socket_t sock,
355                         char *data,
356                         unsigned long int size) {
357
358   /* TCP sockets are in duplex mode, don't check direction */
359   xbt_assert0(sock, "Cannot recv on an NULL socket");
360   xbt_assert0(size >= 0, "Cannot receive a negative amount of data");
361   
362   while (size) {
363     int status = 0;
364     
365     DEBUG3("read(%d, %p, %ld);", sock->sd, data, size);
366     status = tcp_read(sock->sd, data, (size_t)size);
367     
368     if (status < 0) {
369       RAISE4(system_error,"read(%d,%p,%d) failed: %s",
370              sock->sd, data, (int)size,
371              sock_errstr);
372     }
373     
374     if (status) {
375       size  -= status;
376       data  += status;
377     } else {
378       RAISE0(system_error,"file descriptor closed (nothing read on the socket)");
379     }
380   }
381   
382   return no_error;
383 }
384
385
386 /*
387  * Returns the tcp protocol number from the network protocol data base.
388  *
389  * getprotobyname() is not thread safe. We need to lock it.
390  */
391 static int TcpProtoNumber(void) {
392   struct protoent *fetchedEntry;
393   static int returnValue = 0;
394   
395   if(returnValue == 0) {
396     fetchedEntry = getprotobyname("tcp");
397     xbt_assert0(fetchedEntry, "getprotobyname(tcp) gave NULL");
398     returnValue = fetchedEntry->p_proto;
399   }
400   
401   return returnValue;
402 }
403
404 #if 0 /* KILLME */
405 /* Data exchange over measurement sockets. Placing this in there is a kind of crude hack.
406    It means that the only possible measurement sockets are TCP where we may want to do UDP for them. 
407    But I fail to find a good internal organization for now. We may want to split 
408    meas and regular sockets more efficiently.
409 */
410 xbt_error_t gras_socket_meas_exchange(gras_socket_t peer,
411                                       int sender,
412                                       unsigned int timeout,
413                                       unsigned long int exp_size,
414                                       unsigned long int msg_size) {
415    char *chunk;
416    int res_last, msg_sofar, exp_sofar;
417    
418    fd_set rd_set;
419 /*    int rv; */
420    
421    struct timeval timeOut;
422    
423    chunk = xbt_malloc(msg_size);
424
425    for   (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
426       for(msg_sofar=0; msg_sofar < msg_size; msg_sofar += res_last) {
427          
428          if(sender) {
429             res_last = send(peer->sd, chunk, msg_size - msg_sofar, 0);
430          } else {
431             res_last = 0;
432             FD_ZERO(&rd_set);
433             FD_SET(peer->sd,&rd_set);
434             timeOut.tv_sec = timeout;
435             timeOut.tv_usec = 0;
436                 
437             if (0 < select(peer->sd+1,&rd_set,NULL,NULL,&timeOut))
438               res_last = recv(peer->sd, chunk, msg_size-msg_sofar, 0);
439             
440          }
441          if (res_last == 0) {
442            /* No progress done, bail out */
443            free(chunk);
444            RAISE0(unknown_error,"Not exchanged a single byte, bailing out");
445          }
446       }
447    }
448    
449    free(chunk);
450    return no_error;
451 }
452 #endif
453
454 #ifdef HAVE_WINSOCK_H
455 #define RETSTR( x ) case x: return #x
456
457 const char *gras_wsa_err2string( int err ) {
458    switch( err ) {
459       RETSTR( WSAEINTR );
460       RETSTR( WSAEBADF );
461       RETSTR( WSAEACCES );
462       RETSTR( WSAEFAULT );
463       RETSTR( WSAEINVAL );
464       RETSTR( WSAEMFILE );
465       RETSTR( WSAEWOULDBLOCK );
466       RETSTR( WSAEINPROGRESS );
467       RETSTR( WSAEALREADY );
468       RETSTR( WSAENOTSOCK );
469       RETSTR( WSAEDESTADDRREQ );
470       RETSTR( WSAEMSGSIZE );
471       RETSTR( WSAEPROTOTYPE );
472       RETSTR( WSAENOPROTOOPT );
473       RETSTR( WSAEPROTONOSUPPORT );
474       RETSTR( WSAESOCKTNOSUPPORT );
475       RETSTR( WSAEOPNOTSUPP );
476       RETSTR( WSAEPFNOSUPPORT );
477       RETSTR( WSAEAFNOSUPPORT );
478       RETSTR( WSAEADDRINUSE );
479       RETSTR( WSAEADDRNOTAVAIL );
480       RETSTR( WSAENETDOWN );
481       RETSTR( WSAENETUNREACH );
482       RETSTR( WSAENETRESET );
483       RETSTR( WSAECONNABORTED );
484       RETSTR( WSAECONNRESET );
485       RETSTR( WSAENOBUFS );
486       RETSTR( WSAEISCONN );
487       RETSTR( WSAENOTCONN );
488       RETSTR( WSAESHUTDOWN );
489       RETSTR( WSAETOOMANYREFS );
490       RETSTR( WSAETIMEDOUT );
491       RETSTR( WSAECONNREFUSED );
492       RETSTR( WSAELOOP );
493       RETSTR( WSAENAMETOOLONG );
494       RETSTR( WSAEHOSTDOWN );
495       RETSTR( WSAEHOSTUNREACH );
496       RETSTR( WSAENOTEMPTY );
497       RETSTR( WSAEPROCLIM );
498       RETSTR( WSAEUSERS );
499       RETSTR( WSAEDQUOT );
500       RETSTR( WSAESTALE );
501       RETSTR( WSAEREMOTE );
502       RETSTR( WSASYSNOTREADY );
503       RETSTR( WSAVERNOTSUPPORTED );
504       RETSTR( WSANOTINITIALISED );
505       RETSTR( WSAEDISCON );
506       
507 #ifdef HAVE_WINSOCK2
508       RETSTR( WSAENOMORE );
509       RETSTR( WSAECANCELLED );
510       RETSTR( WSAEINVALIDPROCTABLE );
511       RETSTR( WSAEINVALIDPROVIDER );
512       RETSTR( WSASYSCALLFAILURE );
513       RETSTR( WSASERVICE_NOT_FOUND );
514       RETSTR( WSATYPE_NOT_FOUND );
515       RETSTR( WSA_E_NO_MORE );
516       RETSTR( WSA_E_CANCELLED );
517       RETSTR( WSAEREFUSED );
518 #endif /* HAVE_WINSOCK2 */
519
520       RETSTR( WSAHOST_NOT_FOUND );
521       RETSTR( WSATRY_AGAIN );
522       RETSTR( WSANO_RECOVERY );
523       RETSTR( WSANO_DATA );
524    }
525    return "unknown WSA error";
526 }
527 #endif /* HAVE_WINSOCK_H */