Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Reintroduce raw sockets.
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Wed, 29 Sep 2004 09:28:42 +0000 (09:28 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Wed, 29 Sep 2004 09:28:42 +0000 (09:28 +0000)
 Created by gras_socket_{client,server}_ext;used with gras_raw_{send,recv}
 It should allow to kill the last bits of gras first version.

 This is not completely satisfactory yet (dupplicate code with
 chunk_{send,recv}; a bit out of the plugin mecanism), but it should work.

Simplify transport plugin interface by not passing any argument to _server
 and _client, but embeeding them in the socket struct directly.

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@437 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/gras/Transport/sg_transport.c
src/gras/Transport/transport.c
src/gras/Transport/transport_interface.h
src/gras/Transport/transport_plugin_buf.c
src/gras/Transport/transport_plugin_sg.c
src/gras/Transport/transport_plugin_tcp.c
src/gras/Transport/transport_private.h

index 93dc5c7..b79eafc 100644 (file)
@@ -137,7 +137,7 @@ gras_trp_select(double timeout,
             MSG_host_get_name(MSG_host_self()));
       */
       /* MSG_process_sleep(1); */
-      MSG_process_sleep(0.01);
+      MSG_process_sleep(0.001);
     }
   } while (gras_os_time()-startTime < timeout
           || MSG_task_Iprobe((m_channel_t) pd->chan));
@@ -155,3 +155,6 @@ gras_error_t gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
 gras_error_t gras_trp_file_setup(gras_trp_plugin_t *plug) {
   return mismatch_error;
 }
