Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
sizes are unsigned long int
[simgrid.git] / src / gras / Transport / transport_plugin_tcp.c
index 56aa78e..ee776c8 100644 (file)
@@ -2,48 +2,42 @@
 
 /* tcp trp (transport) - send/receive a bunch of bytes from a tcp socket    */
 
-/* Authors: Martin Quinson                                                  */
-/* Copyright (C) 2004 Martin Quinson.                                       */
+/* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
 
 /* This program is free software; you can redistribute it and/or modify it
-   under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include <unistd.h>       /* close() pipe() read() write() */
-#include <signal.h>       /* close() pipe() read() write() */
-#include <netinet/in.h>   /* sometimes required for #include <arpa/inet.h> */
-#include <netinet/tcp.h>  /* TCP_NODELAY */
-#include <arpa/inet.h>    /* inet_ntoa() */
-#include <netdb.h>        /* getprotobyname() */
-#include <sys/time.h>     /* struct timeval */
-#include <errno.h>        /* errno */
-#include <sys/wait.h>     /* waitpid() */
-#include <sys/socket.h>   /* getpeername() socket() */
-#include <stdlib.h>
-#include <string.h>       /* memset */
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "portable.h"
+
+#if 0
+#  include <signal.h>       /* close() pipe() read() write() */
+#  include <sys/wait.h>     /* waitpid() */
+#endif
+
 
 #include "transport_private.h"
 
-GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport");
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport");
 
 /***
  *** Prototypes 
  ***/
-gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
-                                       /* OUT */ gras_socket_t *sock);
-gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
-                                       /* OUT */ gras_socket_t *sock);
-gras_error_t gras_trp_tcp_socket_accept(gras_socket_t  *sock,
-                                       gras_socket_t **dst);
-
-void         gras_trp_tcp_socket_close(gras_socket_t *sd);
+xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
+                                       gras_socket_t sock);
+xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
+                                       gras_socket_t sock);
+xbt_error_t gras_trp_tcp_socket_accept(gras_socket_t  sock,
+                                       gras_socket_t *dst);
+
+void         gras_trp_tcp_socket_close(gras_socket_t sd);
   
-gras_error_t gras_trp_tcp_chunk_send(gras_socket_t *sd,
-                                    const char *data,
-                                    long int size);
+xbt_error_t gras_trp_tcp_chunk_send(gras_socket_t sd,
+                                   const char *data,
+                                   unsigned long int size);
 
-gras_error_t gras_trp_tcp_chunk_recv(gras_socket_t *sd,
-                                    char *data,
-                                    long int size);
+xbt_error_t gras_trp_tcp_chunk_recv(gras_socket_t sd,
+                                   char *data,
+                                   unsigned long int size);
 
 void gras_trp_tcp_exit(gras_trp_plugin_t *plug);
 
@@ -55,7 +49,7 @@ static int TcpProtoNumber(void);
 
 typedef struct {
   fd_set msg_socks;
-  fd_set raw_socks;
+  fd_set meas_socks;
 } gras_trp_tcp_plug_data_t;
 
 /***
@@ -70,12 +64,12 @@ typedef struct {
 /***
  *** Code
  ***/
-gras_error_t gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
+xbt_error_t gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
 
-  gras_trp_tcp_plug_data_t *data = gras_new(gras_trp_tcp_plug_data_t,1);
+  gras_trp_tcp_plug_data_t *data = xbt_new(gras_trp_tcp_plug_data_t,1);
 
   FD_ZERO(&(data->msg_socks));
-  FD_ZERO(&(data->raw_socks));
+  FD_ZERO(&(data->meas_socks));
 
   plug->socket_client = gras_trp_tcp_socket_client;
   plug->socket_server = gras_trp_tcp_socket_server;
@@ -89,7 +83,7 @@ gras_error_t gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
 
   plug->data = (void*)data;
   plug->exit = gras_trp_tcp_exit;
-
+   
   return no_error;
 }
 
@@ -98,8 +92,8 @@ void gras_trp_tcp_exit(gras_trp_plugin_t *plug) {
   free(plug->data);
 }
 
-gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
-                                       /* OUT */ gras_socket_t *sock){
+xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
+                                       gras_socket_t sock){
   
   struct sockaddr_in addr;
   struct hostent *he;
@@ -113,20 +107,19 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
   if (sock->sd < 0) {
     RAISE1(system_error,
           "Failed to create socket: %s",
-          strerror (errno));
+          sock_errstr);
   }
 
   if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
       setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
-     WARN1("setsockopt failed, cannot set buffer size: %s",
-          strerror(errno));
+     WARN1("setsockopt failed, cannot set buffer size: %s",sock_errstr);
   }
   
   he = gethostbyname (sock->peer_name);
   if (he == NULL) {
     RAISE2(system_error,
           "Failed to lookup hostname %s: %s",
-          sock->peer_name, strerror (errno));
+          sock->peer_name, sock_errstr);
   }
   
   haddr = ((struct in_addr *) (he->h_addr_list)[0]);
