Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
The change Arnaud wanted so much: Big Star Eradication (plus some more, check the...
[simgrid.git] / src / gras / Transport / transport_plugin_tcp.c
index 46638a3..d502ce1 100644 (file)
 #include <stdlib.h>
 #include <string.h>       /* memset */
 
-#include "gras_private.h"
 #include "transport_private.h"
 
-GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport);
+GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport");
 
 /***
  *** Prototypes 
  ***/
 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
-                                       const char *host,
-                                       unsigned short port,
-                                       /* OUT */ gras_socket_t *sock);
+                                       gras_socket_t sock);
 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
-                                       unsigned short port,
-                                       /* OUT */ gras_socket_t *sock);
-gras_error_t gras_trp_tcp_socket_accept(gras_socket_t  *sock,
-                                       gras_socket_t **dst);
+                                       gras_socket_t sock);
+gras_error_t gras_trp_tcp_socket_accept(gras_socket_t  sock,
+                                       gras_socket_t *dst);
 
-void         gras_trp_tcp_socket_close(gras_socket_t *sd);
+void         gras_trp_tcp_socket_close(gras_socket_t sd);
   
-gras_error_t gras_trp_tcp_chunk_send(gras_socket_t *sd,
-                                    char *data,
+gras_error_t gras_trp_tcp_chunk_send(gras_socket_t sd,
+                                    const char *data,
                                     long int size);
 
-gras_error_t gras_trp_tcp_chunk_recv(gras_socket_t *sd,
+gras_error_t gras_trp_tcp_chunk_recv(gras_socket_t sd,
                                     char *data,
                                     long int size);
 
@@ -74,12 +70,9 @@ typedef struct {
 /***
  *** Code
  ***/
-gras_error_t
-gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
+gras_error_t gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
 
-  gras_trp_tcp_plug_data_t *data = malloc(sizeof(gras_trp_tcp_plug_data_t));
-  if (!data)
-    RAISE_MALLOC;
+  gras_trp_tcp_plug_data_t *data = gras_new(gras_trp_tcp_plug_data_t,1);
 
   FD_ZERO(&(data->msg_socks));
   FD_ZERO(&(data->raw_socks));
@@ -92,6 +85,8 @@ gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
   plug->chunk_send    = gras_trp_tcp_chunk_send;
   plug->chunk_recv    = gras_trp_tcp_chunk_recv;
 
+  plug->flush = NULL; /* nothing's cached */
+
   plug->data = (void*)data;
   plug->exit = gras_trp_tcp_exit;
 
@@ -100,17 +95,16 @@ gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
 
 void gras_trp_tcp_exit(gras_trp_plugin_t *plug) {
   DEBUG1("Exit plugin TCP (free %p)", plug->data);
-  free(plug->data);
+  gras_free(plug->data);
 }
 
 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
-                                       const char *host,
-                                       unsigned short port,
-                                       /* OUT */ gras_socket_t *sock){
+                                       gras_socket_t sock){
   
   struct sockaddr_in addr;
   struct hostent *he;
   struct in_addr *haddr;
+  int size = sock->bufSize * 1024; 
 
   sock->incoming = 1; /* TCP sockets are duplex'ed */
 
@@ -121,12 +115,18 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
           "Failed to create socket: %s",
           strerror (errno));
   }
+
+  if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
+      setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
+     WARN1("setsockopt failed, cannot set buffer size: %s",
+          strerror(errno));
+  }
   
-  he = gethostbyname (host);
+  he = gethostbyname (sock->peer_name);
   if (he == NULL) {
     RAISE2(system_error,
           "Failed to lookup hostname %s: %s",
-          host, strerror (errno));
+          sock->peer_name, strerror (errno));
   }
   
   haddr = ((struct in_addr *) (he->h_addr_list)[0]);
@@ -134,13 +134,13 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
   memset(&addr, 0, sizeof(struct sockaddr_in));
   memcpy (&addr.sin_addr, haddr, sizeof(struct in_addr));
   addr.sin_family = AF_INET;
-  addr.sin_port = htons (port);
+  addr.sin_port = htons (sock->peer_port);
 
   if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) {
     close(sock->sd);
     RAISE3(system_error,
           "Failed to connect socket to %s:%d (%s)",
-          host, port, strerror (errno));
+          sock->peer_name, sock->peer_port, strerror (errno));
   }
   
   return no_error;
@@ -152,9 +152,8 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
  * Open a socket used to receive messages.
  */
 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
-                                       unsigned short port,
-                                       /* OUT */ gras_socket_t *sock){
-//  int size = bufSize * 1024;
+                                       /* OUT */ gras_socket_t sock){
+  int size = sock->bufSize * 1024; 
   int on = 1;
   struct sockaddr_in server;
 
@@ -162,27 +161,32 @@ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
  
   sock->outgoing  = 1; /* TCP => duplex mode */
 
-  server.sin_port = htons((u_short)port);
+  server.sin_port = htons((u_short)sock->port);
   server.sin_addr.s_addr = INADDR_ANY;
   server.sin_family = AF_INET;
   if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-    RAISE1(system_error,"socket allocation failed: %s", strerror(errno));
+    RAISE1(system_error,"Socket allocation failed: %s", strerror(errno));
   }
 
-  (void)setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, 
-                  (char *)&on, sizeof(on));
-   /*
-  (void)setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size));
-  (void)setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size));
-    */
+  if (setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on))) {
+     RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
+           strerror(errno));
+  }
+   
+  if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
+      setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
+     WARN1("setsockopt failed, cannot set buffer size: %s",
+          strerror(errno));
+  }
+       
   if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
     close(sock->sd);