+
+
+   
index 252efee..d5d62b4 100644 (file)
@@ -130,14 +130,18 @@ gras_error_t gras_trp_socket_new(int incoming,
 
 
 /**
- * gras_socket_server:
+ * gras_socket_server_ext:
  *
  * Opens a server socket and make it ready to be listened to.
  * In real life, you'll get a TCP socket.
  */
 gras_error_t
-gras_socket_server(unsigned short port,
-                  /* OUT */ gras_socket_t **dst) {
+gras_socket_server_ext(unsigned short port,
+                      
+                      unsigned long int bufSize,
+                      int raw,
+                      
+                      /* OUT */ gras_socket_t **dst) {
  
   gras_error_t errcode;
   gras_trp_plugin_t *trp;
@@ -152,9 +156,11 @@ gras_socket_server(unsigned short port,
   TRY(gras_trp_socket_new(1,&sock));
   sock->plugin= trp;
   sock->port=port;
+  sock->bufSize = bufSize;
+  sock->raw = raw;
 
   /* Call plugin socket creation function */
-  errcode = trp->socket_server(trp, port, sock);
+  errcode = trp->socket_server(trp, sock);
   DEBUG3("in=%c out=%c accept=%c",
         sock->incoming?'y':'n', 
         sock->outgoing?'y':'n',
@@ -169,17 +175,21 @@ gras_socket_server(unsigned short port,
 
   return no_error;
 }
-
+   
 /**
- * gras_socket_client:
+ * gras_socket_client_ext:
  *
  * Opens a client socket to a remote host.
  * In real life, you'll get a TCP socket.
  */
 gras_error_t
-gras_socket_client(const char *host,
-                  unsigned short port,
-                  /* OUT */ gras_socket_t **dst) {
+gras_socket_client_ext(const char *host,
+                      unsigned short port,
+                      
+                      unsigned long int bufSize,
+                      int raw,
+                      
+                      /* OUT */ gras_socket_t **dst) {
  
   gras_error_t errcode;
   gras_trp_plugin_t *trp;
@@ -195,11 +205,11 @@ gras_socket_client(const char *host,
   sock->plugin= trp;
   sock->peer_port = port;
   sock->peer_name = (char*)strdup(host?host:"localhost");
+  sock->bufSize = bufSize;
+  sock->raw = raw;
 
   /* plugin-specific */
-  errcode= (*trp->socket_client)(trp, 
-                                host ? host : "localhost", port,
-                                sock);
+  errcode= (*trp->socket_client)(trp, sock);
   DEBUG3("in=%c out=%c accept=%c",
         sock->incoming?'y':'n', 
         sock->outgoing?'y':'n',
@@ -215,6 +225,32 @@ gras_socket_client(const char *host,
   return no_error;
 }
 
+/**
+ * gras_socket_server:
+ *
+ * Opens a server socket and make it ready to be listened to.
+ * In real life, you'll get a TCP socket.
+ */
+gras_error_t
+gras_socket_server(unsigned short port,
+                  /* OUT */ gras_socket_t **dst) {
+   return gras_socket_server_ext(port,32,0,dst);
+}
+
+/**
+ * gras_socket_client:
+ *
+ * Opens a client socket to a remote host.
+ * In real life, you'll get a TCP socket.
+ */
+gras_error_t
+gras_socket_client(const char *host,
+                  unsigned short port,
+                  /* OUT */ gras_socket_t **dst) {
+   return gras_socket_client_ext(host,port,32,0,dst);
+}
+
+
 void gras_socket_close(gras_socket_t *sock) {
   gras_dynar_t *sockets = gras_socketset_get();
   gras_socket_t *sock_iter;
@@ -299,3 +335,21 @@ int   gras_socket_peer_port(gras_socket_t *sock) {
 char *gras_socket_peer_name(gras_socket_t *sock) {
   return sock->peer_name;
 }
+
+gras_error_t gras_socket_raw_send(gras_socket_t *peer, 
+                                 unsigned int timeout,
+                                 unsigned long int expSize, 
+                                 unsigned long int msgSize) {
+  
+  gras_assert0(peer->raw,"Asked to send raw data on a regular socket\n");
+  return gras_socket_raw_exchange(peer,1,timeout,expSize,msgSize);   
+}
+
+gras_error_t gras_socket_raw_recv(gras_socket_t *peer, 
+                                 unsigned int timeout,
+                                 unsigned long int expSize, 
+                                 unsigned long int msgSize){
+  
+  gras_assert0(peer->raw,"Asked to recveive raw data on a regular socket\n");
+  return gras_socket_raw_exchange(peer,0,timeout,expSize,msgSize);   
+}
index 78421a5..63e3429 100644 (file)
@@ -41,14 +41,12 @@ typedef struct gras_trp_plugin_ gras_trp_plugin_t;
 struct gras_trp_plugin_ {
   char          *name;
  
-  /* dst pointers are created and initialized with default values 
-     before call to socket_client/server*/
+  /* dst pointers are created and initialized with default values
+     before call to socket_client/server. 
+     Retrive the info you need from there. */
   gras_error_t (*socket_client)(gras_trp_plugin_t *self,
-                               const char *host,
-                               unsigned short port,
                                /* OUT */ gras_socket_t *dst);
   gras_error_t (*socket_server)(gras_trp_plugin_t *self,
-                               unsigned short port,
                                /* OUT */ gras_socket_t *dst);
    
   gras_error_t (*socket_accept)(gras_socket_t  *sock,
index 8ed2e02..46638be 100644 (file)
@@ -22,11 +22,8 @@ GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_buf,transport,
  *** Prototypes 
  ***/
 gras_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self,
-                                       const char *host,
-                                       unsigned short port,
                                        /* OUT */ gras_socket_t *sock);
 gras_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self,
-                                       unsigned short port,
                                        /* OUT */ gras_socket_t *sock);
 gras_error_t gras_trp_buf_socket_accept(gras_socket_t  *sock,
                                        gras_socket_t **dst);
@@ -120,14 +117,12 @@ gras_trp_buf_setup(gras_trp_plugin_t *plug) {
 }
 
 gras_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self,
-                                       const char *host,
-                                       unsigned short port,
                                        /* OUT */ gras_socket_t *sock){
   gras_error_t errcode;
   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super;
 
   GRAS_IN;
-  TRY(super->socket_client(super,host,port,sock));
+  TRY(super->socket_client(super,sock));
   sock->plugin = self;
   TRY(gras_trp_buf_init_sock(sock));
     
@@ -140,13 +135,12 @@ gras_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self,
  * Open a socket used to receive messages.
  */
 gras_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self,
-                                       unsigned short port,
                                        /* OUT */ gras_socket_t *sock){
   gras_error_t errcode;
   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super;
 
   GRAS_IN;
-  TRY(super->socket_server(super,port,sock));
+  TRY(super->socket_server(super,sock));
   sock->plugin = self;
   TRY(gras_trp_buf_init_sock(sock));
   return no_error;
index c0df9da..ae0ca84 100644 (file)
@@ -30,11 +30,8 @@ static gras_error_t find_port(gras_hostdata_t *hd, int port,
 
 
 gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self,
-                                      const char *host,
-                                      unsigned short port,
                                       /* OUT */ gras_socket_t *sock);
 gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
-                                      unsigned short port,
                                       /* OUT */ gras_socket_t *sock);
 void         gras_trp_sg_socket_close(gras_socket_t *sd);
 
@@ -97,8 +94,6 @@ gras_trp_sg_setup(gras_trp_plugin_t *plug) {
 }
 
 gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self,
-                                      const char *host,
-                                      unsigned short port,
                                       /* OUT */ gras_socket_t *sock){
 
   gras_error_t errcode;
@@ -109,34 +104,34 @@ gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self,
   gras_sg_portrec_t pr;
 
   /* make sure this socket will reach someone */
-  if (!(peer=MSG_get_host_by_name(host))) {
-      fprintf(stderr,"GRAS: can't connect to %s: no such host.\n",host);
+  if (!(peer=MSG_get_host_by_name(sock->peer_name))) {
+      fprintf(stderr,"GRAS: can't connect to %s: no such host.\n",sock->peer_name);
       return mismatch_error;
   }
   if (!(hd=(gras_hostdata_t *)MSG_host_get_data(peer))) {
     RAISE1(mismatch_error,
           "can't connect to %s: no process on this host",
-          host);
+          sock->peer_name);
   }
-  errcode = find_port(hd,port,&pr);
+  errcode = find_port(hd,sock->peer_port,&pr);
   if (errcode != no_error && errcode != mismatch_error) 
     return errcode;
 
   if (errcode == mismatch_error) {
     RAISE2(mismatch_error,
           "can't connect to %s:%d, no process listen on this port",
-          host,port);
+          sock->peer_name,sock->peer_port);
   } 
 
   if (pr.raw && !sock->raw) {
     RAISE2(mismatch_error,
           "can't connect to %s:%d in regular mode, the process listen "
-          "in raw mode on this port",host,port);
+          "in raw mode on this port",sock->peer_name,sock->peer_port);
   }
   if (!pr.raw && sock->raw) {
     RAISE2(mismatch_error,
           "can't connect to %s:%d in raw mode, the process listen "
-          "in regular mode on this port",host,port);
+          "in regular mode on this port",sock->peer_name,sock->peer_port);
   }
 
   /* create the socket */
@@ -153,13 +148,13 @@ gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self,
 
   DEBUG6("%s (PID %d) connects in %s mode to %s:%d (to_PID=%d)",
          MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
-         sock->raw?"RAW":"regular",host,port,data->to_PID);
+         sock->raw?"RAW":"regular",
+        sock->peer_name,sock->peer_port,data->to_PID);
 
   return no_error;
 }
 
 gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
-                                      unsigned short port,
                                       gras_socket_t *sock){
 
   gras_error_t errcode;
@@ -175,16 +170,16 @@ gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
 
   sock->accepting = 0; /* no such nuisance in SG */
 
-  errcode = find_port(hd,port,&pr);
+  errcode = find_port(hd,sock->port,&pr);
   switch (errcode) {
   case no_error: /* Port already used... */
     RAISE2(mismatch_error,
           "can't listen on address %s:%d: port already in use\n.",
-          host,port);
+          host,sock->port);
 
   case mismatch_error: /* Port not used so far. Do it */
     pr.tochan = sock->raw ? pd->rawChan : pd->chan;
-    pr.port   = port;
+    pr.port   = sock->port;
     pr.raw    = sock->raw;
     TRY(gras_dynar_push(hd->ports,&pr));
     
@@ -205,7 +200,7 @@ gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
 
   INFO6("'%s' (%d) ears on %s:%d%s (%p)",
     MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(),
-    host,port,sock->raw? " (mode RAW)":"",sock);
+    host,sock->port,sock->raw? " (mode RAW)":"",sock);
 
   return no_error;
 }
@@ -246,11 +241,9 @@ gras_error_t gras_trp_sg_chunk_send(gras_socket_t *sock,
   m_task_t task=NULL;
   static unsigned int count=0;
   char name[256];
-  gras_trp_sg_sock_data_t *sock_data;
+  gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)sock->data;
   sg_task_data_t *task_data;
   
-  sock_data = (gras_trp_sg_sock_data_t *)sock->data;
-
   sprintf(name,"Chunk[%d]",count++);
 
   if (!(task_data=gras_new(sg_task_data_t,1)))
@@ -305,3 +298,75 @@ gras_error_t gras_trp_sg_chunk_recv(gras_socket_t *sock,
   GRAS_OUT;
   return no_error;
 }
+
+/* Data exchange over raw sockets */
+gras_error_t gras_socket_raw_exchange(gras_socket_t *peer,
+                                     int sender,
+                                     unsigned int timeout,
+                                     unsigned long int expSize,
+                                     unsigned long int msgSize) {
+  unsigned int bytesTotal;
+  static unsigned int count=0;
+  m_task_t task=NULL;
+  char name[256];
+  gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)peer->data;
+  
+  gras_procdata_t *pd=gras_procdata_get();
+  double startTime;
+  
+  startTime=gras_os_time(); /* used only in sender mode */
+
+  for(bytesTotal = 0;  bytesTotal < expSize;  bytesTotal += msgSize) {
+
+    if (sender) {
+    
+      sprintf(name,"Raw data[%d]",count++);
+      
+      task=MSG_task_create(name,0,((double)msgSize)/(1024.0*1024.0),NULL);
+
+      DEBUG5("%f:%s: gras_socket_raw_send(%f %s -> %s) BEGIN",
+            gras_os_time(), MSG_process_get_name(MSG_process_self()),
+            ((double)msgSize)/(1024.0*1024.0),
+            MSG_host_get_name( MSG_host_self()), peer->peer_name);
+
+      if (MSG_task_put(task, sock_data->to_host,sock_data->to_chan) != MSG_OK)
+       RAISE0(system_error,"Problem during the MSG_task_put()");
+              
+      DEBUG5("%f:%s: gras_socket_raw_send(%f %s -> %s) END",
+            gras_os_time(), MSG_process_get_name(MSG_process_self()),
+            ((double)msgSize)/(1024.0*1024.0),
+            MSG_host_get_name( MSG_host_self()), peer->peer_name);
+                  
+    } else { /* we are receiver, simulate a select */
+      
+      task=NULL;
+      DEBUG2("%f:%s: gras_socket_raw_recv() BEGIN\n",
+            gras_os_time(), MSG_process_get_name(MSG_process_self()));
+      do {
+       if (MSG_task_Iprobe((m_channel_t) pd->rawChan)) {
+         if (MSG_task_get(&task, (m_channel_t) pd->rawChan) != MSG_OK) {
+           fprintf(stderr,"GRAS: Error in MSG_task_get()\n");
+           return unknown_error;
+         }
+         
+         if (MSG_task_destroy(task) != MSG_OK) {
+           fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n");
+           return unknown_error;
+         }
+         
+         DEBUG2("%f:%s: gras_socket_raw_recv() END\n",
+                gras_os_time(), MSG_process_get_name(MSG_process_self()));
+         break;
+       } else {
+         MSG_process_sleep(0.0001);
+       }
+       
+      } while (gras_os_time() - startTime < timeout);
+      
+      if (gras_os_time() - startTime > timeout)
+       return timeout_error;
+    } /* receiver part */
+  } /* foreach msg */
+
+  return no_error;
+}
index 988b6e7..189c2cf 100644 (file)
@@ -30,11 +30,8 @@ GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport");
  *** Prototypes 
  ***/
 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
-                                       const char *host,
-                                       unsigned short port,
                                        /* OUT */ gras_socket_t *sock);
 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
-                                       unsigned short port,
                                        /* OUT */ gras_socket_t *sock);
 gras_error_t gras_trp_tcp_socket_accept(gras_socket_t  *sock,
                                        gras_socket_t **dst);
