Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Rework the Transport layer to simplify it and improve its performance
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 8 Sep 2005 18:54:47 +0000 (18:54 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 8 Sep 2005 18:54:47 +0000 (18:54 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1700 48e7efb5-ca39-0410-a469-dd3cf9ba447f

12 files changed:
ChangeLog
src/Makefile.am
src/gras/DataDesc/ddt_exchange.c
src/gras/Msg/rl_msg.c
src/gras/Transport/sg_transport.c
src/gras/Transport/transport.c
src/gras/Transport/transport_interface.h
src/gras/Transport/transport_plugin_buf.c [deleted file]
src/gras/Transport/transport_plugin_file.c
src/gras/Transport/transport_plugin_sg.c
src/gras/Transport/transport_plugin_tcp.c
src/gras/Transport/transport_private.h

index 84754b0..fb5114a 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -15,12 +15,12 @@ SimGrid (3.0.1) unstable; urgency=low
       xbt_fifo_item_t xbt_fifo_get_prev_item(xbt_fifo_item_t i);
     [AL]
   * Bugfix: really disconnect fifo items which are remove_item()ed [AL]
-  * Doc: xbt_log module unmercifully reworked [MQ]
+  * Documentation: xbt_log module unmercifully reworked [MQ]
   
   MSG:
   * Add addtionnal checkings on channel values in communicating functions.
 
-  GRAS:
+  GRAS performance improvements:
   * Actually implement gras_datadesc_copy() so that we don't have to mimick
     RL communication on top of SG since it's so uneffective. 
     It may also be used for inter-thread communication in RL, one day. [MQ]
@@ -28,7 +28,37 @@ SimGrid (3.0.1) unstable; urgency=low
     Allows to:
     - improve message exchange performance on top of SG
     - deprecate transport_plugin_sg.c:gras_trp_sg_chunk_send() & recv()
-
+  * Don't exchange on the network the size of the used part of buffer,
+    instead, specify the possible buffer size to read(). [MQ] 
+    Advantages:
+     - reduces the amount of read/write calls (one pair per exchange)
+     - reduces the amount of exchanged data (the size)
+     - allows to retrieve all arrived data on receiver side, if we don't need
+       it right now (subsequent read will peek the buffer)
+     - allows the receiver to proceed with the begining of the stream before
+       everything is arrived
+     - make it possible to build an iov transport (using readv/writev)
+    Extra difficulty: 
+     - take care of the data with non-stable storage (like stacked data),
+       and bufferize them.
+  * If possible, TCP send uses vector I/O (when writev() is here) [MQ]
+     - Don't use it for receive since we send data sizes and data on the
+       same stream, so we wouldn't be able to chain large amount of chunks
+       before having to flush the stuff to read the size.
+  * Rework the transport plugin mecanism to simplify it and reduce the
+    amount of pointer dereferencement when searching for the right function 
+    to use. [MQ]
+
+  * I guess that now, we do almost as few system calls as possible while
+    doing as few data copy as possible.
+
+    To improve it further, we could try to send all the sizes first and then
+    all the data (to use iov on receiving size), but it's only a partial
+    solution: when you have 2 dimensional data, the sizes of the second
+    dimension is data of the first dimension, so you need 3 streams.
+
+    I'm not sure the potential performance gains justify the coding burden.
+    
  --
 
 SimGrid (3.00) stable; urgency=low
index 4cebdc3..03ca4dc 100644 (file)
@@ -131,8 +131,8 @@ COMMON_SRC=\
   gras/Virtu/process.c
 
 RL_SRC= \
-  gras/Transport/rl_transport.c  gras/Transport/transport_plugin_tcp.c  gras/Transport/transport_plugin_file.c  \
-  gras/Transport/transport_plugin_buf.c \
+  gras/Transport/rl_transport.c          \
+  gras/Transport/transport_plugin_file.c   gras/Transport/transport_plugin_tcp.c  \
   \
   gras/Virtu/rl_emul.c \
   gras/Virtu/rl_process.c        gras/Virtu/rl_time.c \
index 15c9a65..6519ec0 100644 (file)
@@ -11,7 +11,7 @@
 
 #include "xbt/ex.h"
 #include "gras/DataDesc/datadesc_private.h"
-#include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
+#include "gras/Transport/transport_interface.h" /* gras_trp_send/recv */
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ddt_exchange,datadesc,
                                 "Sending data over the network");
@@ -22,27 +22,17 @@ const char *gras_datadesc_cat_names[9] = {
 
 static gras_datadesc_type_t int_type = NULL;
 static gras_datadesc_type_t pointer_type = NULL;    
-static _XBT_INLINE void gras_dd_send_int(gras_socket_t sock,             int  i);
-static _XBT_INLINE void gras_dd_recv_int(gras_socket_t sock, int r_arch, int *i);
 
 static _XBT_INLINE void
-gras_dd_alloc_ref(xbt_dict_t  refs,  long int     size,
-                 char       **r_ref, long int     r_len,
-                 char       **l_ref, int detect_cycle);
-
-static _XBT_INLINE int
-gras_dd_is_r_null(char **r_ptr, long int length);
-
-static _XBT_INLINE void
-gras_dd_send_int(gras_socket_t sock,int i) {
+gras_dd_send_int(gras_socket_t sock,int *i, int stable) {
 
   if (!int_type) {
     int_type = gras_datadesc_by_name("int");
      xbt_assert(int_type);  
   }
    
-  DEBUG1("send_int(%d)",i);
-  gras_trp_chunk_send(sock, (char*)&i, int_type->size[GRAS_THISARCH]);
+  DEBUG1("send_int(%d)",*i);
+  gras_trp_send(sock, (char*)i, int_type->size[GRAS_THISARCH], stable);
 }
 
 static _XBT_INLINE void
@@ -54,13 +44,13 @@ gras_dd_recv_int(gras_socket_t sock, int r_arch, int *i) {
   }
 
   if (int_type->size[GRAS_THISARCH] >= int_type->size[r_arch]) {
-    gras_trp_chunk_recv(sock, (char*)i, int_type->size[r_arch]);
+    gras_trp_recv(sock, (char*)i, int_type->size[r_arch]);
     if (r_arch != GRAS_THISARCH)
       gras_dd_convert_elm(int_type,1,r_arch, i,i);
   } else {
     void *ptr = xbt_malloc(int_type->size[r_arch]);
 
-    gras_trp_chunk_recv(sock, (char*)ptr, int_type->size[r_arch]);
+    gras_trp_recv(sock, (char*)ptr, int_type->size[r_arch]);
     if (r_arch != GRAS_THISARCH)
       gras_dd_convert_elm(int_type,1,r_arch, ptr,i);
     free(ptr);
@@ -403,7 +393,7 @@ gras_datadesc_send_rec(gras_socket_t         sock,
 
   switch (type->category_code) {
   case e_gras_datadesc_type_cat_scalar:
-    gras_trp_chunk_send(sock, data, type->size[GRAS_THISARCH]);
+    gras_trp_send(sock, data, type->size[GRAS_THISARCH], 1);
     break;
 
   case e_gras_datadesc_type_cat_struct: {
@@ -457,7 +447,7 @@ gras_datadesc_send_rec(gras_socket_t         sock,
                 type->name, field_num, xbt_dynar_length(union_data.fields));
 
     /* Send the field number */
-    gras_dd_send_int(sock, field_num);
+    gras_dd_send_int(sock, &field_num, 0 /* not stable */);
     
     /* Send the content */
     field = xbt_dynar_get_as(union_data.fields, field_num, gras_dd_cat_field_t);
@@ -483,7 +473,7 @@ gras_datadesc_send_rec(gras_socket_t         sock,
     sub_type = ref_data.type;
     if (sub_type == NULL) {
       sub_type = (*ref_data.selector)(type,state,data);
-      gras_dd_send_int(sock, sub_type->code);
+      gras_dd_send_int(sock, &(sub_type->code),1 /*stable*/);
     }
     
     /* Send the actual value of the pointer for cycle handling */
@@ -492,8 +482,8 @@ gras_datadesc_send_rec(gras_socket_t         sock,
       xbt_assert(pointer_type);
     }
      
-    gras_trp_chunk_send(sock, (char*)data,
-                       pointer_type->size[GRAS_THISARCH]);
+    gras_trp_send(sock, (char*)data,
+                 pointer_type->size[GRAS_THISARCH], 1 /*stable*/);
     
     /* Send the pointed data only if not already sent */
     if (*(void**)data == NULL) {
@@ -531,7 +521,7 @@ gras_datadesc_send_rec(gras_socket_t         sock,
 
   case e_gras_datadesc_type_cat_array: {
     gras_dd_cat_array_t    array_data;
-    long int               count;
+    int                    count;
     char                  *ptr=data;
     long int               elm_size;
     
@@ -543,24 +533,26 @@ gras_datadesc_send_rec(gras_socket_t         sock,
       count = array_data.dynamic_size(type,state,data);
       xbt_assert1(count >=0,
                   "Invalid (negative) array size for type %s",type->name);
-      gras_dd_send_int(sock, count);
+      gras_dd_send_int(sock, &count, 0/*non-stable*/);
     }
     
     /* send the content */
     sub_type = array_data.type;
     elm_size = sub_type->aligned_size[GRAS_THISARCH];
     if (sub_type->category_code == e_gras_datadesc_type_cat_scalar) {
-      VERB1("Array of %ld scalars, send it in one shot",count);
-      gras_trp_chunk_send(sock, data, 
-                         sub_type->aligned_size[GRAS_THISARCH] * count);
+      VERB1("Array of %d scalars, send it in one shot",count);
+      gras_trp_send(sock, data, 
+                   sub_type->aligned_size[GRAS_THISARCH] * count,
+                   0 /* not stable */);
     } else if (sub_type->category_code == e_gras_datadesc_type_cat_array &&
               sub_type->category.array_data.fixed_size > 0 &&
               sub_type->category.array_data.type->category_code == e_gras_datadesc_type_cat_scalar) {
        
-      VERB1("Array of %ld fixed array of scalars, send it in one shot",count);
-      gras_trp_chunk_send(sock, data, 
-                         sub_type->category.array_data.type->aligned_size[GRAS_THISARCH] 
-                         * count * sub_type->category.array_data.fixed_size);
+      VERB1("Array of %d fixed array of scalars, send it in one shot",count);
+      gras_trp_send(sock, data, 
+                   sub_type->category.array_data.type->aligned_size[GRAS_THISARCH] 
+                   * count * sub_type->category.array_data.fixed_size,
+                   0 /* not stable */);
        
     } else {
       for (cpt=0; cpt<count; cpt++) {
@@ -640,13 +632,13 @@ gras_datadesc_recv_rec(gras_socket_t         sock,
   switch (type->category_code) {
   case e_gras_datadesc_type_cat_scalar:
     if (type->size[GRAS_THISARCH] == type->size[r_arch]) {
-      gras_trp_chunk_recv(sock, (char*)l_data, type->size[r_arch]);
+      gras_trp_recv(sock, (char*)l_data, type->size[r_arch]);
       if (r_arch != GRAS_THISARCH)
        gras_dd_convert_elm(type,1,r_arch, l_data,l_data);
     } else {
       void *ptr = xbt_malloc(type->size[r_arch]);
 
-      gras_trp_chunk_recv(sock, (char*)ptr, type->size[r_arch]);
+      gras_trp_recv(sock, (char*)ptr, type->size[r_arch]);
       if (r_arch != GRAS_THISARCH)
        gras_dd_convert_elm(type,1,r_arch, ptr,l_data);
       free(ptr);
@@ -740,8 +732,8 @@ gras_datadesc_recv_rec(gras_socket_t         sock,
 
     r_ref = xbt_malloc(pointer_type->size[r_arch]);
 
-    gras_trp_chunk_recv(sock, (char*)r_ref,
-                       pointer_type->size[r_arch]);
+    gras_trp_recv(sock, (char*)r_ref,
+                 pointer_type->size[r_arch]);
 
     /* Receive the pointed data only if not already sent */
     if (gras_dd_is_r_null(r_ref, pointer_type->size[r_arch])) {
@@ -838,15 +830,15 @@ gras_datadesc_recv_rec(gras_socket_t         sock,
       VERB1("Array of %d scalars, get it in one shoot", count);
       if (sub_type->aligned_size[GRAS_THISARCH] >= 
          sub_type->aligned_size[r_arch]) {
-       gras_trp_chunk_recv(sock, (char*)l_data, 
-                           sub_type->aligned_size[r_arch] * count);
+       gras_trp_recv(sock, (char*)l_data, 
+                     sub_type->aligned_size[r_arch] * count);
        if (r_arch != GRAS_THISARCH)
          gras_dd_convert_elm(sub_type,count,r_arch, l_data,l_data);
       } else {
        ptr = xbt_malloc(sub_type->aligned_size[r_arch] * count);
 
-       gras_trp_chunk_recv(sock, (char*)ptr, 
-                           sub_type->size[r_arch] * count);
+       gras_trp_recv(sock, (char*)ptr, 
+                     sub_type->size[r_arch] * count);
        if (r_arch != GRAS_THISARCH)
          gras_dd_convert_elm(sub_type,count,r_arch, ptr,l_data);
        free(ptr);
@@ -861,16 +853,16 @@ gras_datadesc_recv_rec(gras_socket_t         sock,
       VERB1("Array of %d fixed array of scalars, get it in one shot",count);
       if (subsub_type->aligned_size[GRAS_THISARCH] >= 
          subsub_type->aligned_size[r_arch]) {
-       gras_trp_chunk_recv(sock, (char*)l_data, 
-                           subsub_type->aligned_size[r_arch] * count * 
-                           array_data.fixed_size);
+       gras_trp_recv(sock, (char*)l_data, 
+                     subsub_type->aligned_size[r_arch] * count * 
+                     array_data.fixed_size);
        if (r_arch != GRAS_THISARCH)
          gras_dd_convert_elm(subsub_type,count*array_data.fixed_size,r_arch, l_data,l_data);
       } else {
        ptr = xbt_malloc(subsub_type->aligned_size[r_arch] * count*array_data.fixed_size);
 
-       gras_trp_chunk_recv(sock, (char*)ptr, 
-                           subsub_type->size[r_arch] * count*array_data.fixed_size);
+       gras_trp_recv(sock, (char*)ptr, 
+                     subsub_type->size[r_arch] * count*array_data.fixed_size);
        if (r_arch != GRAS_THISARCH)
          gras_dd_convert_elm(subsub_type,count*array_data.fixed_size,r_arch, ptr,l_data);
        free(ptr);
index b39e7db..61405d0 100644 (file)
@@ -11,7 +11,7 @@
 #include "gras/Msg/msg_private.h"
 
 #include "gras/DataDesc/datadesc_interface.h"
-#include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
+#include "gras/Transport/transport_interface.h" /* gras_trp_send/recv */
 
 XBT_LOG_EXTERNAL_CATEGORY(gras_msg);
 XBT_LOG_DEFAULT_CATEGORY(gras_msg);
@@ -36,7 +36,7 @@ gras_msg_send(gras_socket_t   sock,
 
   DEBUG3("send '%s' to %s:%d", msgtype->name, 
         gras_socket_peer_name(sock),gras_socket_peer_port(sock));
-  gras_trp_chunk_send(sock, _GRAS_header, 6);
+  gras_trp_send(sock, _GRAS_header, 6, 1 /* stable */);
 
   gras_datadesc_send(sock, string_type,   &msgtype->name);
   if (msgtype->ctn_type)
@@ -67,7 +67,7 @@ gras_msg_recv(gras_socket_t    sock,
   }
   
   TRY {
-    gras_trp_chunk_recv(sock, header, 6);
+    gras_trp_recv(sock, header, 6);
   } CATCH(e) {
     RETHROW1("Exception caught while trying to get the mesage header on socket %p: %s",
             sock);
index 1e999ea..7992390 100644 (file)
@@ -136,14 +136,10 @@ gras_socket_t gras_trp_select(double timeout) {
   
 /* dummy implementations of the functions used in RL mode */
 
-void gras_trp_tcp_setup(gras_trp_plugin_t plug) {
-}
-void gras_trp_file_setup(gras_trp_plugin_t plug) {
-  THROW0(mismatch_error,0,NULL);
-}
-void gras_trp_buf_setup(gras_trp_plugin_t plug) {
-  THROW0(mismatch_error,0,NULL);
-}
-void gras_trp_buf_init_sock(gras_socket_t sock) {
-}
+void gras_trp_tcp_setup(gras_trp_plugin_t plug) {  THROW0(mismatch_error,0,NULL); }
+void gras_trp_file_setup(gras_trp_plugin_t plug){  THROW0(mismatch_error,0,NULL); }
+void gras_trp_buf_setup(gras_trp_plugin_t plug) {  THROW0(mismatch_error,0,NULL); }
+void gras_trp_iov_setup(gras_trp_plugin_t plug) {  THROW0(mismatch_error,0,NULL); }
+
+gras_socket_t gras_trp_buf_init_sock(gras_socket_t sock) { return sock;}
    
index d88a401..4b2e1be 100644 (file)
@@ -81,7 +81,6 @@ void gras_trp_init(void){
 #endif
    
      /* Add plugins */
-     gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
      gras_trp_plugin_new("file",gras_trp_file_setup);
      gras_trp_plugin_new("sg",gras_trp_sg_setup);
 
@@ -315,21 +314,15 @@ void gras_socket_close(gras_socket_t sock) {
 }
 
 /**
- * gras_trp_chunk_send:
+ * gras_trp_send:
  *
  * Send a bunch of bytes from on socket
+ * (stable if we know the storage will keep as is until the next trp_flush)
  */
 void
-gras_trp_chunk_send(gras_socket_t sd,
-                   char *data,
-                   long int size) {
-  xbt_assert1(sd->outgoing,
-              "Socket not suited for data send (outgoing=%c)",
-              sd->outgoing?'y':'n');
-  xbt_assert1(sd->plugin->chunk_send,
-              "No function chunk_send on transport plugin %s",
-              sd->plugin->name);
-  (*sd->plugin->chunk_send)(sd,data,size);
+gras_trp_send(gras_socket_t sd, char *data, long int size, int stable) {
+  xbt_assert0(sd->outgoing,"Socket not suited for data send");
+  (*sd->plugin->send)(sd,data,size,stable);
 }
 /**
  * gras_trp_chunk_recv:
@@ -337,15 +330,9 @@ gras_trp_chunk_send(gras_socket_t sd,
  * Receive a bunch of bytes from a socket
  */
 void
-gras_trp_chunk_recv(gras_socket_t sd,
-                   char *data,
-                   long int size) {
-  xbt_assert0(sd->incoming,
-              "Socket not suited for data receive");
-  xbt_assert1(sd->plugin->chunk_recv,
-              "No function chunk_recv on transport plugin %s",
-              sd->plugin->name);
-  (sd->plugin->chunk_recv)(sd,data,size,size);
+gras_trp_recv(gras_socket_t sd, char *data, long int size) {
+  xbt_assert0(sd->incoming,"Socket not suited for data receive");
+  (sd->plugin->recv)(sd,data,size);
 }
 
 /**
@@ -355,7 +342,8 @@ gras_trp_chunk_recv(gras_socket_t sd,
  */
 void
 gras_trp_flush(gras_socket_t sd) {
-  (sd->plugin->flush)(sd);
+  if (sd->plugin->flush)
+    (sd->plugin->flush)(sd);
 }
 
 gras_trp_plugin_t
@@ -402,12 +390,13 @@ void gras_socket_meas_send(gras_socket_t peer,
   XBT_IN;
 
   xbt_assert0(peer->meas,"Asked to send measurement data on a regular socket");
+  xbt_assert0(peer->outgoing,"Socket not suited for data send");
 
   for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
      CDEBUG5(trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d",
             exp_sofar,exp_size,msg_size,
             gras_socket_peer_name(peer), gras_socket_peer_port(peer));
-     gras_trp_chunk_send(peer,chunk,msg_size);
+     (*peer->plugin->raw_send)(peer,chunk,msg_size);
   }
   CDEBUG5(trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d",
          exp_sofar,exp_size,msg_size,
@@ -433,13 +422,15 @@ void gras_socket_meas_recv(gras_socket_t peer,
 
   XBT_IN;
 
-  xbt_assert0(peer->meas,"Asked to receive measurement data on a regular socket\n");
+  xbt_assert0(peer->meas,
+             "Asked to receive measurement data on a regular socket");
+  xbt_assert0(peer->incoming,"Socket not suited for data receive");
 
   for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
      CDEBUG5(trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d",
             exp_sofar,exp_size,msg_size,
             gras_socket_peer_name(peer), gras_socket_peer_port(peer));
-     gras_trp_chunk_recv(peer,chunk,msg_size);
+     (peer->plugin->raw_recv)(peer,chunk,msg_size);
   }
   CDEBUG5(trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d",
          exp_sofar,exp_size,msg_size,
index 249db65..5766ffd 100644 (file)
 /***
  *** Main user functions
  ***/
-void gras_trp_chunk_send(gras_socket_t sd,
-                        char *data,
-                        long int size);
-void gras_trp_chunk_recv(gras_socket_t sd,
-                        char *data,
-                        long int size);
+/* stable if we know the storage will keep as is until the next trp_flush */
+void gras_trp_send(gras_socket_t sd, char *data, long int size, int stable);
+void gras_trp_recv(gras_socket_t sd, char *data, long int size);
 void gras_trp_flush(gras_socket_t sd);
 
 /* Find which socket needs to be read next */
@@ -53,13 +50,21 @@ struct gras_trp_plugin_ {
     but should not free the socket itself (beside the specific part) */
   void (*socket_close)(gras_socket_t sd);
     
-  void (*chunk_send)(gras_socket_t sd,
-                    const char *data,
-                    unsigned long int size);
-  void (*chunk_recv)(gras_socket_t sd,
-                    char *data,
-                    unsigned long int size,
-                    unsigned long int bufsize);
+  /* send/recv may be buffered */
+  void (*send)(gras_socket_t sd,
+              const char *data,
+              unsigned long int size,
+              int stable /* storage will survive until flush*/);
+  int (*recv)(gras_socket_t sd,
+             char *data,
+             unsigned long int size);
+  /* raw_send/raw_recv is never buffered (use it for measurement stuff) */
+  void (*raw_send)(gras_socket_t sd,
+                  const char *data,
+                  unsigned long int size);
+  int (*raw_recv)(gras_socket_t sd,
+                 char *data,
+                 unsigned long int size);
 
   /* flush has to make sure that the pending communications are achieved */
   void (*flush)(gras_socket_t sd);
diff --git a/src/gras/Transport/transport_plugin_buf.c b/src/gras/Transport/transport_plugin_buf.c
deleted file mode 100644 (file)
index aca9d0a..0000000
+++ /dev/null
@@ -1,307 +0,0 @@
-/* $Id$ */
-
-/* buf trp (transport) - buffered transport using the TCP one            */
-
-/* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include <stdlib.h>
-#include <string.h>       /* memset */
-
-#include "portable.h"
-#include "xbt/misc.h"
-#include "xbt/sysdep.h"
-#include "xbt/ex.h"
-#include "transport_private.h"
-
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_buf,transport,
-      "Generic buffered transport (works on top of TCP or Files, but not SG)");
-
-
-static gras_trp_plugin_t _buf_super;
-
-/***
- *** Prototypes 
- ***/
-void hexa_print(const char*name, unsigned char *data, int size);   /* in gras.c */
-   
-void gras_trp_buf_socket_client(gras_trp_plugin_t self,
-                               gras_socket_t sock);
-void gras_trp_buf_socket_server(gras_trp_plugin_t self,
-                               gras_socket_t sock);
-gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock);
-
-void         gras_trp_buf_socket_close(gras_socket_t sd);
-  
-void gras_trp_buf_chunk_send(gras_socket_t sd,
-                            const char *data,
-                            unsigned long int size);
-
-void gras_trp_buf_chunk_recv(gras_socket_t sd,
-                            char *data,
-                            unsigned long int size,
-                            unsigned long int bufsize);
-void gras_trp_buf_flush(gras_socket_t sock);
-
-
-/***
- *** Specific plugin part
- ***/
-
-typedef struct {
-  int junk;
-} gras_trp_buf_plug_data_t;
-
-/***
- *** Specific socket part
- ***/
-
-typedef struct {
-  int size;
-  char *data;
-  int pos; /* for receive; not exchanged over the net */
-} gras_trp_buf_t;
-
-struct gras_trp_bufdata_{
-  gras_trp_buf_t in;
-  gras_trp_buf_t out;
-  int buffsize;
-};
-
-void gras_trp_buf_init_sock(gras_socket_t sock) {
-  gras_trp_bufdata_t *data=xbt_new(gras_trp_bufdata_t,1);
-  
-  XBT_IN;
-  data->buffsize = 100 * 1024 ; /* 100k */ 
-
-  data->in.size  = 0;
-  data->in.data  = xbt_malloc(data->buffsize);
-  data->in.pos   = 0; /* useless, indeed, since size==pos */
-   
-   /* In SG, the 4 first bytes are for the chunk size as htonl'ed, so that we can send it in one shoot.
-    * This is mandatory in SG because all emissions go to the same channel, so if we split them,
-    * they can get mixed. */
-  data->out.size = 0;
-  data->out.data = xbt_malloc(data->buffsize);
-  data->out.pos  = data->out.size;
-   
-  sock->bufdata = data;
-}
-
-/***
- *** Code
- ***/
-void
-gras_trp_buf_setup(gras_trp_plugin_t plug) {
-  gras_trp_buf_plug_data_t *data =xbt_new(gras_trp_buf_plug_data_t,1);
-
-  XBT_IN;
-  _buf_super = gras_trp_plugin_get_by_name("tcp");
-
-  DEBUG1("Derivate a buffer plugin from %s","tcp");
-
-  plug->socket_client = gras_trp_buf_socket_client;
-  plug->socket_server = gras_trp_buf_socket_server;
-  plug->socket_accept = gras_trp_buf_socket_accept;
-  plug->socket_close  = gras_trp_buf_socket_close;
-
-  plug->chunk_send    = gras_trp_buf_chunk_send;
-  plug->chunk_recv    = gras_trp_buf_chunk_recv;
-
-  plug->flush         = gras_trp_buf_flush;
-
-  plug->data = (void*)data;
-  plug->exit = NULL;
-}
-
-void gras_trp_buf_socket_client(gras_trp_plugin_t self,
-                               /* OUT */ gras_socket_t sock){
-
-  XBT_IN;
-  _buf_super->socket_client(_buf_super,sock);
-  sock->plugin = self;
-  gras_trp_buf_init_sock(sock);
-}
-
-/**
- * gras_trp_buf_socket_server:
- *
- * Open a socket used to receive messages.
- */
-void gras_trp_buf_socket_server(gras_trp_plugin_t self,
-                               /* OUT */ gras_socket_t sock){
-
-  XBT_IN;
-  _buf_super->socket_server(_buf_super,sock);
-  sock->plugin = self;
-  gras_trp_buf_init_sock(sock);
-}
-
-gras_socket_t
-gras_trp_buf_socket_accept(gras_socket_t  sock) {
-
-  gras_socket_t res;
-      
-  XBT_IN;
-  res = _buf_super->socket_accept(sock);
-  res->plugin = sock->plugin;
-  gras_trp_buf_init_sock(res);
-  XBT_OUT;
-  return res;
-}
-
-void gras_trp_buf_socket_close(gras_socket_t sock){
-  gras_trp_bufdata_t *data=sock->bufdata;
-
-  XBT_IN;
-  if (data->in.size!=data->in.pos) {
-     WARN3("Socket closed, but %d bytes were unread (size=%d,pos=%d)",
-          data->in.size - data->in.pos,data->in.size, data->in.pos);
-  }
-   
-  if (data->out.size!=data->out.pos) {
-    DEBUG2("Flush the socket before closing (in=%d,out=%d)",data->in.size, data->out.size);
-    gras_trp_buf_flush(sock);
-  }
-   
-  if (data->in.data)
-    free(data->in.data);
-  if (data->out.data)
-    free(data->out.data);
-  free(data);
-
-  _buf_super->socket_close(sock);
-}
-
-/**
- * gras_trp_buf_chunk_send:
- *
- * Send data on a buffered socket
- */
-void
-gras_trp_buf_chunk_send(gras_socket_t sock,
-                       const char *chunk,
-                       unsigned long int size) {
-
-  gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata;
-  int chunk_pos=0;
-
-  XBT_IN;
-  /* Let underneath plugin check for direction, we work even in duplex */
-  xbt_assert0(size >= 0, "Cannot send a negative amount of data");
-
-  while (chunk_pos < size) {
-    /* size of the chunck to receive in that shot */
-    long int thissize = min(size-chunk_pos,data->buffsize - data->out.size);
-    DEBUG5("Set the chars %d..%ld into the buffer (size=%ld, ctn='%.*s')",
-          (int)data->out.size,
-          ((int)data->out.size) + thissize -1,
-          size, chunk_pos, chunk);
-
-    memcpy(data->out.data + data->out.size, chunk + chunk_pos, thissize);
-
-    data->out.size += thissize;
-    chunk_pos      += thissize;
-    DEBUG5("New pos = %d; Still to send = %ld of %ld; ctn sofar='%.*s'",
-          data->out.size,size-chunk_pos,size,(int)chunk_pos,chunk);
-
-    if (data->out.size == data->buffsize) /* out of space. Flush it */
-      gras_trp_buf_flush(sock);
-  }
-
-  XBT_OUT;
-}
-
-/**
- * gras_trp_buf_chunk_recv:
- *
- * Receive data on a buffered socket.
- */
-void
-gras_trp_buf_chunk_recv(gras_socket_t sock,
-                       char *chunk,
-                       unsigned long int size,
-                       unsigned long int bufsize) {
-
-  xbt_ex_t e;
-  gras_trp_bufdata_t *data=sock->bufdata;
-  long int chunck_pos = 0;
-  /* Let underneath plugin check for direction, we work even in duplex */
-  xbt_assert0(sock, "Cannot recv on an NULL socket");
-  xbt_assert0(size >= 0, "Cannot receive a negative amount of data");
-  
-  XBT_IN;
-
-  while (chunck_pos < size) {
-    /* size of the chunck to receive in that shot */
-    long int thissize;
-
-    if (data->in.size == data->in.pos) { /* out of data. Get more */
-      int nextsize;
-      DEBUG0("Recv the size");
-      TRY {
-       _buf_super->chunk_recv(sock,(char*)&nextsize, 4,4);
-      } CATCH(e) {
-       RETHROW3("Unable to get the chunk size on %p (peer = %s:%d): %s",
-                sock,gras_socket_peer_name(sock),gras_socket_peer_port(sock));
-      }
-      data->in.size = (int)ntohl(nextsize);
-      VERB1("Recv the chunk (size=%d)",data->in.size);
-       
-      _buf_super->chunk_recv(sock, data->in.data, data->in.size, data->in.size);
-       
-      data->in.pos=0;
-    }
-     
-    thissize = min(size-chunck_pos ,  data->in.size - data->in.pos);
-    DEBUG2("Get the chars %d..%ld out of the buffer",
-          data->in.pos,
-          data->in.pos + thissize - 1);
-    memcpy(chunk+chunck_pos, data->in.data + data->in.pos, thissize);
-
-    data->in.pos += thissize;
-    chunck_pos   += thissize;
-    DEBUG5("New pos = %d; Still to receive = %ld of %ld. Ctn so far='%.*s'",
-          data->in.pos,size - chunck_pos,size,(int)chunck_pos,chunk);
-  }
-
-  XBT_OUT;
-}
-
-/**
- * gras_trp_buf_flush:
- *
- * Make sure the data is sent
- */
-void
-gras_trp_buf_flush(gras_socket_t sock) {
-  int size;
-  gras_trp_bufdata_t *data=sock->bufdata;
-  XBT_IN;    
-  
-  DEBUG0("Flush");
-  if (XBT_LOG_ISENABLED(trp_buf,xbt_log_priority_debug))
-     hexa_print("chunck to send ",(unsigned char *) data->out.data,data->out.size);
-  if ((data->out.size - data->out.pos) == 0) { 
-     DEBUG2("Nothing to flush (size=%d; pos=%d)",data->out.size,data->out.pos);
-     return;
-  }
-   
-  size = (int)data->out.size - data->out.pos;
-  DEBUG3("Send the size (=%d) to %s:%d",data->out.size-data->out.pos,
-        gras_socket_peer_name(sock),gras_socket_peer_port(sock));
-  size = (int)htonl(size);
-  _buf_super->chunk_send(sock,(char*) &size, 4);
-      
-
-  DEBUG3("Send the chunk (size=%d) to %s:%d",data->out.size,
-        gras_socket_peer_name(sock),gras_socket_peer_port(sock));
-  _buf_super->chunk_send(sock, data->out.data, data->out.size);
-  VERB1("Chunk sent (size=%d)",data->out.size);
-  if (XBT_LOG_ISENABLED(trp_buf,xbt_log_priority_debug))
-     hexa_print("chunck sent    ",(unsigned char *) data->out.data,data->out.size);
-  data->out.size = 0;
-}
index 7e7eae0..0d6f7ab 100644 (file)
@@ -19,15 +19,17 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_file,transport,
  ***/
 void gras_trp_file_close(gras_socket_t sd);
   
+void gras_trp_file_chunk_send_raw(gras_socket_t sd,
+                                 const char *data,
+                                 unsigned long int size);
 void gras_trp_file_chunk_send(gras_socket_t sd,
                              const char *data,
-                             unsigned long int size);
-
-void gras_trp_file_chunk_recv(gras_socket_t sd,
-                             char *data,
                              unsigned long int size,
-                             unsigned long int bufsize);
+                             int stable_ignored);
 
+int gras_trp_file_chunk_recv(gras_socket_t sd,
+                            char *data,
+                            unsigned long int size);
 
 /***
  *** Specific plugin part
@@ -54,8 +56,12 @@ gras_trp_file_setup(gras_trp_plugin_t plug) {
   FD_ZERO(&(file->incoming_socks));
 
   plug->socket_close = gras_trp_file_close;
-  plug->chunk_send   = gras_trp_file_chunk_send;
-  plug->chunk_recv   = gras_trp_file_chunk_recv;
+
+  plug->raw_send = gras_trp_file_chunk_send_raw;
+  plug->send = gras_trp_file_chunk_send;
+
+  plug->raw_recv = plug->recv = gras_trp_file_chunk_recv;
+
   plug->data         = (void*)file;
 }
 
@@ -170,7 +176,14 @@ void gras_trp_file_close(gras_socket_t sock){
 void
 gras_trp_file_chunk_send(gras_socket_t sock,
                         const char *data,
-                        unsigned long int size) {
+                        unsigned long int size,
+                        int stable_ignored) {
+  gras_trp_file_chunk_send_raw(sock,data,size);
+}
+void
+gras_trp_file_chunk_send_raw(gras_socket_t sock,
+                            const char *data,
+                            unsigned long int size) {
   
   xbt_assert0(sock->outgoing, "Cannot write on client file socket");
   xbt_assert0(size >= 0, "Cannot send a negative amount of data");
@@ -200,36 +213,36 @@ gras_trp_file_chunk_send(gras_socket_t sock,
  *
  * Receive data on a file pseudo-socket.
  */
-void
+int
 gras_trp_file_chunk_recv(gras_socket_t sock,
                         char *data,
-                        unsigned long int size,
-                        unsigned long int bufsize) {
+                        unsigned long int size) {
+
+  int got = 0;
 
   xbt_assert0(sock, "Cannot recv on an NULL socket");
   xbt_assert0(sock->incoming, "Cannot recv on client file socket");
   xbt_assert0(size >= 0, "Cannot receive a negative amount of data");
-  xbt_assert0(bufsize>=size,"Not enough buffer size to receive that much data");
 
   while (size) {
     int status = 0;
     
-    status = read(sock->sd, data, (long int)bufsize);
-    DEBUG3("read(%d, %p, %ld);", sock->sd, data, size);
+    status = read(sock->sd, data+got, (long int)size);
+    DEBUG3("read(%d, %p, %ld);", sock->sd, data+got, size);
     
     if (status == -1) {
       THROW4(system_error,0,"read(%d,%p,%d) failed: %s",
-            sock->sd, data, (int)size,
+            sock->sd, data+got, (int)size,
             strerror(errno));
     }
     
     if (status) {
       size    -= status;
-      bufsize -= status;
-      data    += status;
+      got    += status;
     } else {
       THROW0(system_error,0,"file descriptor closed");
     }
   }
+  return got;
 }
 
index 8c81ca2..092c9b7 100644 (file)
@@ -24,7 +24,6 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_sg,transport,"SimGrid pseudo-transport");
 /***
  *** Prototypes 
  ***/
-void hexa_print(unsigned char *data, int size);   /* in gras.c */
 
 /* retrieve the port record associated to a numerical port on an host */
 static void find_port(gras_hostdata_t *hd, int port, gras_sg_portrec_t *hpd);
@@ -36,14 +35,17 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self,
                               /* OUT */ gras_socket_t sock);
 void gras_trp_sg_socket_close(gras_socket_t sd);
 
+void gras_trp_sg_chunk_send_raw(gras_socket_t sd,
+                               const char *data,
+                               unsigned long int size);
 void gras_trp_sg_chunk_send(gras_socket_t sd,
                            const char *data,
-                           unsigned long int size);
+                           unsigned long int size,
+                           int stable_ignored);
 
-void gras_trp_sg_chunk_recv(gras_socket_t sd,
+int gras_trp_sg_chunk_recv(gras_socket_t sd,
                            char *data,
-                           unsigned long int size,
-                           unsigned long int bufsize);
+                           unsigned long int size);
 
 /***
  *** Specific plugin part
@@ -84,8 +86,9 @@ gras_trp_sg_setup(gras_trp_plugin_t plug) {
   plug->socket_server = gras_trp_sg_socket_server;
   plug->socket_close  = gras_trp_sg_socket_close;
 
-  plug->chunk_send    = gras_trp_sg_chunk_send;
-  plug->chunk_recv    = gras_trp_sg_chunk_recv;
+  plug->raw_send = gras_trp_sg_chunk_send_raw;
+  plug->send = gras_trp_sg_chunk_send;
+  plug->raw_recv = plug->recv = gras_trp_sg_chunk_recv;
 
   plug->flush         = NULL; /* nothing cached */
 }
@@ -236,7 +239,14 @@ typedef struct {
 
 void gras_trp_sg_chunk_send(gras_socket_t sock,
                            const char *data,
-                           unsigned long int size) {
+                           unsigned long int size,
+                           int stable_ignored) {
+  gras_trp_sg_chunk_send_raw(sock,data,size);
+}
+
+void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
+                               const char *data,
+                               unsigned long int size) {
   m_task_t task=NULL;
   static unsigned int count=0;
   char name[256];
@@ -262,10 +272,9 @@ void gras_trp_sg_chunk_send(gras_socket_t sock,
   }
 }
 
-void gras_trp_sg_chunk_recv(gras_socket_t sock,
+int gras_trp_sg_chunk_recv(gras_socket_t sock,
                            char *data,
-                           unsigned long int size,
-                           unsigned long int bufsize){
+                           unsigned long int size){
   gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp");
 
   m_task_t task=NULL;
@@ -282,23 +291,13 @@ void gras_trp_sg_chunk_recv(gras_socket_t sock,
   DEBUG1("Got chuck %s",MSG_task_get_name(task));
 
   task_data = MSG_task_get_data(task);
-  if (size != -1) {    
-     if (task_data->size != size)
-       THROW5(mismatch_error,0,
-             "Got %d bytes when %ld where expected (in %s->%s:%d)",
-             task_data->size, size,
-             MSG_host_get_name(sock_data->to_host),
-             MSG_host_get_name(MSG_host_self()), sock_data->to_chan);
-     memcpy(data,task_data->data,size);
-  } else {
-     /* damn, the size is embeeded at the begining of the chunk */
-     int netsize;
-     
-     memcpy((char*)&netsize,task_data->data,4);
-     DEBUG1("netsize embeeded = %d",netsize);
-
-     memcpy(data,task_data->data,netsize+4);
-  }
+  if (task_data->size != size)
+    THROW5(mismatch_error,0,
+          "Got %d bytes when %ld where expected (in %s->%s:%d)",
+          task_data->size, size,
+          MSG_host_get_name(sock_data->to_host),
+          MSG_host_get_name(MSG_host_self()), sock_data->to_chan);
+  memcpy(data,task_data->data,size);
   free(task_data->data);
   free(task_data);
 
@@ -306,5 +305,6 @@ void gras_trp_sg_chunk_recv(gras_socket_t sock,
     THROW0(system_error,0,"Error in MSG_task_destroy()");
 
   XBT_OUT;
+  return size;
 }
 
index 21cdf35..256820f 100644 (file)
@@ -1,94 +1,63 @@
 /* $Id$ */
 
-/* tcp trp (transport) - send/receive a bunch of bytes from a tcp socket    */
+/* buf trp (transport) - buffered transport using the TCP one            */
 
 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
+#include <stdlib.h>
+#include <string.h>       /* memset */
+
 #include "portable.h"
+#include "xbt/misc.h"
+#include "xbt/sysdep.h"
 #include "xbt/ex.h"
-#if 0
-#  include <signal.h>       /* close() pipe() read() write() */
-#  include <sys/wait.h>     /* waitpid() */
-#endif
-
-
 #include "transport_private.h"
 
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport");
-
-/***
- *** Prototypes 
- ***/
-void gras_trp_tcp_socket_client(gras_trp_plugin_t self, gras_socket_t sock);
-void gras_trp_tcp_socket_server(gras_trp_plugin_t self, gras_socket_t sock);
-gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t  sock);
-
-void          gras_trp_tcp_socket_close(gras_socket_t sd);
-  
-void gras_trp_tcp_chunk_send(gras_socket_t sd,
-                            const char *data,
-                            unsigned long int size);
-
-void gras_trp_tcp_chunk_recv(gras_socket_t sd,
-                            char *data,
-                            unsigned long int size,
-                            unsigned long int bufsize);
-
-void gras_trp_tcp_exit(gras_trp_plugin_t plug);
-
-
-static int TcpProtoNumber(void);
-/***
- *** Specific plugin part
- ***/
+#ifndef MIN
+#define MIN(a,b) ((a)<(b)?(a):(b))
+#endif
 
-typedef struct {
-  fd_set msg_socks;
-  fd_set meas_socks;
-} gras_trp_tcp_plug_data_t;
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,
+      "TCP buffered transport");
 
 /***
  *** Specific socket part
  ***/
 
-typedef struct {
-  int buffsize;
-} gras_trp_tcp_sock_data_t;
-
+typedef enum { buffering_buf, buffering_iov } buffering_kind;
 
-/***
- *** Code
- ***/
-void gras_trp_tcp_setup(gras_trp_plugin_t plug) {
-
-  gras_trp_tcp_plug_data_t *data = xbt_new(gras_trp_tcp_plug_data_t,1);
+typedef struct {
+  int size;
+  char *data;
+  int pos; /* for receive; not exchanged over the net */
+} gras_trp_buf_t;
 
-  FD_ZERO(&(data->msg_socks));
-  FD_ZERO(&(data->meas_socks));
 
-  plug->socket_client = gras_trp_tcp_socket_client;
-  plug->socket_server = gras_trp_tcp_socket_server;
-  plug->socket_accept = gras_trp_tcp_socket_accept;
-  plug->socket_close  = gras_trp_tcp_socket_close;
+struct gras_trp_bufdata_{
+  int buffsize;
+  gras_trp_buf_t in_buf;
+  gras_trp_buf_t out_buf;
 
-  plug->chunk_send    = gras_trp_tcp_chunk_send;
-  plug->chunk_recv    = gras_trp_tcp_chunk_recv;
+#ifdef HAVE_READV
+  xbt_dynar_t in_buf_v;
+  xbt_dynar_t out_buf_v;
+#endif
 
-  plug->flush = NULL; /* nothing's cached */
+  buffering_kind in;
+  buffering_kind out;
+};
 
-  plug->data = (void*)data;
-  plug->exit = gras_trp_tcp_exit;
-}
 
-void gras_trp_tcp_exit(gras_trp_plugin_t plug) {
-  DEBUG1("Exit plugin TCP (free %p)", plug->data);
-  free(plug->data);
-}
+/*****************************/
+/****[ SOCKET MANAGEMENT ]****/
+/*****************************/
+static int _gras_tcp_proto_number(void);
 
-void gras_trp_tcp_socket_client(gras_trp_plugin_t self, gras_socket_t sock){
+static inline void gras_trp_sock_socket_client(gras_trp_plugin_t ignored,
+                                              gras_socket_t sock){
   
   struct sockaddr_in addr;
   struct hostent *he;
@@ -127,21 +96,21 @@ void gras_trp_tcp_socket_client(gras_trp_plugin_t self, gras_socket_t sock){
           "Failed to connect socket to %s:%d (%s)",
           sock->peer_name, sock->peer_port, sock_errstr);
   }
-  VERB4("Connect to %s:%d (sd=%d, port %d here)",sock->peer_name, sock->peer_port, sock->sd, sock->port);
+  VERB4("Connect to %s:%d (sd=%d, port %d here)",
+       sock->peer_name, sock->peer_port, sock->sd, sock->port);
 }
 
 /**
- * gras_trp_tcp_socket_server:
+ * gras_trp_sock_socket_server:
  *
  * Open a socket used to receive messages.
  */
-void gras_trp_tcp_socket_server(gras_trp_plugin_t self, gras_socket_t sock){
+static inline void gras_trp_sock_socket_server(gras_trp_plugin_t ignored,
+                                              gras_socket_t sock){
   int size = sock->bufSize * 1024; 
   int on = 1;
   struct sockaddr_in server;
 
-  gras_trp_tcp_plug_data_t *tcp=(gras_trp_tcp_plug_data_t*)self->data;
   sock->outgoing  = 1; /* TCP => duplex mode */
 
   server.sin_port = htons((u_short)sock->port);
@@ -171,15 +140,10 @@ void gras_trp_tcp_socket_server(gras_trp_plugin_t self, gras_socket_t sock){
     THROW2(system_error,0,"Cannot listen on port %d: %s",sock->port,sock_errstr);
   }
 
-  if (sock->meas)
-    FD_SET(sock->sd, &(tcp->meas_socks));
-  else
-    FD_SET(sock->sd, &(tcp->msg_socks));
-
   VERB2("Openned a server socket on port %d (sd=%d)",sock->port,sock->sd);
 }
 
-gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t  sock) {
+static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock) {
   gras_socket_t res;
   
   struct sockaddr_in peer_in;
@@ -205,7 +169,7 @@ gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t  sock) {
   }
   
   if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) 
-      || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s))
+      || setsockopt(sd, _gras_tcp_proto_number(), TCP_NODELAY, (char *)&i, s))
     THROW1(system_error,0,"setsockopt failed, cannot condition the socket: %s",
           sock_errstr);
 
@@ -246,11 +210,9 @@ gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t  sock) {
   return res;
 }
 
-void gras_trp_tcp_socket_close(gras_socket_t sock){
-  gras_trp_tcp_plug_data_t *tcp;
+static void gras_trp_sock_socket_close(gras_socket_t sock){
   
   if (!sock) return; /* close only once */
-  tcp=sock->plugin->data;
 
   VERB1("close tcp connection %d", sock->sd);
 
@@ -268,16 +230,6 @@ void gras_trp_tcp_socket_close(gras_socket_t sock){
     }
   } */
 
-#ifndef HAVE_WINSOCK_H
-  /* forget about the socket 
-     ... but not when using winsock since accept'ed socket can not fit 
-     into the fd_set*/
-  if (sock->meas){
-    FD_CLR(sock->sd, &(tcp->meas_socks));
-  } else {
-    FD_CLR(sock->sd, &(tcp->msg_socks));
-  }
-#endif
    
   /* close the socket */
   if(tcp_close(sock->sd) < 0) {
@@ -286,20 +238,20 @@ void gras_trp_tcp_socket_close(gras_socket_t sock){
   }
 
 }
-
-/**
- * gras_trp_tcp_chunk_send:
- *
- * Send data on a TCP socket
- */
-void
-gras_trp_tcp_chunk_send(gras_socket_t sock,
-                       const char *data,
-                       unsigned long int size) {
+/************************************/
+/****[ end of SOCKET MANAGEMENT ]****/
+/************************************/
+
+
+/************************************/
+/****[ UNBUFFERED DATA EXCHANGE ]****/
+/************************************/
+/* Temptation to merge this with file data exchange is great, 
+   but doesn't work on BillWare (see tcp_write() in portable.h) */
+static inline void gras_trp_tcp_send(gras_socket_t sock,
+                                    const char *data,
+                                    unsigned long int size) {
   
-  /* TCP sockets are in duplex mode, don't check direction */
-  xbt_assert0(size >= 0, "Cannot send a negative amount of data");
-
   while (size) {
     int status = 0;
     
@@ -321,52 +273,379 @@ gras_trp_tcp_chunk_send(gras_socket_t sock,
     }
   }
 }
-/**
- * gras_trp_tcp_chunk_recv:
- *
- * Receive data on a TCP socket.
- */
-void
-gras_trp_tcp_chunk_recv(gras_socket_t sock,
-                       char *data,
-                       unsigned long int size,
-                       unsigned long int bufsize) {
-
-  /* TCP sockets are in duplex mode, don't check direction */
-  xbt_assert0(sock, "Cannot recv on an NULL socket");
-  xbt_assert0(size >= 0, "Cannot receive a negative amount of data");
-  xbt_assert0(bufsize>=size,"Not enough buffer size to receive that much data");
-  
-  while (size) {
+static inline int 
+gras_trp_tcp_recv_withbuffer(gras_socket_t sock,
+                                  char *data,
+                                  unsigned long int size,
+                                  unsigned long int bufsize) {
+
+  int got = 0;
+
+  while (size>got) {
     int status = 0;
     
-    DEBUG3("read(%d, %p, %ld);", sock->sd, data, size);
-    status = tcp_read(sock->sd, data, (size_t)bufsize);
+    DEBUG5("read(%d, %p, %ld) got %d so far (%s)",
+         sock->sd, data+got, bufsize, got,
+         hexa_str(data,got));
+    status = tcp_read(sock->sd, data+got, (size_t)bufsize);
     
     if (status < 0) {
       THROW4(system_error,0,"read(%d,%p,%d) failed: %s",
-            sock->sd, data, (int)size,
+            sock->sd, data+got, (int)size,
             sock_errstr);
     }
+    DEBUG2("Got %d more bytes (%s)",status,hexa_str(data+got,status));
     
     if (status) {
-      size    -= status;
       bufsize -= status;
-      data    += status;
+      got     += status;
     } else {
-      gras_socket_close(sock);
       THROW0(system_error,0,"Socket closed by remote side");
     }
   }
+  return got;
+}
+
+static int gras_trp_tcp_recv(gras_socket_t sock,
+                                  char *data,
+                                  unsigned long int size) {
+  return gras_trp_tcp_recv_withbuffer(sock,data,size,size);
+
 }
+/*******************************************/
+/****[ end of UNBUFFERED DATA EXCHANGE ]****/
+/*******************************************/
+
+/**********************************/
+/****[ BUFFERED DATA EXCHANGE ]****/
+/**********************************/
+
+/* Make sure the data is sent */
+static void
+gras_trp_bufiov_flush(gras_socket_t sock) {
+#ifdef HAVE_READV
+  xbt_dynar_t vect;
+  int size;
+#endif
+  gras_trp_bufdata_t *data=sock->bufdata;
+  XBT_IN;    
+  
+  DEBUG0("Flush");
+  if (data->out == buffering_buf) {
+    if (XBT_LOG_ISENABLED(trp_tcp,xbt_log_priority_debug))
+      hexa_print("chunk to send ",
+                (unsigned char *) data->out_buf.data,data->out_buf.size);
+    if ((data->out_buf.size - data->out_buf.pos) != 0) { 
+      DEBUG3("Send the chunk (size=%d) to %s:%d",data->out_buf.size,
+            gras_socket_peer_name(sock),gras_socket_peer_port(sock));
+      gras_trp_tcp_send(sock, data->out_buf.data, data->out_buf.size);
+      VERB1("Chunk sent (size=%d)",data->out_buf.size);
+      data->out_buf.size = 0;
+    }
+  }
+
+#ifdef HAVE_READV
+  if (data->out == buffering_iov) {
+    vect = sock->bufdata->out_buf_v;
+    if ((size = xbt_dynar_length(vect))) {
+      DEBUG1("Flush %d chunks out of this socket",size);
+      writev(sock->sd,xbt_dynar_get_ptr(vect,0),size);
+      xbt_dynar_reset(vect);
+    }
+    data->out_buf.size = 0; /* reset the buffer containing non-stable data */
+  }
+
+  if (data->in == buffering_iov) {
+    vect = sock->bufdata->in_buf_v;
+    if ((size = xbt_dynar_length(vect))) {
+      DEBUG1("Get %d chunks from of this socket",size);
+      readv(sock->sd,xbt_dynar_get_ptr(vect,0),size);
+      xbt_dynar_reset(vect);
+    }
+  }
+#endif
+}
+static void
+gras_trp_buf_send(gras_socket_t sock,
+                 const char *chunk,
+                 unsigned long int size,
+                 int stable_ignored) {
+
+  gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata;
+  int chunk_pos=0;
+
+  XBT_IN;
+
+  while (chunk_pos < size) {
+    /* size of the chunk to receive in that shot */
+    long int thissize = min(size-chunk_pos,data->buffsize-data->out_buf.size);
+    DEBUG4("Set the chars %d..%ld into the buffer; size=%ld, ctn=(%s)",
+          (int)data->out_buf.size,
+          ((int)data->out_buf.size) + thissize -1,
+          size,
+          hexa_str((char*)chunk,thissize));
+
+    memcpy(data->out_buf.data + data->out_buf.size, chunk + chunk_pos, thissize);
+
+    data->out_buf.size += thissize;
+    chunk_pos      += thissize;
+    DEBUG4("New pos = %d; Still to send = %ld of %ld; ctn sofar=(%s)",
+          data->out_buf.size,size-chunk_pos,size,hexa_str((char*)chunk,chunk_pos));
+
+    if (data->out_buf.size == data->buffsize) /* out of space. Flush it */
+      gras_trp_bufiov_flush(sock);
+  }
+
+  XBT_OUT;
+}
+
+static int
+gras_trp_buf_recv(gras_socket_t sock,
+                 char *chunk,
+                 unsigned long int size) {
+
+  gras_trp_bufdata_t *data=sock->bufdata;
+  long int chunk_pos = 0;
+   
+  XBT_IN;
 
+  while (chunk_pos < size) {
+    /* size of the chunk to receive in that shot */
+    long int thissize;
+
+    if (data->in_buf.size == data->in_buf.pos) { /* out of data. Get more */
+
+      DEBUG2("Get more data (size=%d,bufsize=%d)",
+            (int)MIN(size-chunk_pos,data->buffsize),
+            (int)data->buffsize);
+
+       
+      data->in_buf.size = 
+       gras_trp_tcp_recv_withbuffer(sock, data->in_buf.data, 
+                                    MIN(size-chunk_pos,data->buffsize),
+                                    data->buffsize);
+       
+      data->in_buf.pos=0;
+    }
+     
+    thissize = min(size-chunk_pos ,  data->in_buf.size - data->in_buf.pos);
+    memcpy(chunk+chunk_pos, data->in_buf.data + data->in_buf.pos, thissize);
+
+    data->in_buf.pos += thissize;
+    chunk_pos        += thissize;
+    DEBUG4("New pos = %d; Still to receive = %ld of %ld. Ctn so far=(%s)",
+          data->in_buf.pos,size - chunk_pos,size,hexa_str(chunk,chunk_pos));
+  }
+
+  XBT_OUT;
+  return chunk_pos;
+}
+
+/*****************************************/
+/****[ end of BUFFERED DATA EXCHANGE ]****/
+/*****************************************/
+
+/********************************/
+/****[ VECTOR DATA EXCHANGE ]****/
+/********************************/
+#ifdef HAVE_READV
+static void
+gras_trp_iov_send(gras_socket_t sock,
+                 const char *chunk,
+                 unsigned long int size,
+                 int stable) {
+  struct iovec elm;
+  gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata;
+    
+
+  DEBUG1("Buffer one chunk to be sent later (%s)",
+       hexa_str((char*)chunk,size));
+
+  elm.iov_len = (size_t)size;
+
+  if (!stable) {
+    /* data storage won't last until flush. Save it in a buffer if we can */
+
+    if (size > data->buffsize-data->out_buf.size) {
+      /* buffer too small: 
+        flush the socket, using data in its actual storage */
+      elm.iov_base = (void*)chunk;
+      xbt_dynar_push(data->out_buf_v,&elm);
+
+      gras_trp_bufiov_flush(sock);      
+      return;
+    } else {
+      /* buffer big enough: 
+        copy data into it, and chain it for upcoming writev */
+      memcpy(data->out_buf.data + data->out_buf.size, chunk, size);
+      elm.iov_base = (void*)(data->out_buf.data + data->out_buf.size);
+      data->out_buf.size += size;
+
+      xbt_dynar_push(data->out_buf_v,&elm);
+    }
+
+  } else {
+    /* data storage stable. Chain it */
+    
+    elm.iov_base = (void*)chunk;
+    xbt_dynar_push(data->out_buf_v,&elm);
+  }
+}
+static int
+gras_trp_iov_recv(gras_socket_t sock,
+                 char *chunk,
+                 unsigned long int size) {
+  struct iovec elm;
+
+  DEBUG0("Buffer one chunk to be received later");
+  elm.iov_base = (void*)chunk;
+  elm.iov_len = (size_t)size;
+  xbt_dynar_push(sock->bufdata->in_buf_v,&elm);
+
+  return size;
+}
+
+#endif
+/***************************************/
+/****[ end of VECTOR DATA EXCHANGE ]****/
+/***************************************/
+
+
+/***
+ *** Prototypes of BUFFERED
+ ***/
+   
+void gras_trp_buf_socket_client(gras_trp_plugin_t self,
+                               gras_socket_t sock);
+void gras_trp_buf_socket_server(gras_trp_plugin_t self,
+                               gras_socket_t sock);
+gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock);
+
+void         gras_trp_buf_socket_close(gras_socket_t sd);
+  
+
+gras_socket_t gras_trp_buf_init_sock(gras_socket_t sock) {
+  gras_trp_bufdata_t *data=xbt_new(gras_trp_bufdata_t,1);
+  
+  data->buffsize = 100 * 1024 ; /* 100k */ 
+
+  data->in_buf.size  = 0;
+  data->in_buf.data  = xbt_malloc(data->buffsize);
+  data->in_buf.pos   = 0; /* useless, indeed, since size==pos */
+   
+  data->out_buf.size = 0;
+  data->out_buf.data = xbt_malloc(data->buffsize);
+  data->out_buf.pos  = data->out_buf.size;
+
+  data->in_buf_v = data->out_buf_v = NULL;
+#ifdef HAVE_READV
+  data->in_buf_v=xbt_dynar_new(sizeof(struct iovec),NULL);
+  data->out_buf_v=xbt_dynar_new(sizeof(struct iovec),NULL);
+#endif
+   
+  data->in = buffering_buf;
+  data->out = buffering_iov;
+  /*data->out = buffering_buf;*/
+
+  sock->bufdata = data;
+  return sock;
+}
+
+/***
+ *** Code
+ ***/
+void
+gras_trp_buf_setup(gras_trp_plugin_t plug) {
+
+  plug->socket_client = gras_trp_buf_socket_client;
+  plug->socket_server = gras_trp_buf_socket_server;
+  plug->socket_accept = gras_trp_buf_socket_accept;
+  plug->socket_close  = gras_trp_buf_socket_close;
+
+  plug->send = gras_trp_iov_send;
+  /*plug->send = gras_trp_buf_send;*/
+  plug->recv = gras_trp_buf_recv;
+
+  plug->raw_send    = gras_trp_tcp_send;
+  plug->raw_recv    = gras_trp_tcp_recv;
+
+  plug->flush         = gras_trp_bufiov_flush;
+
+  plug->data = NULL;
+  plug->exit = NULL;
+}
+
+void gras_trp_buf_socket_client(gras_trp_plugin_t self,
+                               /* OUT */ gras_socket_t sock){
+
+  gras_trp_sock_socket_client(NULL,sock);
+  gras_trp_buf_init_sock(sock);
+}
+
+/**
+ * gras_trp_buf_socket_server:
+ *
+ * Open a socket used to receive messages.
+ */
+void gras_trp_buf_socket_server(gras_trp_plugin_t self,
+                               /* OUT */ gras_socket_t sock){
+
+  gras_trp_sock_socket_server(NULL,sock);
+  gras_trp_buf_init_sock(sock);
+}
+
+gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock) {
+  return gras_trp_buf_init_sock(gras_trp_sock_socket_accept(sock));
+}
+
+void gras_trp_buf_socket_close(gras_socket_t sock){
+  gras_trp_bufdata_t *data=sock->bufdata;
+
+  if (data->in_buf.size!=data->in_buf.pos) {
+     WARN3("Socket closed, but %d bytes were unread (size=%d,pos=%d)",
+          data->in_buf.size - data->in_buf.pos,
+          data->in_buf.size, data->in_buf.pos);
+  }
+  if (data->in_buf.data)
+    free(data->in_buf.data);
+   
+  if (data->out_buf.size!=data->out_buf.pos) {
+    DEBUG2("Flush the socket before closing (in=%d,out=%d)",
+          data->in_buf.size, data->out_buf.size);
+    gras_trp_bufiov_flush(sock);
+  }   
+  if (data->out_buf.data)
+    free(data->out_buf.data);
+
+#ifdef HAVE_READV
+  if (data->in_buf_v) {
+    if (xbt_dynar_length(data->in_buf_v)) 
+      WARN0("Socket closed, but some bytes were unread");
+    xbt_dynar_free(&data->in_buf_v);
+  }
+  if (data->out_buf_v) {
+    if (xbt_dynar_length(data->out_buf_v)) {
+      DEBUG0("Flush the socket before closing");
+      gras_trp_bufiov_flush(sock);
+    }
+    xbt_dynar_free(&data->out_buf_v);
+  }
+#endif
+
+  free(data);
+  gras_trp_sock_socket_close(sock);
+}
+
+/****************************/
+/****[ HELPER FUNCTIONS ]****/
+/****************************/
 
 /*
  * Returns the tcp protocol number from the network protocol data base.
  *
  * getprotobyname() is not thread safe. We need to lock it.
  */
-static int TcpProtoNumber(void) {
+static int _gras_tcp_proto_number(void) {
   struct protoent *fetchedEntry;
   static int returnValue = 0;
   
@@ -453,3 +732,7 @@ const char *gras_wsa_err2string( int err ) {
    return "unknown WSA error";
 }
 #endif /* HAVE_WINSOCK_H */
+
+/***********************************/
+/****[ end of HELPER FUNCTIONS ]****/
+/***********************************/
index 0a8d031..484cac4 100644 (file)
@@ -60,6 +60,7 @@ void gras_trp_socket_new(int incomming,
 typedef void (*gras_trp_setup_t)(gras_trp_plugin_t dst);
 
 void gras_trp_tcp_setup(gras_trp_plugin_t plug);
+void gras_trp_iov_setup(gras_trp_plugin_t plug);
 void gras_trp_file_setup(gras_trp_plugin_t plug);
 void gras_trp_sg_setup(gras_trp_plugin_t plug);
 void gras_trp_buf_setup(gras_trp_plugin_t plug);
@@ -82,17 +83,8 @@ void gras_trp_buf_setup(gras_trp_plugin_t plug);
 
 */
 
-void gras_trp_buf_init_sock(gras_socket_t sock);
+gras_socket_t gras_trp_buf_init_sock(gras_socket_t sock);
 
-
-/* Data exchange over measurement sockets */ /* FIXME: KILLME */
-/*
-void gras_socket_meas_exchange(gras_socket_t peer,
-                                     int sender,
-                                     unsigned int timeout,
-                                     unsigned long int expSize,
-                                     unsigned long int msgSize);
-*/
 xbt_dynar_t gras_socketset_get(void); /* FIXME:KILLME */
 
 #endif /* GRAS_TRP_PRIVATE_H */