Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
J'en ai marre de faire des messages detailles. 'Current state' ;)
[simgrid.git] / src / gras / Transport / transport_tcp.c
index 967e907..18eb5fb 100644 (file)
 /* This program is free software; you can redistribute it and/or modify it
    under the terms of the license (GNU LGPL) which comes with this package. */
 
+#include <unistd.h>       /* close() pipe() read() write() */
+#include <signal.h>       /* close() pipe() read() write() */
+#include <netinet/in.h>   /* sometimes required for #include <arpa/inet.h> */
+#include <netinet/tcp.h>  /* TCP_NODELAY */
+#include <arpa/inet.h>    /* inet_ntoa() */
+#include <netdb.h>        /* getprotobyname() */
+#include <sys/time.h>     /* struct timeval */
+#include <errno.h>        /* errno */
+#include <sys/wait.h>     /* waitpid() */
+#include <sys/socket.h>   /* getpeername() socket() */
+#include <stdlib.h>
+
+
 #include "gras_private.h"
 #include "transport_private.h"
 
 GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport);
 
 typedef struct {
-  int dummy;
+  int buffsize;
+} gras_trp_tcp_sock_specific_t;
+
+/***
+ *** Prototypes 
+ ***/
+gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
+                                       const char *host,
+                                       unsigned short port,
+                                       unsigned int bufSize, 
+                                       /* OUT */ gras_socket_t **dst);
+gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
+                                       unsigned short port,
+                                       unsigned int bufSize, 
+                                       /* OUT */ gras_socket_t **dst);
+gras_error_t gras_trp_tcp_socket_accept(gras_socket_t  *sock,
+                                       gras_socket_t **dst);
+
+void         gras_trp_tcp_socket_close(gras_socket_t *sd);
+  
+gras_error_t gras_trp_tcp_bloc_send(gras_socket_t *sd,
+                                   char *data,
+                                   size_t size);
+
+gras_error_t gras_trp_tcp_bloc_recv(gras_socket_t *sd,
+                                   char *data,
+                                   size_t size);
+
+void         gras_trp_tcp_free_specific(void *s);
+
+
+static int TcpProtoNumber(void);
+/***
+ *** Specific plugin part
+ ***/
+
+typedef struct {
+  fd_set incoming_socks;
 } gras_trp_tcp_specific_t;
 
+/***
+ *** Specific socket part
+ ***/
+
+
+/***
+ *** Code
+ ***/
 gras_error_t