@@ -106,13 +103,12 @@ void gras_trp_tcp_exit(gras_trp_plugin_t *plug) {
 }
 
 gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
-                                       const char *host,
-                                       unsigned short port,
                                        /* OUT */ gras_socket_t *sock){
   
   struct sockaddr_in addr;
   struct hostent *he;
   struct in_addr *haddr;
+  int size = sock->bufSize * 1024; 
 
   sock->incoming = 1; /* TCP sockets are duplex'ed */
 
@@ -123,12 +119,18 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
           "Failed to create socket: %s",
           strerror (errno));
   }
+
+  if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
+      setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
+     WARN1("setsockopt failed, cannot set buffer size: %s",
+          strerror(errno));
+  }
   
-  he = gethostbyname (host);
+  he = gethostbyname (sock->peer_name);
   if (he == NULL) {
     RAISE2(system_error,
           "Failed to lookup hostname %s: %s",
-          host, strerror (errno));
+          sock->peer_name, strerror (errno));
   }
   
   haddr = ((struct in_addr *) (he->h_addr_list)[0]);
@@ -136,13 +138,13 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
   memset(&addr, 0, sizeof(struct sockaddr_in));
   memcpy (&addr.sin_addr, haddr, sizeof(struct in_addr));
   addr.sin_family = AF_INET;
