Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
rename raw sockets to measurement ones
[simgrid.git] / src / gras / Transport / transport_plugin_tcp.c
1 /* $Id$ */
2
3 /* tcp trp (transport) - send/receive a bunch of bytes from a tcp socket    */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "portable.h"
11
12 #if 0
13 #  include <signal.h>       /* close() pipe() read() write() */
14 #  include <sys/wait.h>     /* waitpid() */
15 #endif
16
17
18 #include "transport_private.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport");
21
22 /***
23  *** Prototypes 
24  ***/
25 xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
26                                         gras_socket_t sock);
27 xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
28                                         gras_socket_t sock);
29 xbt_error_t gras_trp_tcp_socket_accept(gras_socket_t  sock,
30                                         gras_socket_t *dst);
31
32 void         gras_trp_tcp_socket_close(gras_socket_t sd);
33   
34 xbt_error_t gras_trp_tcp_chunk_send(gras_socket_t sd,
35                                      const char *data,
36                                      long int size);
37
38 xbt_error_t gras_trp_tcp_chunk_recv(gras_socket_t sd,
39                                      char *data,
40                                      long int size);
41
42 void gras_trp_tcp_exit(gras_trp_plugin_t *plug);
43
44
45 static int TcpProtoNumber(void);
46 /***
47  *** Specific plugin part
48  ***/
49
50 typedef struct {
51   fd_set msg_socks;
52   fd_set meas_socks;
53 } gras_trp_tcp_plug_data_t;
54
55 /***
56  *** Specific socket part
57  ***/
58
59 typedef struct {
60   int buffsize;
61 } gras_trp_tcp_sock_data_t;
62
63
64 /***
65  *** Code
66  ***/
67 xbt_error_t gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
68
69   gras_trp_tcp_plug_data_t *data = xbt_new(gras_trp_tcp_plug_data_t,1);
70
71   FD_ZERO(&(data->msg_socks));
72   FD_ZERO(&(data->meas_socks));
73
74   plug->socket_client = gras_trp_tcp_socket_client;
75   plug->socket_server = gras_trp_tcp_socket_server;
76   plug->socket_accept = gras_trp_tcp_socket_accept;
77   plug->socket_close  = gras_trp_tcp_socket_close;
78
79   plug->chunk_send    = gras_trp_tcp_chunk_send;
80   plug->chunk_recv    = gras_trp_tcp_chunk_recv;
81
82   plug->flush = NULL; /* nothing's cached */
83
84   plug->data = (void*)data;
85   plug->exit = gras_trp_tcp_exit;
86    
87   return no_error;
88 }
89
90 void gras_trp_tcp_exit(gras_trp_plugin_t *plug) {
91   DEBUG1("Exit plugin TCP (free %p)", plug->data);
92   free(plug->data);
93 }
94
95 xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
96                                         gras_socket_t sock){
97   
98   struct sockaddr_in addr;
99   struct hostent *he;
100   struct in_addr *haddr;
101   int size = sock->bufSize * 1024; 
102
103   sock->incoming = 1; /* TCP sockets are duplex'ed */
104
105   sock->sd = socket (AF_INET, SOCK_STREAM, 0);
106   
107   if (sock->sd < 0) {
108     RAISE1(system_error,
109            "Failed to create socket: %s",
110            sock_errstr);
111   }
112
113   if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
114       setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
115      WARN1("setsockopt failed, cannot set buffer size: %s",sock_errstr);
116   }
117   
118   he = gethostbyname (sock->peer_name);
119   if (he == NULL) {
120     RAISE2(system_error,
121            "Failed to lookup hostname %s: %s",
122            sock->peer_name, sock_errstr);
123   }
124   
125   haddr = ((struct in_addr *) (he->h_addr_list)[0]);
126   
127   memset(&addr, 0, sizeof(struct sockaddr_in));
128   memcpy (&addr.sin_addr, haddr, sizeof(struct in_addr));
129   addr.sin_family = AF_INET;
130   addr.sin_port = htons (sock->peer_port);
131
132   if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) {
133     tcp_close(sock->sd);
134     RAISE3(system_error,
135            "Failed to connect socket to %s:%d (%s)",
136            sock->peer_name, sock->peer_port, sock_errstr);
137   }
138   
139   return no_error;
140 }
141
142 /**
143  * gras_trp_tcp_socket_server:
144  *
145  * Open a socket used to receive messages.
146  */
147 xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
148                                         /* OUT */ gras_socket_t sock){
149   int size = sock->bufSize * 1024; 
150   int on = 1;
151   struct sockaddr_in server;
152
153   gras_trp_tcp_plug_data_t *tcp=(gras_trp_tcp_plug_data_t*)self->data;
154  
155   sock->outgoing  = 1; /* TCP => duplex mode */
156
157   server.sin_port = htons((u_short)sock->port);
158   server.sin_addr.s_addr = INADDR_ANY;
159   server.sin_family = AF_INET;
160   if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
161     RAISE1(system_error,"Socket allocation failed: %s", sock_errstr);
162   }
163
164   if (setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on))) {
165      RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
166             sock_errstr);
167   }
168    
169   if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
170       setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
171      WARN1("setsockopt failed, cannot set buffer size: %s",
172            sock_errstr);
173   }
174         
175   if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
176     tcp_close(sock->sd);
177     RAISE2(system_error,"Cannot bind to port %d: %s",sock->port, sock_errstr);
178   }
179
180   if (listen(sock->sd, 5) < 0) {
181     tcp_close(sock->sd);
182     RAISE2(system_error,"Cannot listen to port %d: %s",sock->port,sock_errstr);
183   }
184
185   if (sock->meas)
186     FD_SET(sock->sd, &(tcp->meas_socks));
187   else
188     FD_SET(sock->sd, &(tcp->msg_socks));
189
190   DEBUG2("Openned a server socket on port %d (sock %d)",sock->port,sock->sd);
191   
192   return no_error;
193 }
194
195 xbt_error_t
196 gras_trp_tcp_socket_accept(gras_socket_t  sock,
197                            gras_socket_t *dst) {
198   gras_socket_t res;
199   
200   struct sockaddr_in peer_in;
201   socklen_t peer_in_len = sizeof(peer_in);
202
203   int sd;
204   int tmp_errno;
205   int size;
206                 
207   XBT_IN;
208   gras_trp_socket_new(1,&res);
209
210   sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len);
211   tmp_errno = errno;
212
213   if(sd == -1) {
214     gras_socket_close(sock);
215     RAISE1(system_error,
216            "Accept failed (%s). Droping server socket.", sock_errstr);
217   } else {
218     int i = 1;
219     socklen_t s = sizeof(int);
220   
221     if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
222         || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) {
223        RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
224               sock_errstr);
225     }
226
227     res->bufSize = sock->bufSize;
228     size = sock->bufSize * 1024;
229     if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size))
230        || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
231        WARN1("setsockopt failed, cannot set buffer size: %s",
232              sock_errstr);
233     }
234      
235     res->plugin    = sock->plugin;
236     res->incoming  = sock->incoming;
237     res->outgoing  = sock->outgoing;
238     res->accepting = 0;
239     res->sd        = sd;
240     res->port      = -1;
241     res->peer_port = peer_in.sin_port;
242
243     /* FIXME: Lock to protect inet_ntoa */
244     if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) {
245       res->peer_name = (char*)strdup("unknown");
246     } else {
247       struct in_addr addrAsInAddr;
248       char *tmp;
249  
250       addrAsInAddr.s_addr = peer_in.sin_addr.s_addr;
251       
252       tmp = inet_ntoa(addrAsInAddr);
253       if (tmp != NULL) {
254         res->peer_name = (char*)strdup(tmp);
255       } else {
256         res->peer_name = (char*)strdup("unknown");
257       }
258     }
259
260     VERB3("accepted socket %d to %s:%d", sd, res->peer_name,res->peer_port);
261     
262     *dst = res;
263
264     XBT_OUT;
265     return no_error;
266   }
267 }
268
269 void gras_trp_tcp_socket_close(gras_socket_t sock){
270   gras_trp_tcp_plug_data_t *tcp;
271   
272   if (!sock) return; /* close only once */
273   tcp=sock->plugin->data;
274
275   DEBUG1("close tcp connection %d", sock->sd);
276
277   /* FIXME: no pipe in GRAS so far  
278   if(!FD_ISSET(sd, &connectedPipes)) {
279     if(shutdown(sd, 2) < 0) {
280       GetNWSLock(&lock);
281       tmp_errno = errno;
282       ReleaseNWSLock(&lock);
283       
284       / * The other side may have beaten us to the reset. * /
285       if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) {
286         WARN1("CloseSocket: shutdown error %d\n", tmp_errno);
287       }
288     }
289   } */
290
291 #ifndef HAVE_WINSOCK_H
292   /* forget about the socket 
293      ... but not when using winsock since accept'ed socket can not fit 
294      into the fd_set*/
295   if (sock->meas){
296     FD_CLR(sock->sd, &(tcp->meas_socks));
297   } else {
298     FD_CLR(sock->sd, &(tcp->msg_socks));
299   }
300 #endif
301    
302   /* close the socket */
303   if(tcp_close(sock->sd) < 0) {
304     WARN3("error while closing tcp socket %d: %d (%s)\n", 
305              sock->sd, sock_errno, sock_errstr);
306   }
307
308 }
309
310 /**
311  * gras_trp_tcp_chunk_send:
312  *
313  * Send data on a TCP socket
314  */
315 xbt_error_t 
316 gras_trp_tcp_chunk_send(gras_socket_t sock,
317                         const char *data,
318                         long int size) {
319   
320   /* TCP sockets are in duplex mode, don't check direction */
321   xbt_assert0(size >= 0, "Cannot send a negative amount of data");
322
323   while (size) {
324     int status = 0;
325     
326     status = tcp_write(sock->sd, data, (size_t)size);
327     DEBUG3("write(%d, %p, %ld);", sock->sd, data, size);
328     
329     if (status <= 0) {
330       RAISE4(system_error,"write(%d,%p,%ld) failed: %s",
331              sock->sd, data, size,
332              sock_errstr);
333     }
334     
335     if (status) {
336       size  -= status;
337       data  += status;
338     } else {
339       RAISE0(system_error,"file descriptor closed");
340     }
341   }
342
343   return no_error;
344 }
345 /**
346  * gras_trp_tcp_chunk_recv:
347  *
348  * Receive data on a TCP socket.
349  */
350 xbt_error_t 
351 gras_trp_tcp_chunk_recv(gras_socket_t sock,
352                         char *data,
353                         long int size) {
354
355   /* TCP sockets are in duplex mode, don't check direction */
356   xbt_assert0(sock, "Cannot recv on an NULL socket");
357   xbt_assert0(size >= 0, "Cannot receive a negative amount of data");
358   
359   while (size) {
360     int status = 0;
361     
362     status = tcp_read(sock->sd, data, (size_t)size);
363     DEBUG3("read(%d, %p, %ld);", sock->sd, data, size);
364     
365     if (status <= 0) {
366       RAISE4(system_error,"read(%d,%p,%d) failed: %s",
367              sock->sd, data, (int)size,
368              sock_errstr);
369     }
370     
371     if (status) {
372       size  -= status;
373       data  += status;
374     } else {
375       RAISE0(system_error,"file descriptor closed");
376     }
377   }
378   
379   return no_error;
380 }
381
382
383 /*
384  * Returns the tcp protocol number from the network protocol data base.
385  *
386  * getprotobyname() is not thread safe. We need to lock it.
387  */
388 static int TcpProtoNumber(void) {
389   struct protoent *fetchedEntry;
390   static int returnValue = 0;
391   
392   if(returnValue == 0) {
393     fetchedEntry = getprotobyname("tcp");
394     xbt_assert0(fetchedEntry, "getprotobyname(tcp) gave NULL");
395     returnValue = fetchedEntry->p_proto;
396   }
397   
398   return returnValue;
399 }
400
401 #if 0 /* KILLME */
402 /* Data exchange over measurement sockets. Placing this in there is a kind of crude hack.
403    It means that the only possible measurement sockets are TCP where we may want to do UDP for them. 
404    But I fail to find a good internal organization for now. We may want to split 
405    meas and regular sockets more efficiently.
406 */
407 xbt_error_t gras_socket_meas_exchange(gras_socket_t peer,
408                                       int sender,
409                                       unsigned int timeout,
410                                       unsigned long int exp_size,
411                                       unsigned long int msg_size) {
412    char *chunk;
413    int res_last, msg_sofar, exp_sofar;
414    
415    fd_set rd_set;
416 /*    int rv; */
417    
418    struct timeval timeOut;
419    
420    chunk = xbt_malloc(msg_size);
421
422    for   (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
423       for(msg_sofar=0; msg_sofar < msg_size; msg_sofar += res_last) {
424          
425          if(sender) {
426             res_last = send(peer->sd, chunk, msg_size - msg_sofar, 0);
427          } else {
428             res_last = 0;
429             FD_ZERO(&rd_set);
430             FD_SET(peer->sd,&rd_set);
431             timeOut.tv_sec = timeout;
432             timeOut.tv_usec = 0;
433                 
434             if (0 < select(peer->sd+1,&rd_set,NULL,NULL,&timeOut))
435               res_last = recv(peer->sd, chunk, msg_size-msg_sofar, 0);
436             
437          }
438          if (res_last == 0) {
439            /* No progress done, bail out */
440            free(chunk);
441            RAISE0(unknown_error,"Not exchanged a single byte, bailing out");
442          }
443       }
444    }
445    
446    free(chunk);
447    return no_error;
448 }
449 #endif
450
451 #ifdef HAVE_WINSOCK_H
452 #define RETSTR( x ) case x: return #x
453
454 const char *gras_wsa_err2string( int err ) {
455    switch( err ) {
456       RETSTR( WSAEINTR );
457       RETSTR( WSAEBADF );
458       RETSTR( WSAEACCES );
459       RETSTR( WSAEFAULT );
460       RETSTR( WSAEINVAL );
461       RETSTR( WSAEMFILE );
462       RETSTR( WSAEWOULDBLOCK );
463       RETSTR( WSAEINPROGRESS );
464       RETSTR( WSAEALREADY );
465       RETSTR( WSAENOTSOCK );
466       RETSTR( WSAEDESTADDRREQ );
467       RETSTR( WSAEMSGSIZE );
468       RETSTR( WSAEPROTOTYPE );
469       RETSTR( WSAENOPROTOOPT );
470       RETSTR( WSAEPROTONOSUPPORT );
471       RETSTR( WSAESOCKTNOSUPPORT );
472       RETSTR( WSAEOPNOTSUPP );
473       RETSTR( WSAEPFNOSUPPORT );
474       RETSTR( WSAEAFNOSUPPORT );
475       RETSTR( WSAEADDRINUSE );
476       RETSTR( WSAEADDRNOTAVAIL );
477       RETSTR( WSAENETDOWN );
478       RETSTR( WSAENETUNREACH );
479       RETSTR( WSAENETRESET );
480       RETSTR( WSAECONNABORTED );
481       RETSTR( WSAECONNRESET );
482       RETSTR( WSAENOBUFS );
483       RETSTR( WSAEISCONN );
484       RETSTR( WSAENOTCONN );
485       RETSTR( WSAESHUTDOWN );
486       RETSTR( WSAETOOMANYREFS );
487       RETSTR( WSAETIMEDOUT );
488       RETSTR( WSAECONNREFUSED );
489       RETSTR( WSAELOOP );
490       RETSTR( WSAENAMETOOLONG );
491       RETSTR( WSAEHOSTDOWN );
492       RETSTR( WSAEHOSTUNREACH );
493       RETSTR( WSAENOTEMPTY );
494       RETSTR( WSAEPROCLIM );
495       RETSTR( WSAEUSERS );
496       RETSTR( WSAEDQUOT );
497       RETSTR( WSAESTALE );
498       RETSTR( WSAEREMOTE );
499       RETSTR( WSASYSNOTREADY );
500       RETSTR( WSAVERNOTSUPPORTED );
501       RETSTR( WSANOTINITIALISED );
502       RETSTR( WSAEDISCON );
503       
504 #ifdef HAVE_WINSOCK2
505       RETSTR( WSAENOMORE );
506       RETSTR( WSAECANCELLED );
507       RETSTR( WSAEINVALIDPROCTABLE );
508       RETSTR( WSAEINVALIDPROVIDER );
509       RETSTR( WSASYSCALLFAILURE );
510       RETSTR( WSASERVICE_NOT_FOUND );
511       RETSTR( WSATYPE_NOT_FOUND );
512       RETSTR( WSA_E_NO_MORE );
513       RETSTR( WSA_E_CANCELLED );
514       RETSTR( WSAEREFUSED );
515 #endif /* HAVE_WINSOCK2 */
516
517       RETSTR( WSAHOST_NOT_FOUND );
518       RETSTR( WSATRY_AGAIN );
519       RETSTR( WSANO_RECOVERY );
520       RETSTR( WSANO_DATA );
521    }
522    return "unknown WSA error";
523 }
524 #endif /* HAVE_WINSOCK_H */