Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
- Reput hook for raw sockets, needed for BW experiments
[simgrid.git] / src / gras / Transport / transport_plugin_tcp.c
index 490ceeb..4d29dbf 100644 (file)
@@ -19,7 +19,7 @@
 #include <sys/wait.h>     /* waitpid() */
 #include <sys/socket.h>   /* getpeername() socket() */
 #include <stdlib.h>
 #include <sys/wait.h>     /* waitpid() */
 #include <sys/socket.h>   /* getpeername() socket() */
 #include <stdlib.h>
-
+#include <string.h>       /* memset */
 
 #include "gras_private.h"
 #include "transport_private.h"
 
 #include "gras_private.h"
 #include "transport_private.h"
@@ -36,24 +36,24 @@ typedef struct {
 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
                                        const char *host,
                                        unsigned short port,
 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
                                        const char *host,
                                        unsigned short port,
-                                       unsigned int bufSize, 
-                                       /* OUT */ gras_socket_t **dst);
+                                       int raw,
+                                       /* OUT */ gras_socket_t *sock);
 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
                                        unsigned short port,
 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
                                        unsigned short port,
-                                       unsigned int bufSize, 
-                                       /* OUT */ gras_socket_t **dst);
+                                       int raw,
+                                       /* OUT */ gras_socket_t *sock);
 gras_error_t gras_trp_tcp_socket_accept(gras_socket_t  *sock,
                                        gras_socket_t **dst);
 
 void         gras_trp_tcp_socket_close(gras_socket_t *sd);
   
 gras_error_t gras_trp_tcp_socket_accept(gras_socket_t  *sock,
                                        gras_socket_t **dst);
 
 void         gras_trp_tcp_socket_close(gras_socket_t *sd);
   
-gras_error_t gras_trp_tcp_bloc_send(gras_socket_t *sd,
-                                   char *data,
-                                   size_t size);
+gras_error_t gras_trp_tcp_chunk_send(gras_socket_t *sd,
+                                    char *data,
+                                    size_t size);
 
 
-gras_error_t gras_trp_tcp_bloc_recv(gras_socket_t *sd,
-                                   char *data,
-                                   size_t size);
+gras_error_t gras_trp_tcp_chunk_recv(gras_socket_t *sd,
+                                    char *data,
+                                    size_t size);
 
 void         gras_trp_tcp_free_specific(void *s);
 
 
 void         gras_trp_tcp_free_specific(void *s);
 
@@ -85,11 +85,15 @@ gras_trp_tcp_init(gras_trp_plugin_t **dst) {
 
   FD_ZERO(&(tcp->incoming_socks));
 
 
   FD_ZERO(&(tcp->incoming_socks));
 
+  res->name = strdup("TCP");
   res->socket_client = gras_trp_tcp_socket_client;
   res->socket_server = gras_trp_tcp_socket_server;
   res->socket_accept = gras_trp_tcp_socket_accept;
   res->socket_close  = gras_trp_tcp_socket_close;
 
   res->socket_client = gras_trp_tcp_socket_client;
   res->socket_server = gras_trp_tcp_socket_server;
   res->socket_accept = gras_trp_tcp_socket_accept;
   res->socket_close  = gras_trp_tcp_socket_close;
 
+  res->chunk_send    = gras_trp_tcp_chunk_send;
+  res->chunk_recv    = gras_trp_tcp_chunk_recv;
+
   res->specific      = (void*)tcp;
   res->free_specific = gras_trp_tcp_free_specific;
 
   res->specific      = (void*)tcp;
   res->free_specific = gras_trp_tcp_free_specific;
 
@@ -105,92 +109,94 @@ void gras_trp_tcp_free_specific(void *s) {
 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
                                        const char *host,
                                        unsigned short port,
 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
                                        const char *host,
                                        unsigned short port,
-                                       unsigned int bufSize, 
-                                       /* OUT */ gras_socket_t **dst){
-  
-  int addrCount;
-  IPAddress addresses[10];
-  int i;
-  int sd;
+                                       int raw,
+                                       /* OUT */ gras_socket_t *sock){
   
   
-  if (!(*sock=malloc(sizeof(gras_socket_t))))
-    RAISE_MALLOC;
+  struct sockaddr_in addr;
+  struct hostent *he;
+  struct in_addr *haddr;
+
+  gras_assert0(!raw,"Raw TCP sockets not implemented yet");
+   
+  sock->incoming = 1; /* TCP sockets are duplex'ed */
+
+  sock->sd = socket (AF_INET, SOCK_STREAM, 0);
   
   
-  (*sock)->peer_addr=NULL;
+  if (sock->sd < 0) {
+    RAISE1(system_error,
+          "Failed to create socket: %s",
+          strerror (errno));
+  }
   
   
-  if (!(addrCount = IPAddressValues(host, addresses, 10))) {
+  he = gethostbyname (host);
+  if (he == NULL) {
     RAISE2(system_error,
     RAISE2(system_error,
-         "tcp address retrieval of '%s' failed: %s",
-          host,strerror(errno));
+          "Failed to lookup hostname %s: %s",
+          host, strerror (errno));
   }
   
   }
   
-  for(i = 0; i < addrCount && i<10 ; i++) {
-    if(CallAddr(addresses[i], port, &sd, -1)) {
-      (*sock)->sock = sd;
-      (*sock)->port = port;
-      return no_error;
-    }
+  haddr = ((struct in_addr *) (he->h_addr_list)[0]);
+  
+  memset(&addr, 0, sizeof(struct sockaddr_in));
+  memcpy (&addr.sin_addr, haddr, sizeof(struct in_addr));
+  addr.sin_family = AF_INET;
+  addr.sin_port = htons (port);
+
+  if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) {
+    close(sock->sd);
+    RAISE3(system_error,
+          "Failed to connect socket to %s:%d (%s)",
+          host, port, strerror (errno));
   }
   }
-  free(*sock);
-  RAISE2(system_error,"Something wicked happenned while connecting to %s:%d",
-          host,port);
+  
+  return no_error;
 }
 
 /**
  * gras_trp_tcp_socket_server:
  *
 }
 
 /**
  * gras_trp_tcp_socket_server:
  *
- * Open a socket used to receive messages. bufSize is in ko.
+ * Open a socket used to receive messages.
  */
 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
                                        unsigned short port,
  */
 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
                                        unsigned short port,
