Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
use gras_trp_socket_new when accepting instead of mallocing ourself to get all the...
[simgrid.git] / src / gras / Transport / transport_plugin_tcp.c
index 490ceeb..5692504 100644 (file)
 #include <sys/wait.h>     /* waitpid() */
 #include <sys/socket.h>   /* getpeername() socket() */
 #include <stdlib.h>
-
+#include <string.h>       /* memset */
 
 #include "gras_private.h"
 #include "transport_private.h"
 
 GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport);
 
-typedef struct {
-  int buffsize;
-} gras_trp_tcp_sock_specific_t;
-
 /***
  *** Prototypes 
  ***/
 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
                                        const char *host,
                                        unsigned short port,
-                                       unsigned int bufSize, 
-                                       /* OUT */ gras_socket_t **dst);
+                                       /* OUT */ gras_socket_t *sock);
 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
                                        unsigned short port,
-                                       unsigned int bufSize, 
-                                       /* OUT */ gras_socket_t **dst);
+                                       /* OUT */ gras_socket_t *sock);
 gras_error_t gras_trp_tcp_socket_accept(gras_socket_t  *sock,
                                        gras_socket_t **dst);
 
 void         gras_trp_tcp_socket_close(gras_socket_t *sd);
   
-gras_error_t gras_trp_tcp_bloc_send(gras_socket_t *sd,
-                                   char *data,
-                                   size_t size);
+gras_error_t gras_trp_tcp_chunk_send(gras_socket_t *sd,
+                                    char *data,
+                                    size_t size);
 
-gras_error_t gras_trp_tcp_bloc_recv(gras_socket_t *sd,
-                                   char *data,
-                                   size_t size);
+gras_error_t gras_trp_tcp_chunk_recv(gras_socket_t *sd,
+                                    char *data,
+                                    size_t size);
 
-void         gras_trp_tcp_free_specific(void *s);
+void gras_trp_tcp_exit(gras_trp_plugin_t *plug);
 
 
 static int TcpProtoNumber(void);
@@ -64,133 +58,139 @@ static int TcpProtoNumber(void);
  ***/
 
 typedef struct {
-  fd_set incoming_socks;
-} gras_trp_tcp_specific_t;
+  fd_set msg_socks;
+  fd_set raw_socks;
+} gras_trp_tcp_plug_data_t;
 
 /***
  *** Specific socket part
  ***/
 
+typedef struct {
+  int buffsize;
+} gras_trp_tcp_sock_data_t;
+
 
 /***
  *** Code
  ***/
 gras_error_t
-gras_trp_tcp_init(gras_trp_plugin_t **dst) {
+gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
 
-  gras_trp_plugin_t *res=malloc(sizeof(gras_trp_plugin_t));
-  gras_trp_tcp_specific_t *tcp = malloc(sizeof(gras_trp_tcp_specific_t));
-  if (!res || !tcp)
+  gras_trp_tcp_plug_data_t *data = malloc(sizeof(gras_trp_tcp_plug_data_t));
+  if (!data)
     RAISE_MALLOC;
 
-  FD_ZERO(&(tcp->incoming_socks));
+  FD_ZERO(&(data->msg_socks));
+  FD_ZERO(&(data->raw_socks));
+
+  plug->socket_client = gras_trp_tcp_socket_client;
+  plug->socket_server = gras_trp_tcp_socket_server;
+  plug->socket_accept = gras_trp_tcp_socket_accept;
+  plug->socket_close  = gras_trp_tcp_socket_close;
 
-  res->socket_client = gras_trp_tcp_socket_client;
-  res->socket_server = gras_trp_tcp_socket_server;
-  res->socket_accept = gras_trp_tcp_socket_accept;
-  res->socket_close  = gras_trp_tcp_socket_close;
+  plug->chunk_send    = gras_trp_tcp_chunk_send;
+  plug->chunk_recv    = gras_trp_tcp_chunk_recv;
 
-  res->specific      = (void*)tcp;
-  res->free_specific = gras_trp_tcp_free_specific;
+  plug->data = (void*)data;
+  plug->exit = gras_trp_tcp_exit;
 
-  *dst = res;
   return no_error;
 }
 