-  addr.sin_port = htons (port);
+  addr.sin_port = htons (sock->peer_port);
 
   if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) {
     close(sock->sd);
     RAISE3(system_error,
           "Failed to connect socket to %s:%d (%s)",
-          host, port, strerror (errno));
+          sock->peer_name, sock->peer_port, strerror (errno));
   }
   
   return no_error;
@@ -154,9 +156,8 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
  * Open a socket used to receive messages.
  */
 gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
-                                       unsigned short port,
                                        /* OUT */ gras_socket_t *sock){
-/*  int size = bufSize * 1024; */
+  int size = sock->bufSize * 1024; 
   int on = 1;
   struct sockaddr_in server;
 
@@ -164,27 +165,32 @@ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
  
   sock->outgoing  = 1; /* TCP => duplex mode */
 
-  server.sin_port = htons((u_short)port);
+  server.sin_port = htons((u_short)sock->port);
   server.sin_addr.s_addr = INADDR_ANY;
   server.sin_family = AF_INET;
   if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-    RAISE1(system_error,"socket allocation failed: %s", strerror(errno));
+    RAISE1(system_error,"Socket allocation failed: %s", strerror(errno));
   }
 
-  (void)setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, 
-                  (char *)&on, sizeof(on));
-   /*
-  (void)setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size));
-  (void)setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size));
-    */
+  if (setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on))) {
+     RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
+           strerror(errno));
+  }
+   
+  if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) ||
+      setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
+     WARN1("setsockopt failed, cannot set buffer size: %s",
+          strerror(errno));
+  }
+       
   if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) {
     close(sock->sd);
-    RAISE2(system_error,"Cannot bind to port %d: %s",port, strerror(errno));
+    RAISE2(system_error,"Cannot bind to port %d: %s",sock->port, strerror(errno));
   }
 
   if (listen(sock->sd, 5) < 0) {
     close(sock->sd);
-    RAISE2(system_error,"Cannot listen to port %d: %s",port,strerror(errno));
+    RAISE2(system_error,"Cannot listen to port %d: %s",sock->port,strerror(errno));
   }
 
   if (sock->raw)