-    RAISE2(system_error,"Cannot bind to port %d: %s",port, strerror(errno));
+    RAISE2(system_error,"Cannot bind to port %d: %s",sock->port, strerror(errno));
   }
 
   if (listen(sock->sd, 5) < 0) {
     close(sock->sd);
-    RAISE2(system_error,"Cannot listen to port %d: %s",port,strerror(errno));
+    RAISE2(system_error,"Cannot listen to port %d: %s",sock->port,strerror(errno));
   }
 
   if (sock->raw)
@@ -190,15 +194,15 @@ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
   else
     FD_SET(sock->sd, &(tcp->msg_socks));
 
-  DEBUG2("Openned a server socket on port %d (sock %d)",port,sock->sd);
+  DEBUG2("Openned a server socket on port %d (sock %d)",sock->port,sock->sd);
   
   return no_error;
 }
 
 gras_error_t
-gras_trp_tcp_socket_accept(gras_socket_t  *sock,
-                          gras_socket_t **dst) {
-  gras_socket_t *res;
+gras_trp_tcp_socket_accept(gras_socket_t  sock,
+                          gras_socket_t *dst) {
+  gras_socket_t res;
   gras_error_t errcode;
   
   struct sockaddr_in peer_in;
@@ -206,8 +210,9 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
 
   int sd;
   int tmp_errno;
+  int size;
                        
-  TRY(gras_trp_socket_new(1,&res));
+  gras_trp_socket_new(1,&res);
 
   sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len);
   tmp_errno = errno;
@@ -222,16 +227,17 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
   
     if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
        || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) {
-      WARN0("setsockopt failed, cannot condition the accepted socket");
+       RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
+             strerror(errno));
     }
  
-    /* FIXME: bufSize removed until we can have optionsets 
-       i = ((gras_trp_tcp_sock_specific_t*)sock->specific)->buffsize;
-       if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&i, s)
-       || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&i, s)) {
-       WARNING0("setsockopt failed, cannot set buffsize");     
-       }
-    */
+    (*dst)->bufSize = sock->bufSize;
+    size = sock->bufSize * 1024;
+    if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size))
+       || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
+       WARN1("setsockopt failed, cannot set buffer size: %s",
+            strerror(errno));
+    }
      
     res->plugin    = sock->plugin;
     res->incoming  = sock->incoming;
@@ -243,7 +249,7 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
 
     /* FIXME: Lock to protect inet_ntoa */
     if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) {
-      res->peer_name = strdup("unknown");
+      res->peer_name = (char*)strdup("unknown");
     } else {
       struct in_addr addrAsInAddr;
       char *tmp;
@@ -252,9 +258,9 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
       
       tmp = inet_ntoa(addrAsInAddr);
       if (tmp != NULL) {
-       res->peer_name = strdup(tmp);
+       res->peer_name = (char*)strdup(tmp);
       } else {
-       res->peer_name = strdup("unknown");
+       res->peer_name = (char*)strdup("unknown");
       }
     }
 
@@ -266,7 +272,7 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
   }
 }
 
-void gras_trp_tcp_socket_close(gras_socket_t *sock){
+void gras_trp_tcp_socket_close(gras_socket_t sock){
   gras_trp_tcp_plug_data_t *tcp;
   
   if (!sock) return; /* close only once */
@@ -307,8 +313,8 @@ void gras_trp_tcp_socket_close(gras_socket_t *sock){
  * Send data on a TCP socket
  */
 gras_error_t 
-gras_trp_tcp_chunk_send(gras_socket_t *sock,
-                       char *data,
+gras_trp_tcp_chunk_send(gras_socket_t sock,
+                       const char *data,
                        long int size) {
   
   /* TCP sockets are in duplex mode, don't check direction */
@@ -342,7 +348,7 @@ gras_trp_tcp_chunk_send(gras_socket_t *sock,
  * Receive data on a TCP socket.
  */
 gras_error_t 
-gras_trp_tcp_chunk_recv(gras_socket_t *sock,
+gras_trp_tcp_chunk_recv(gras_socket_t sock,
                        char *data,
                        long int size) {
 
@@ -391,3 +397,51 @@ static int TcpProtoNumber(void) {
   
   return returnValue;
 }
+
+/* Data exchange over raw sockets. Placing this in there is a kind of crude hack.
+   It means that the only possible raw are TCP where we may want to do UDP for them. 
+   But I fail to find a good internal organization for now. We may want to split 
+   raw and regular sockets more efficiently.
+*/
+gras_error_t gras_socket_raw_exchange(gras_socket_t peer,
+                                     int sender,
+                                     unsigned int timeout,
+                                     unsigned long int exp_size,
+                                     unsigned long int msg_size) {
+   char *chunk;
+   int res_last, msg_sofar, exp_sofar;
+   
+   fd_set rd_set;
+   int rv;
+   
+   struct timeval timeOut;
+   
+   chunk = gras_malloc(msg_size);
+
+   for   (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
+      for(msg_sofar=0; msg_sofar < msg_size; msg_sofar += res_last) {
+        
+        if(sender) {
+           res_last = send(peer->sd, chunk, msg_size - msg_sofar, 0);
+        } else {
+           res_last = 0;
+           FD_ZERO(&rd_set);
+           FD_SET(peer->sd,&rd_set);
+           timeOut.tv_sec = timeout;
+           timeOut.tv_usec = 0;
+               
+           if (0 < select(peer->sd+1,&rd_set,NULL,NULL,&timeOut))
+             res_last = recv(peer->sd, chunk, msg_size-msg_sofar, 0);
+           
+        }
+        if (res_last == 0) {
+          /* No progress done, bail out */
+          gras_free(chunk);
+          RAISE0(unknown_error,"Not exchanged a single byte, bailing out");
+        }
+      }
+   }
+   
+   gras_free(chunk);
+   return no_error;
+}