-gras_trp_tcp_init(void) {
+gras_trp_tcp_init(gras_trp_plugin_t **dst) {
 
-  gras_trp_tcp_specific_t *specific = malloc(sizeof(gras_trp_tcp_specific_t));
-  if (!specific)
+  gras_trp_plugin_t *res=malloc(sizeof(gras_trp_plugin_t));
+  gras_trp_tcp_specific_t *tcp = malloc(sizeof(gras_trp_tcp_specific_t));
+  if (!res || !tcp)
     RAISE_MALLOC;
 
+  FD_ZERO(&(tcp->incoming_socks));
+
+  res->socket_client = gras_trp_tcp_socket_client;
+  res->socket_server = gras_trp_tcp_socket_server;
+  res->socket_accept = gras_trp_tcp_socket_accept;
+  res->socket_close  = gras_trp_tcp_socket_close;
+
+  res->bloc_send     = gras_trp_tcp_bloc_send;
+  res->bloc_recv     = gras_trp_tcp_bloc_recv;
+
+  res->specific      = (void*)tcp;
+  res->free_specific = gras_trp_tcp_free_specific;
+
+  *dst = res;
   return no_error;
 }
 
-void
-gras_trp_tcp_exit(gras_trp_plugin_t *plugin) {
-  gras_trp_tcp_specific_t *specific = (gras_trp_tcp_specific_t*)plugin->specific;
+void gras_trp_tcp_free_specific(void *s) {
+  gras_trp_tcp_specific_t *specific = s;
   free(specific);
 }
 
-gras_error_t gras_trp_tcp_socket_client(const char *host,
+gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
+                                       const char *host,
                                        unsigned short port,
-                                       int raw, 
                                        unsigned int bufSize, 
-                                       /* OUT */ gras_trp_sock_t **dst){
+                                       /* OUT */ gras_socket_t **dst){
+  /*
+  int addrCount;
+  IPAddress addresses[10];
+  int i;
+  int sd;
+  
+  if (!(*sock=malloc(sizeof(gras_socket_t)))) {
+    fprintf(stderr,"Malloc error\n");
+    return malloc_error;
+  }
+  (*sock)->peer_addr=NULL;
+  
+  if (!(addrCount = IPAddressValues(host, addresses, 10))) {
+    fprintf(stderr,"grasOpenClientSocket: address retrieval of '%s' failed\n",host);
+    return system_error;
+  }
+  
+  for(i = 0; i < addrCount && i<10 ; i++) {
+    if(CallAddr(addresses[i], port, &sd, -1)) {
+      (*sock)->sock = sd;
+      (*sock)->port = port;
+      return no_error;
+    }
+  }
+  free(*sock);
+  fprintf(stderr,"grasOpenClientSocket: something wicked happenned while connecting to %s:%d",
+          host,port);
+  return system_error;
+  */
   RAISE_UNIMPLEMENTED;
 }
 
-gras_error_t gras_trp_tcp_socket_server(unsigned short port,
-                                       int raw, 
+/**
+ * gras_trp_tcp_socket_server:
+ *
+ * Open a socket used to receive messages. bufSize is in ko.
+ */
+gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
+                                       unsigned short port,
                                        unsigned int bufSize, 
-                                       /* OUT */ gras_trp_sock_t **dst){
-  RAISE_UNIMPLEMENTED;
-}
+                                       /* OUT */ gras_socket_t **dst){
+  int size = bufSize * 1024;
+  int on = 1;
+  int sd = -1;
+  struct sockaddr_in server;
+
+  gras_socket_t *res;
+  gras_trp_tcp_specific_t *data=(gras_trp_tcp_specific_t*)self -> specific;
+  res=malloc(sizeof(gras_socket_t));
+  if (!res)
+    RAISE_MALLOC;
+
+  server.sin_port = htons((u_short)port);
+  server.sin_addr.s_addr = INADDR_ANY;
+  server.sin_family = AF_INET;
+  if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+    free(res);
+    RAISE0(system_error,"socket allocation failed");
+  }
+
+  (void)setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
+  (void)setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size));
+  (void)setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size));
+  if (bind(sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
+    free(res);
+    close(sd);
+    RAISE1(system_error,"Cannot bind to port %d",port);
+  }
+
+  if (listen(sd, 5) != -1) {
+    free(res);
+    close(sd);
+    RAISE1(system_error,"Cannot listen to port %d",port);
+  }
+
+  FD_SET(sd, &(data->incoming_socks));
+
+  *dst=res;
+  res->plugin = self;
+  res->incoming = 1;
+  res->sd = sd;
+  res->port=port;
+  res->peer_port=-1;
+  res->peer_name=NULL;
 
-void gras_trp_tcp_socket_close(gras_trp_sock_t **sd){
-  ERROR1("%s not implemented",__FUNCTION__);
-  abort();
+  DEBUG2("Openned a server socket on port %d (sock %d)",port,sd);
+  
+  return no_error;
 }
 
-gras_error_t gras_trp_tcp_select(double timeOut,
-                                gras_trp_sock_t **sd){
-  RAISE_UNIMPLEMENTED;
+gras_error_t
+gras_trp_tcp_socket_accept(gras_socket_t  *sock,
+                          gras_socket_t **dst) {
+  gras_socket_t *res;
+  
+  struct sockaddr_in peer_in;
+  socklen_t peer_in_len = sizeof(peer_in);
+
+  int sd;
+  int tmp_errno;
+                               
+  res=malloc(sizeof(gras_socket_t));
+  if (!res)
+    RAISE_MALLOC;
+
+  sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len);
+  tmp_errno = errno;
+
+  if(sd == -1) {
+    gras_socket_close(sock);
+    RAISE1(system_error,
+          "Accept failed (%s). Droping server socket.", strerror(tmp_errno));
+  } else {
+    int i = 1;
+    socklen_t s = sizeof(int);
+  
+    if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
+       || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) {
+      WARNING0("setsockopt failed, cannot condition the accepted socket");
+    }
+    i = ((gras_trp_tcp_sock_specific_t*)sock->specific)->buffsize;
+    if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&i, s)
+       || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&i, s)) {
+      WARNING0("setsockopt failed, cannot set buffsize");      
+    }
+    res->plugin = sock->plugin;
+    res->incoming = 1;
+    res->sd = sd;
+    res->port= -1;
+    res->peer_port= peer_in.sin_port;
+
+    if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) {
+      res->peer_name = strdup("unknown");
+    } else {
+      struct in_addr addrAsInAddr;
+      char *tmp;
+      addrAsInAddr.s_addr = peer_in.sin_addr.s_addr;
+      
+      tmp = inet_ntoa(addrAsInAddr);
+      if (tmp != NULL) {
+       res->peer_name = strdup(inet_ntoa(addrAsInAddr));
+      } else {
+       res->peer_name = strdup("unknown");
+      }
+    }
+
+    VERB3("accepted socket %d to %s:%d\n", sd, res->peer_name,res->peer_port);
+    
+    *dst = res;
+
+    return no_error;
+  }
 }