-void gras_trp_tcp_free_specific(void *s) {
-  gras_trp_tcp_specific_t *specific = s;
-  free(specific);
+void gras_trp_tcp_exit(gras_trp_plugin_t *plug) {
+  DEBUG1("Exit plugin TCP (free %p)", plug->data);
+  free(plug->data);
 }
 
 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
                                        const char *host,
                                        unsigned short port,
-                                       unsigned int bufSize, 
-                                       /* OUT */ gras_socket_t **dst){
+                                       /* OUT */ gras_socket_t *sock){
   
-  int addrCount;
-  IPAddress addresses[10];
-  int i;
-  int sd;
-  
-  if (!(*sock=malloc(sizeof(gras_socket_t))))
-    RAISE_MALLOC;
+  struct sockaddr_in addr;
+  struct hostent *he;
+  struct in_addr *haddr;
+
+  sock->incoming = 1; /* TCP sockets are duplex'ed */
+
+  sock->sd = socket (AF_INET, SOCK_STREAM, 0);
   
-  (*sock)->peer_addr=NULL;
+  if (sock->sd < 0) {
+    RAISE1(system_error,
+          "Failed to create socket: %s",
+          strerror (errno));
+  }
   
-  if (!(addrCount = IPAddressValues(host, addresses, 10))) {
+  he = gethostbyname (host);
+  if (he == NULL) {
     RAISE2(system_error,
-         "tcp address retrieval of '%s' failed: %s",
-          host,strerror(errno));
+          "Failed to lookup hostname %s: %s",
+          host, strerror (errno));
   }
   
-  for(i = 0; i < addrCount && i<10 ; i++) {
-    if(CallAddr(addresses[i], port, &sd, -1)) {
-      (*sock)->sock = sd;
-      (*sock)->port = port;
-      return no_error;
-    }
+  haddr = ((struct in_addr *) (he->h_addr_list)[0]);
+  
+  memset(&addr, 0, sizeof(struct sockaddr_in));
+  memcpy (&addr.sin_addr, haddr, sizeof(struct in_addr));
+  addr.sin_family = AF_INET;
+  addr.sin_port = htons (port);
+
+  if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) {
+    close(sock->sd);
+    RAISE3(system_error,
+          "Failed to connect socket to %s:%d (%s)",
+          host, port, strerror (errno));
   }
-  free(*sock);
-  RAISE2(system_error,"Something wicked happenned while connecting to %s:%d",
-          host,port);
+  
+  return no_error;
 }
 
 /**
  * gras_trp_tcp_socket_server:
  *
- * Open a socket used to receive messages. bufSize is in ko.
+ * Open a socket used to receive messages.
  */
 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
                                        unsigned short port,
-                                       unsigned int bufSize, 
-                                       /* OUT */ gras_socket_t **dst){
-  int size = bufSize * 1024;
+                                       /* OUT */ gras_socket_t *sock){
+//  int size = bufSize * 1024;
   int on = 1;
-  int sd = -1;
   struct sockaddr_in server;
 
-  gras_socket_t *res;
-  gras_trp_tcp_specific_t *data=(gras_trp_tcp_specific_t*)self -> specific;
+  gras_trp_tcp_plug_data_t *tcp=(gras_trp_tcp_plug_data_t*)self->data;
  
-  res=malloc(sizeof(gras_socket_t));
-  if (!res)
-    RAISE_MALLOC;
+  sock->outgoing  = 1; /* TCP => duplex mode */
 
   server.sin_port = htons((u_short)port);
   server.sin_addr.s_addr = INADDR_ANY;
   server.sin_family = AF_INET;
-  if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-    free(res);
+  if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
     RAISE1(system_error,"socket allocation failed: %s", strerror(errno));
   }
 
-  (void)setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
+  (void)setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, 
+                  (char *)&on, sizeof(on));
+   /*
   (void)setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size));
   (void)setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size));
-  if (bind(sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
-    free(res);
-    close(sd);
+    */
+  if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
+    close(sock->sd);
     RAISE2(system_error,"Cannot bind to port %d: %s",port, strerror(errno));
   }
 
-  if (listen(sd, 5) != -1) {
-    free(res);
-    close(sd);
+  if (listen(sock->sd, 5) < 0) {
+    close(sock->sd);
     RAISE2(system_error,"Cannot listen to port %d: %s",port,strerror(errno));
   }
 
-  FD_SET(sd, &(data->incoming_socks));
+  if (sock->raw)
+    FD_SET(sock->sd, &(tcp->raw_socks));
+  else
+    FD_SET(sock->sd, &(tcp->msg_socks));
 
-  *dst=res;
-  res->plugin = self;
-  res->incoming = 1;
-  res->sd = sd;
-  res->port=port;
-  res->peer_port=-1;
-  res->peer_name=NULL;
-
-  DEBUG2("Openned a server socket on port %d (sock %d)",port,sd);
+  DEBUG2("Openned a server socket on port %d (sock %d)",port,sock->sd);
   
   return no_error;
 }
@@ -199,16 +199,15 @@ gras_error_t
 gras_trp_tcp_socket_accept(gras_socket_t  *sock,
                           gras_socket_t **dst) {
   gras_socket_t *res;
+  gras_error_t errcode;
   
   struct sockaddr_in peer_in;
   socklen_t peer_in_len = sizeof(peer_in);
 
   int sd;
   int tmp_errno;
-                               
-  res=malloc(sizeof(gras_socket_t));
-  if (!res)
-    RAISE_MALLOC;
+                       
+  TRY(gras_trp_socket_new(1,&res));
 
   sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len);
   tmp_errno = errno;