@@ -136,11 +129,12 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
   addr.sin_family = AF_INET;
   addr.sin_port = htons (sock->peer_port);
 
+  DEBUG2("Connect to %s:%d",sock->peer_name, sock->peer_port);
   if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) {
-    close(sock->sd);
+    tcp_close(sock->sd);
     RAISE3(system_error,
           "Failed to connect socket to %s:%d (%s)",
-          sock->peer_name, sock->peer_port, strerror (errno));
+          sock->peer_name, sock->peer_port, sock_errstr);
   }
   
   return no_error;
@@ -151,8 +145,8 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
  *
  * Open a socket used to receive messages.
  */
-gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
-                                       /* OUT */ gras_socket_t *sock){
+xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
+                                       /* OUT */ gras_socket_t sock){
   int size = sock->bufSize * 1024; 
   int on = 1;
   struct sockaddr_in server;
@@ -165,32 +159,33 @@ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
   server.sin_addr.s_addr = INADDR_ANY;
   server.sin_family = AF_INET;
   if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-    RAISE1(system_error,"Socket allocation failed: %s", strerror(errno));
+    RAISE1(system_error,"Socket allocation failed: %s", sock_errstr);
   }
 
   if (setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on))) {
      RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
-           strerror(errno));
+           sock_errstr);
   }
    
   if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
       setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
      WARN1("setsockopt failed, cannot set buffer size: %s",
-          strerror(errno));
+          sock_errstr);
   }
        
   if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
-    close(sock->sd);
-    RAISE2(system_error,"Cannot bind to port %d: %s",sock->port, strerror(errno));
+    tcp_close(sock->sd);
+    RAISE2(system_error,"Cannot bind to port %d: %s",sock->port, sock_errstr);
   }
 
+  DEBUG1("Listen on port %d",sock->port);
   if (listen(sock->sd, 5) < 0) {
-    close(sock->sd);
-    RAISE2(system_error,"Cannot listen to port %d: %s",sock->port,strerror(errno));
+    tcp_close(sock->sd);
+    RAISE2(system_error,"Cannot listen on port %d: %s",sock->port,sock_errstr);
   }
 
-  if (sock->raw)
-    FD_SET(sock->sd, &(tcp->raw_socks));
+  if (sock->meas)
+    FD_SET(sock->sd, &(tcp->meas_socks));
   else
     FD_SET(sock->sd, &(tcp->msg_socks));
 
@@ -199,11 +194,10 @@ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
   return no_error;
 }
 
-gras_error_t
-gras_trp_tcp_socket_accept(gras_socket_t  *sock,
-                          gras_socket_t **dst) {
-  gras_socket_t *res;
-  gras_error_t errcode;
+xbt_error_t
+gras_trp_tcp_socket_accept(gras_socket_t  sock,
+                          gras_socket_t *dst) {
+  gras_socket_t res;
   
   struct sockaddr_in peer_in;
   socklen_t peer_in_len = sizeof(peer_in);
@@ -211,16 +205,17 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
   int sd;
   int tmp_errno;
   int size;
-                       
+               
+  XBT_IN;
   gras_trp_socket_new(1,&res);
 
   sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len);
   tmp_errno = errno;
 
-  if(sd == -1) {
+  if (sd == -1) {
     gras_socket_close(sock);
     RAISE1(system_error,
-          "Accept failed (%s). Droping server socket.", strerror(tmp_errno));
+          "Accept failed (%s). Droping server socket.", sock_errstr);
   } else {
     int i = 1;
     socklen_t s = sizeof(int);
@@ -228,15 +223,15 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
     if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
        || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) {
        RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
-             strerror(errno));
+             sock_errstr);
     }
-    (*dst)->bufSize = sock->bufSize;
+
+    res->bufSize = sock->bufSize;
     size = sock->bufSize * 1024;
     if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size))
        || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
        WARN1("setsockopt failed, cannot set buffer size: %s",
-            strerror(errno));
+            sock_errstr);
     }
      
     res->plugin    = sock->plugin;
@@ -264,15 +259,16 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
       }
     }
 
-    VERB3("accepted socket %d to %s:%d", sd, res->peer_name,res->peer_port);
+    VERB3("Accepted socket %d to %s:%d", sd, res->peer_name,res->peer_port);
     
     *dst = res;
 
+    XBT_OUT;
     return no_error;
   }
 }
 