@@ -192,7 +198,7 @@ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
   else
     FD_SET(sock->sd, &(tcp->msg_socks));
 
-  DEBUG2("Openned a server socket on port %d (sock %d)",port,sock->sd);
+  DEBUG2("Openned a server socket on port %d (sock %d)",sock->port,sock->sd);
   
   return no_error;
 }
@@ -208,6 +214,7 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
 
   int sd;
   int tmp_errno;
+  int size;
                        
   TRY(gras_trp_socket_new(1,&res));
 
@@ -224,16 +231,17 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
   
     if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
        || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) {
-      WARN0("setsockopt failed, cannot condition the accepted socket");
+       RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s",
+             strerror(errno));
     }
  
-    /* FIXME: bufSize removed until we can have optionsets 
-       i = ((gras_trp_tcp_sock_specific_t*)sock->specific)->buffsize;
-       if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&i, s)
-       || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&i, s)) {
-       WARNING0("setsockopt failed, cannot set buffsize");     
-       }
-    */
+    (*dst)->bufSize = sock->bufSize;
+    size = sock->bufSize * 1024;
+    if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size))
+       || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) {
+       WARN1("setsockopt failed, cannot set buffer size: %s",
+            strerror(errno));
+    }
      
     res->plugin    = sock->plugin;
     res->incoming  = sock->incoming;