@@ -223,21 +222,26 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
   
     if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
        || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) {
-      WARNING0("setsockopt failed, cannot condition the accepted socket");
+      WARN0("setsockopt failed, cannot condition the accepted socket");
     }
  
-    i = ((gras_trp_tcp_sock_specific_t*)sock->specific)->buffsize;
-    if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&i, s)
-       || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&i, s)) {
-      WARNING0("setsockopt failed, cannot set buffsize");      
-    }
-    res->plugin = sock->plugin;
-    res->incoming = 1;
-    res->sd = sd;
-    res->port= -1;
-    res->peer_port= peer_in.sin_port;
-
+    /* FIXME: bufSize removed until we can have optionsets 
+       i = ((gras_trp_tcp_sock_specific_t*)sock->specific)->buffsize;
+       if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&i, s)
+       || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&i, s)) {
+       WARNING0("setsockopt failed, cannot set buffsize");     
+       }
+    */
+     
+    res->plugin    = sock->plugin;
+    res->incoming  = sock->incoming;
+    res->outgoing  = sock->outgoing;
+    res->accepting = 0;
+    res->sd        = sd;
+    res->port      = -1;
+    res->peer_port = peer_in.sin_port;
+
+    /* FIXME: Lock to protect inet_ntoa */
     if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) {
       res->peer_name = strdup("unknown");
     } else {
@@ -248,7 +252,7 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
       
       tmp = inet_ntoa(addrAsInAddr);
       if (tmp != NULL) {
-       res->peer_name = strdup(inet_ntoa(addrAsInAddr));
+       res->peer_name = strdup(tmp);
       } else {
        res->peer_name = strdup("unknown");
       }
@@ -263,12 +267,12 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
 }
 
 void gras_trp_tcp_socket_close(gras_socket_t *sock){
-  gras_trp_tcp_specific_t *tcp;
+  gras_trp_tcp_plug_data_t *tcp;
   
   if (!sock) return; /* close only once */
-  tcp=sock->plugin->specific;
+  tcp=sock->plugin->data;
 
-  DEBUG1("close tcp connection %d\n", sock->sd);
+  DEBUG1("close tcp connection %d", sock->sd);
 
   /* FIXME: no pipe in GRAS so far  
   if(!FD_ISSET(sd, &connectedPipes)) {
@@ -284,14 +288,17 @@ void gras_trp_tcp_socket_close(gras_socket_t *sock){
     }
   } */
 
+  /* forget about the socket */
+  if (sock->raw)
+    FD_CLR(sock->sd, &(tcp->raw_socks));
+  else
+    FD_CLR(sock->sd, &(tcp->msg_socks));
+
   /* close the socket */
   if(close(sock->sd) < 0) {
-    WARNING3("error while closing tcp socket %d: %d (%s)\n", sock->sd, errno, strerror(errno));
+    WARN3("error while closing tcp socket %d: %d (%s)\n", 
+            sock->sd, errno, strerror(errno));
   }
-
-  /* forget about it */
-  FD_CLR(sock->sd, &(tcp->incoming_socks));
-
 }
 
 /**
@@ -304,15 +311,14 @@ gras_trp_tcp_chunk_send(gras_socket_t *sock,
                    char *data,
                    size_t size) {
   
-  /*  gras_assert0(sock && !sock->incoming,
-      "Asked to send stuff on an incomming socket");*/
+  /* TCP sockets are in duplex mode, don't check direction */
   gras_assert0(size >= 0, "Cannot send a negative amount of data");
 
   while (size) {
     int status = 0;
     
     status = write(sock->sd, data, (size_t)size);
-    DEBUG3("write(%d, %p, %ld);\n", sock->sd, data, size);
+    DEBUG3("write(%d, %p, %ld);", sock->sd, data, size);
     
     if (status == -1) {
       RAISE4(system_error,"write(%d,%p,%d) failed: %s",
@@ -339,15 +345,16 @@ gras_error_t
 gras_trp_tcp_chunk_recv(gras_socket_t *sock,
                        char *data,
                        size_t size) {
-  gras_assert0(sock && !sock->incoming,
-              "Ascked to receive stuff on an outcomming socket");
+
+  /* TCP sockets are in duplex mode, don't check direction */
+  gras_assert0(sock, "Cannot recv on an NULL socket");
   gras_assert0(size >= 0, "Cannot receive a negative amount of data");
   
   while (size) {
     int status = 0;
     
     status = read(sock->sd, data, (size_t)size);
-    DEBUG3("read(%d, %p, %ld);\n", sock->sd, data, size);
+    DEBUG3("read(%d, %p, %ld);", sock->sd, data, size);
     
     if (status == -1) {
       RAISE4(system_error,"read(%d,%p,%d) failed: %s",