-void gras_trp_tcp_socket_close(gras_socket_t *sock){
+void gras_trp_tcp_socket_close(gras_socket_t sock){
   gras_trp_tcp_plug_data_t *tcp;
   
   if (!sock) return; /* close only once */
@@ -294,17 +290,23 @@ void gras_trp_tcp_socket_close(gras_socket_t *sock){
     }
   } */
 
-  /* forget about the socket */
-  if (sock->raw)
-    FD_CLR(sock->sd, &(tcp->raw_socks));
-  else
+#ifndef HAVE_WINSOCK_H
+  /* forget about the socket 
+     ... but not when using winsock since accept'ed socket can not fit 
+     into the fd_set*/
+  if (sock->meas){
+    FD_CLR(sock->sd, &(tcp->meas_socks));
+  } else {
     FD_CLR(sock->sd, &(tcp->msg_socks));
-
+  }
+#endif
+   
   /* close the socket */
-  if(close(sock->sd) < 0) {
+  if(tcp_close(sock->sd) < 0) {
     WARN3("error while closing tcp socket %d: %d (%s)\n", 
-            sock->sd, errno, strerror(errno));
+            sock->sd, sock_errno, sock_errstr);
   }
+
 }
 
 /**
@@ -312,31 +314,32 @@ void gras_trp_tcp_socket_close(gras_socket_t *sock){
  *
  * Send data on a TCP socket
  */