@@ -245,7 +253,7 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
 
     /* FIXME: Lock to protect inet_ntoa */
     if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) {
-      res->peer_name = strdup("unknown");
+      res->peer_name = (char*)strdup("unknown");
     } else {
       struct in_addr addrAsInAddr;
       char *tmp;
@@ -254,9 +262,9 @@ gras_trp_tcp_socket_accept(gras_socket_t  *sock,
       
       tmp = inet_ntoa(addrAsInAddr);
       if (tmp != NULL) {
-       res->peer_name = strdup(tmp);
+       res->peer_name = (char*)strdup(tmp);
       } else {
-       res->peer_name = strdup("unknown");
+       res->peer_name = (char*)strdup("unknown");
       }
     }
 
@@ -393,3 +401,52 @@ static int TcpProtoNumber(void) {
   
   return returnValue;
 }
+
+/* Data exchange over raw sockets. Placing this in there is a kind of crude hack.
+   It means that the only possible raw are TCP where we may want to do UDP for them. 
+   But I fail to find a good internal organization for now. We may want to split 
+   raw and regular sockets more efficiently.
+*/
+gras_error_t gras_socket_raw_exchange(gras_socket_t *peer,
+                                     int sender,
+                                     unsigned int timeout,
+                                     unsigned long int exp_size,
+                                     unsigned long int msg_size) {
+   char *chunk;
+   int res_last, msg_sofar, exp_sofar;
+   
+   fd_set rd_set;
+   int rv;
+   
+   struct timeval timeOut;
+   
+   if (!(chunk = (char *)gras_malloc(msg_size)))
+     RAISE_MALLOC;
+   
+   for   (exp_sofar=0; exp_sofar < exp_size; exp_size += msg_sofar) {
+      for(msg_sofar=0; msg_sofar < msg_size; msg_size += res_last) {
+        
+        if(sender) {
+           res_last = send(peer->sd, chunk, msg_size - msg_sofar, 0);
+        } else {
+           res_last = 0;
+           FD_ZERO(&rd_set);
+           FD_SET(peer->sd,&rd_set);
+           timeOut.tv_sec = timeout;
+           timeOut.tv_usec = 0;
+               
+           if (0 < select(peer->sd+1,&rd_set,NULL,NULL,&timeOut))
+             res_last = recv(peer->sd, chunk, msg_size-msg_sofar, 0);
+           
+        }
+        if (res_last == 0) {
+          /* No progress done, bail out */
+          gras_free(chunk);
+          RAISE0(unknown_error,"Not exchanged a single byte, bailing out");
+        }
+      }
+   }
+   
+   gras_free(chunk);
+   return no_error;
+}
index 3f22f44..22a8539 100644 (file)
@@ -31,6 +31,8 @@ struct s_gras_socket  {
   int outgoing :1; /* true if we can write on this sock */
   int accepting :1; /* true if master incoming sock in tcp */
   int raw :1; /* true if this is an experiment socket instead of messaging */
+
+  unsigned long int bufSize; /* what to say to the OS. field here to remember it when accepting */
    
   int  sd; 
   int  port; /* port on this side */
@@ -75,4 +77,11 @@ gras_error_t gras_trp_buf_setup(gras_trp_plugin_t *plug);
 gras_error_t gras_trp_buf_init_sock(gras_socket_t *sock);
 
 
+/* Data exchange over raw sockets */
+gras_error_t gras_socket_raw_exchange(gras_socket_t *peer,
+                                     int sender,
+                                     unsigned int timeout,
+                                     unsigned long int expSize,
+                                     unsigned long int msgSize);
+
 #endif /* GRAS_TRP_PRIVATE_H */