-                                       unsigned int bufSize, 
-                                       /* OUT */ gras_socket_t **dst){
-  int size = bufSize * 1024;
+                                       int raw,
+                                       /* OUT */ gras_socket_t *sock){
+//  int size = bufSize * 1024;
   int on = 1;
   int on = 1;
-  int sd = -1;
   struct sockaddr_in server;
 
   struct sockaddr_in server;
 
-  gras_socket_t *res;
-  gras_trp_tcp_specific_t *data=(gras_trp_tcp_specific_t*)self -> specific;
+  gras_assert0(!raw,"Raw TCP sockets not implemented yet");
+
+   gras_trp_tcp_specific_t *data=(gras_trp_tcp_specific_t*)self -> specific;
  
  
-  res=malloc(sizeof(gras_socket_t));
-  if (!res)
-    RAISE_MALLOC;
+  sock->outgoing  = 1; /* TCP => duplex mode */
 
   server.sin_port = htons((u_short)port);
   server.sin_addr.s_addr = INADDR_ANY;
   server.sin_family = AF_INET;
 
   server.sin_port = htons((u_short)port);
   server.sin_addr.s_addr = INADDR_ANY;
   server.sin_family = AF_INET;
-  if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-    free(res);
+  if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
     RAISE1(system_error,"socket allocation failed: %s", strerror(errno));
   }
 
     RAISE1(system_error,"socket allocation failed: %s", strerror(errno));
   }
 
-  (void)setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
+  (void)setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, 
+                  (char *)&on, sizeof(on));
+   /*
   (void)setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size));
   (void)setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size));
   (void)setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size));
   (void)setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size));
-  if (bind(sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
-    free(res);
-    close(sd);
+    */
+  if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
+    close(sock->sd);
     RAISE2(system_error,"Cannot bind to port %d: %s",port, strerror(errno));
   }
 
     RAISE2(system_error,"Cannot bind to port %d: %s",port, strerror(errno));
   }
 
-  if (listen(sd, 5) != -1) {
-    free(res);
-    close(sd);
+  if (listen(sock->sd, 5) < 0) {
+    close(sock->sd);
     RAISE2(system_error,"Cannot listen to port %d: %s",port,strerror(errno));
   }
 
     RAISE2(system_error,"Cannot listen to port %d: %s",port,strerror(errno));
   }
 
-  FD_SET(sd, &(data->incoming_socks));
+  FD_SET(sock->sd, &(data->incoming_socks));
 
 
-  *dst=res;
-  res->plugin = self;
-  res->incoming = 1;
-  res->sd = sd;
-  res->port=port;
-  res->peer_port=-1;
-  res->peer_name=NULL;
-
-  DEBUG2("Openned a server socket on port %d (sock %d)",port,sd);
+  DEBUG2("Openned a server socket on port %d (sock %d)",port,sock->sd);
   
   return no_error;
 }
   
   return no_error;
 }