-gras_error_t 
-gras_trp_tcp_chunk_send(gras_socket_t *sock,
+xbt_error_t 
+gras_trp_tcp_chunk_send(gras_socket_t sock,
                        const char *data,
-                       long int size) {
+                       unsigned long int size) {
   
   /* TCP sockets are in duplex mode, don't check direction */
-  gras_assert0(size >= 0, "Cannot send a negative amount of data");
+  xbt_assert0(size >= 0, "Cannot send a negative amount of data");
 
   while (size) {
     int status = 0;
     
-    status = write(sock->sd, data, (size_t)size);
+    status = tcp_write(sock->sd, data, (size_t)size);
     DEBUG3("write(%d, %p, %ld);", sock->sd, data, size);
     
-    if (status == -1) {
+    if (status < 0) {
       RAISE4(system_error,"write(%d,%p,%ld) failed: %s",
             sock->sd, data, size,
-            strerror(errno));
+            sock_errstr);
     }
     
     if (status) {
       size  -= status;
       data  += status;
     } else {
-      RAISE0(system_error,"file descriptor closed");
+      RAISE1(system_error,"file descriptor closed (%s)",
+             sock_errstr);
     }
   }
 
@@ -347,32 +350,32 @@ gras_trp_tcp_chunk_send(gras_socket_t *sock,
  *
  * Receive data on a TCP socket.
  */
-gras_error_t 
-gras_trp_tcp_chunk_recv(gras_socket_t *sock,
+xbt_error_t 
+gras_trp_tcp_chunk_recv(gras_socket_t sock,
                        char *data,
-                       long int size) {
+                       unsigned long int size) {
 
   /* TCP sockets are in duplex mode, don't check direction */
-  gras_assert0(sock, "Cannot recv on an NULL socket");
-  gras_assert0(size >= 0, "Cannot receive a negative amount of data");
+  xbt_assert0(sock, "Cannot recv on an NULL socket");
+  xbt_assert0(size >= 0, "Cannot receive a negative amount of data");
   
   while (size) {
     int status = 0;
     
-    status = read(sock->sd, data, (size_t)size);
     DEBUG3("read(%d, %p, %ld);", sock->sd, data, size);
+    status = tcp_read(sock->sd, data, (size_t)size);
     
-    if (status == -1) {
+    if (status < 0) {
       RAISE4(system_error,"read(%d,%p,%d) failed: %s",
             sock->sd, data, (int)size,
-            strerror(errno));
+            sock_errstr);
     }
     
     if (status) {
       size  -= status;
       data  += status;
     } else {
-      RAISE0(system_error,"file descriptor closed");
+      RAISE0(system_error,"file descriptor closed (nothing read on the socket)");
     }
   }
   
@@ -391,19 +394,20 @@ static int TcpProtoNumber(void) {
   
   if(returnValue == 0) {
     fetchedEntry = getprotobyname("tcp");
-    gras_assert0(fetchedEntry, "getprotobyname(tcp) gave NULL");
+    xbt_assert0(fetchedEntry, "getprotobyname(tcp) gave NULL");
     returnValue = fetchedEntry->p_proto;
   }
   
   return returnValue;
 }
 
-/* Data exchange over raw sockets. Placing this in there is a kind of crude hack.
-   It means that the only possible raw are TCP where we may want to do UDP for them. 
+#if 0 /* KILLME */
+/* Data exchange over measurement sockets. Placing this in there is a kind of crude hack.
+   It means that the only possible measurement sockets are TCP where we may want to do UDP for them. 
    But I fail to find a good internal organization for now. We may want to split 
-   raw and regular sockets more efficiently.
+   meas and regular sockets more efficiently.
 */
-gras_error_t gras_socket_raw_exchange(gras_socket_t *peer,
+xbt_error_t gras_socket_meas_exchange(gras_socket_t peer,
                                      int sender,
                                      unsigned int timeout,
                                      unsigned long int exp_size,
@@ -412,11 +416,11 @@ gras_error_t gras_socket_raw_exchange(gras_socket_t *peer,
    int res_last, msg_sofar, exp_sofar;
    
    fd_set rd_set;
-   int rv;
+/*    int rv; */
    
    struct timeval timeOut;
    
-   chunk = gras_malloc(msg_size);
+   chunk = xbt_malloc(msg_size);
 
    for   (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
       for(msg_sofar=0; msg_sofar < msg_size; msg_sofar += res_last) {
@@ -436,12 +440,88 @@ gras_error_t gras_socket_raw_exchange(gras_socket_t *peer,
         }
         if (res_last == 0) {
           /* No progress done, bail out */
-          gras_free(chunk);
+          free(chunk);
           RAISE0(unknown_error,"Not exchanged a single byte, bailing out");
         }
       }
    }
    
-   gras_free(chunk);
+   free(chunk);
    return no_error;
 }
+#endif
+
+#ifdef HAVE_WINSOCK_H
+#define RETSTR( x ) case x: return #x
+
+const char *gras_wsa_err2string( int err ) {
+   switch( err ) {
+      RETSTR( WSAEINTR );
+      RETSTR( WSAEBADF );
+      RETSTR( WSAEACCES );
+      RETSTR( WSAEFAULT );
+      RETSTR( WSAEINVAL );
+      RETSTR( WSAEMFILE );
+      RETSTR( WSAEWOULDBLOCK );
+      RETSTR( WSAEINPROGRESS );
+      RETSTR( WSAEALREADY );
+      RETSTR( WSAENOTSOCK );
+      RETSTR( WSAEDESTADDRREQ );
+      RETSTR( WSAEMSGSIZE );
+      RETSTR( WSAEPROTOTYPE );
+      RETSTR( WSAENOPROTOOPT );
+      RETSTR( WSAEPROTONOSUPPORT );
+      RETSTR( WSAESOCKTNOSUPPORT );
+      RETSTR( WSAEOPNOTSUPP );
+      RETSTR( WSAEPFNOSUPPORT );
+      RETSTR( WSAEAFNOSUPPORT );
+      RETSTR( WSAEADDRINUSE );
+      RETSTR( WSAEADDRNOTAVAIL );
+      RETSTR( WSAENETDOWN );
+      RETSTR( WSAENETUNREACH );
+      RETSTR( WSAENETRESET );
+      RETSTR( WSAECONNABORTED );
+      RETSTR( WSAECONNRESET );
+      RETSTR( WSAENOBUFS );
+      RETSTR( WSAEISCONN );
+      RETSTR( WSAENOTCONN );
+      RETSTR( WSAESHUTDOWN );
+      RETSTR( WSAETOOMANYREFS );
+      RETSTR( WSAETIMEDOUT );
+      RETSTR( WSAECONNREFUSED );
+      RETSTR( WSAELOOP );
+      RETSTR( WSAENAMETOOLONG );
+      RETSTR( WSAEHOSTDOWN );
+      RETSTR( WSAEHOSTUNREACH );
+      RETSTR( WSAENOTEMPTY );
+      RETSTR( WSAEPROCLIM );
+      RETSTR( WSAEUSERS );
+      RETSTR( WSAEDQUOT );
+      RETSTR( WSAESTALE );
+      RETSTR( WSAEREMOTE );
+      RETSTR( WSASYSNOTREADY );
+      RETSTR( WSAVERNOTSUPPORTED );
+      RETSTR( WSANOTINITIALISED );
+      RETSTR( WSAEDISCON );
+      
+#ifdef HAVE_WINSOCK2
+      RETSTR( WSAENOMORE );
+      RETSTR( WSAECANCELLED );
+      RETSTR( WSAEINVALIDPROCTABLE );
+      RETSTR( WSAEINVALIDPROVIDER );
+      RETSTR( WSASYSCALLFAILURE );
+      RETSTR( WSASERVICE_NOT_FOUND );
+      RETSTR( WSATYPE_NOT_FOUND );
+      RETSTR( WSA_E_NO_MORE );
+      RETSTR( WSA_E_CANCELLED );
+      RETSTR( WSAEREFUSED );
+#endif /* HAVE_WINSOCK2        */
+
+      RETSTR( WSAHOST_NOT_FOUND );
+      RETSTR( WSATRY_AGAIN );
+      RETSTR( WSANO_RECOVERY );
+      RETSTR( WSANO_DATA );
+   }
+   return "unknown WSA error";
+}
+#endif /* HAVE_WINSOCK_H */