+
+void gras_trp_tcp_socket_close(gras_socket_t *sock){
+  gras_trp_tcp_specific_t *tcp;
   
-gras_error_t gras_trp_tcp_bloc_send(gras_trp_sock_t *sd,
-                                   void *data,
-                                   size_t size,
-                                   double timeOut){
-  RAISE_UNIMPLEMENTED;
+  if (!sock) return; /* close only once */
+  tcp=sock->plugin->specific;
+
+  DEBUG1("close tcp connection %d\n", sock->sd);
+
+  /* FIXME: no pipe in GRAS so far  
+  if(!FD_ISSET(sd, &connectedPipes)) {
+    if(shutdown(sd, 2) < 0) {
+      GetNWSLock(&lock);
+      tmp_errno = errno;
+      ReleaseNWSLock(&lock);
+      
+      / * The other side may have beaten us to the reset. * /
+      if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) {
+       WARN1("CloseSocket: shutdown error %d\n", tmp_errno);
+      }
+    }
+  } */
+
+  /* close the socket */
+  if(close(sock->sd) < 0) {
+    WARNING3("error while closing tcp socket %d: %d (%s)\n", sock->sd, errno, strerror(errno));
+  }
+
+  /* forget about it */
+  FD_CLR(sock->sd, &(tcp->incoming_socks));
+
 }
 
-gras_error_t gras_trp_tcp_bloc_recv(gras_trp_sock_t *sd,
-                                   void *data,
-                                   size_t size,
-                                   double timeOut){
-  RAISE_UNIMPLEMENTED;
+gras_error_t gras_trp_tcp_bloc_send(gras_socket_t *sock,
+                                   char *data,
+                                   size_t size){
+
+  gras_assert0(sock && !sock->incoming, "Ascked to send stuff on an incomming socket");
+  gras_assert0(size >= 0, "Cannot send a negative amount of data");
+
+  while (size) {
+    int status = 0;
+    
+    status = write(sock->sd, data, (size_t)size);
+    DEBUG3("write(%d, %p, %ld);\n", sock->sd, data, size);
+    
+    if (status == -1) {
+      RAISE4(system_error,"write(%d,%p,%d) failed: %s",
+            sock->sd, data, (int)size,
+            strerror(errno));
+    }
+    
+    if (status) {
+      size  -= status;
+      data  += status;
+    } else {
+      RAISE0(system_error,"file descriptor closed");
+    }
+  }
+
+  return no_error;
 }
 
-gras_error_t gras_trp_tcp_flush(gras_trp_sock_t *sd){
-  RAISE_UNIMPLEMENTED;
+gras_error_t gras_trp_tcp_bloc_recv(gras_socket_t *sock,
+                                   char *data,
+                                   size_t size){
+
+  gras_assert0(sock && !sock->incoming, "Ascked to receive stuff on an outcomming socket");
+  gras_assert0(size >= 0, "Cannot receive a negative amount of data");
+  
+  while (size) {
+    int status = 0;
+    
+    status = read(sock->sd, data, (size_t)size);
+    DEBUG3("read(%d, %p, %ld);\n", sock->sd, data, size);
+    
+    if (status == -1) {
+      RAISE4(system_error,"read(%d,%p,%d) failed: %s",
+            sock->sd, data, (int)size,
+            strerror(errno));
+    }
+    
+    if (status) {
+      size  -= status;
+      data  += status;
+    } else {
+      RAISE0(system_error,"file descriptor closed");
+    }
+  }
+  
+  return no_error;
 }
 
+
+/*
+ * Returns the tcp protocol number from the network protocol data base.
+ *
+ * getprotobyname() is not thread safe. We need to lock it.
+ */
+static int TcpProtoNumber(void) {
+  struct protoent *fetchedEntry;
+  static int returnValue = 0;
+  
+  if(returnValue == 0) {
+    fetchedEntry = getprotobyname("tcp");
+    gras_assert0(fetchedEntry, "getprotobyname(tcp) gave NULL");
+    returnValue = fetchedEntry->p_proto;
+  }
+  
+  return returnValue;
+}