@@ -214,7 +220,7 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
   tmp_errno = errno;
 
   if(sd == -1) {
   tmp_errno = errno;
 
   if(sd == -1) {
-    gras_socket_close(sock);
+    gras_socket_close(&sock);
     RAISE1(system_error,
           "Accept failed (%s). Droping server socket.", strerror(tmp_errno));
   } else {
     RAISE1(system_error,
           "Accept failed (%s). Droping server socket.", strerror(tmp_errno));
   } else {
@@ -223,21 +229,26 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
   
     if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
        || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) {
   
     if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
        || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) {
-      WARNING0("setsockopt failed, cannot condition the accepted socket");
+      WARN0("setsockopt failed, cannot condition the accepted socket");
     }
  
     }
  
+     /* FIXME: bufSize removed until we can have optionsets 
     i = ((gras_trp_tcp_sock_specific_t*)sock->specific)->buffsize;
     if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&i, s)
        || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&i, s)) {
       WARNING0("setsockopt failed, cannot set buffsize");      
     }
     i = ((gras_trp_tcp_sock_specific_t*)sock->specific)->buffsize;
     if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&i, s)
        || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&i, s)) {
       WARNING0("setsockopt failed, cannot set buffsize");      
     }
-    res->plugin = sock->plugin;
-    res->incoming = 1;
-    res->sd = sd;
-    res->port= -1;
-    res->peer_port= peer_in.sin_port;
-
+      */
+     
+    res->plugin    = sock->plugin;
+    res->incoming  = sock->incoming;
+    res->outgoing  = sock->outgoing;
+    res->accepting = 0;
+    res->sd        = sd;
+    res->port      = -1;
+    res->peer_port = peer_in.sin_port;
+
+    /* FIXME: Lock to protect inet_ntoa */
     if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) {
       res->peer_name = strdup("unknown");
     } else {
     if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) {
       res->peer_name = strdup("unknown");
     } else {
@@ -248,7 +259,7 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
       
       tmp = inet_ntoa(addrAsInAddr);
       if (tmp != NULL) {
       
       tmp = inet_ntoa(addrAsInAddr);
       if (tmp != NULL) {
-       res->peer_name = strdup(inet_ntoa(addrAsInAddr));
+       res->peer_name = strdup(tmp);
       } else {
        res->peer_name = strdup("unknown");
       }
       } else {
        res->peer_name = strdup("unknown");
       }
@@ -284,14 +295,14 @@ void gras_trp_tcp_socket_close(gras_socket_t *sock){
     }
   } */
 
     }
   } */
 
+  /* forget about the socket */
+  FD_CLR(sock->sd, &(tcp->incoming_socks));
+
   /* close the socket */
   if(close(sock->sd) < 0) {
   /* close the socket */
   if(close(sock->sd) < 0) {
-    WARNING3("error while closing tcp socket %d: %d (%s)\n", sock->sd, errno, strerror(errno));
+    WARN3("error while closing tcp socket %d: %d (%s)\n", 
+            sock->sd, errno, strerror(errno));
   }
   }
-
-  /* forget about it */
-  FD_CLR(sock->sd, &(tcp->incoming_socks));
-
 }
 
 /**
 }
 
 /**
@@ -304,8 +315,7 @@ gras_trp_tcp_chunk_send(gras_socket_t *sock,
                    char *data,
                    size_t size) {
   
                    char *data,
                    size_t size) {
   
-  /*  gras_assert0(sock && !sock->incoming,
-      "Asked to send stuff on an incomming socket");*/
+  /* TCP sockets are in duplex mode, don't check direction */
   gras_assert0(size >= 0, "Cannot send a negative amount of data");
 
   while (size) {
   gras_assert0(size >= 0, "Cannot send a negative amount of data");
 
   while (size) {
@@ -339,8 +349,9 @@ gras_error_t
 gras_trp_tcp_chunk_recv(gras_socket_t *sock,
                        char *data,
                        size_t size) {
 gras_trp_tcp_chunk_recv(gras_socket_t *sock,
                        char *data,
                        size_t size) {
-  gras_assert0(sock && !sock->incoming,
-              "Ascked to receive stuff on an outcomming socket");
+
+  /* TCP sockets are in duplex mode, don't check direction */
+  gras_assert0(sock, "Cannot recv on an NULL socket");
   gras_assert0(size >= 0, "Cannot receive a negative amount of data");
   
   while (size) {
   gras_assert0(size >= 0, "Cannot receive a negative amount of data");
   
   while (size) {