From 87468a9f6b682ee41edc3a16a100554ef48032c9 Mon Sep 17 00:00:00 2001 From: mquinson Date: Thu, 8 Sep 2005 18:54:47 +0000 Subject: [PATCH] Rework the Transport layer to simplify it and improve its performance git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1700 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- ChangeLog | 36 +- src/Makefile.am | 4 +- src/gras/DataDesc/ddt_exchange.c | 78 ++- src/gras/Msg/rl_msg.c | 6 +- src/gras/Transport/sg_transport.c | 16 +- src/gras/Transport/transport.c | 41 +- src/gras/Transport/transport_interface.h | 31 +- src/gras/Transport/transport_plugin_buf.c | 307 ------------ src/gras/Transport/transport_plugin_file.c | 47 +- src/gras/Transport/transport_plugin_sg.c | 56 +-- src/gras/Transport/transport_plugin_tcp.c | 537 ++++++++++++++++----- src/gras/Transport/transport_private.h | 12 +- 12 files changed, 583 insertions(+), 588 deletions(-) delete mode 100644 src/gras/Transport/transport_plugin_buf.c diff --git a/ChangeLog b/ChangeLog index 84754b0e3e..fb5114a200 100644 --- a/ChangeLog +++ b/ChangeLog @@ -15,12 +15,12 @@ SimGrid (3.0.1) unstable; urgency=low xbt_fifo_item_t xbt_fifo_get_prev_item(xbt_fifo_item_t i); [AL] * Bugfix: really disconnect fifo items which are remove_item()ed [AL] - * Doc: xbt_log module unmercifully reworked [MQ] + * Documentation: xbt_log module unmercifully reworked [MQ] MSG: * Add addtionnal checkings on channel values in communicating functions. - GRAS: + GRAS performance improvements: * Actually implement gras_datadesc_copy() so that we don't have to mimick RL communication on top of SG since it's so uneffective. It may also be used for inter-thread communication in RL, one day. [MQ] @@ -28,7 +28,37 @@ SimGrid (3.0.1) unstable; urgency=low Allows to: - improve message exchange performance on top of SG - deprecate transport_plugin_sg.c:gras_trp_sg_chunk_send() & recv() - + * Don't exchange on the network the size of the used part of buffer, + instead, specify the possible buffer size to read(). [MQ] + Advantages: + - reduces the amount of read/write calls (one pair per exchange) + - reduces the amount of exchanged data (the size) + - allows to retrieve all arrived data on receiver side, if we don't need + it right now (subsequent read will peek the buffer) + - allows the receiver to proceed with the begining of the stream before + everything is arrived + - make it possible to build an iov transport (using readv/writev) + Extra difficulty: + - take care of the data with non-stable storage (like stacked data), + and bufferize them. + * If possible, TCP send uses vector I/O (when writev() is here) [MQ] + - Don't use it for receive since we send data sizes and data on the + same stream, so we wouldn't be able to chain large amount of chunks + before having to flush the stuff to read the size. + * Rework the transport plugin mecanism to simplify it and reduce the + amount of pointer dereferencement when searching for the right function + to use. [MQ] + + * I guess that now, we do almost as few system calls as possible while + doing as few data copy as possible. + + To improve it further, we could try to send all the sizes first and then + all the data (to use iov on receiving size), but it's only a partial + solution: when you have 2 dimensional data, the sizes of the second + dimension is data of the first dimension, so you need 3 streams. + + I'm not sure the potential performance gains justify the coding burden. + -- SimGrid (3.00) stable; urgency=low diff --git a/src/Makefile.am b/src/Makefile.am index 4cebdc3f3f..03ca4dc2fe 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -131,8 +131,8 @@ COMMON_SRC=\ gras/Virtu/process.c RL_SRC= \ - gras/Transport/rl_transport.c gras/Transport/transport_plugin_tcp.c gras/Transport/transport_plugin_file.c \ - gras/Transport/transport_plugin_buf.c \ + gras/Transport/rl_transport.c \ + gras/Transport/transport_plugin_file.c gras/Transport/transport_plugin_tcp.c \ \ gras/Virtu/rl_emul.c \ gras/Virtu/rl_process.c gras/Virtu/rl_time.c \ diff --git a/src/gras/DataDesc/ddt_exchange.c b/src/gras/DataDesc/ddt_exchange.c index 15c9a658a0..6519ec0ecc 100644 --- a/src/gras/DataDesc/ddt_exchange.c +++ b/src/gras/DataDesc/ddt_exchange.c @@ -11,7 +11,7 @@ #include "xbt/ex.h" #include "gras/DataDesc/datadesc_private.h" -#include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */ +#include "gras/Transport/transport_interface.h" /* gras_trp_send/recv */ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ddt_exchange,datadesc, "Sending data over the network"); @@ -22,27 +22,17 @@ const char *gras_datadesc_cat_names[9] = { static gras_datadesc_type_t int_type = NULL; static gras_datadesc_type_t pointer_type = NULL; -static _XBT_INLINE void gras_dd_send_int(gras_socket_t sock, int i); -static _XBT_INLINE void gras_dd_recv_int(gras_socket_t sock, int r_arch, int *i); static _XBT_INLINE void -gras_dd_alloc_ref(xbt_dict_t refs, long int size, - char **r_ref, long int r_len, - char **l_ref, int detect_cycle); - -static _XBT_INLINE int -gras_dd_is_r_null(char **r_ptr, long int length); - -static _XBT_INLINE void -gras_dd_send_int(gras_socket_t sock,int i) { +gras_dd_send_int(gras_socket_t sock,int *i, int stable) { if (!int_type) { int_type = gras_datadesc_by_name("int"); xbt_assert(int_type); } - DEBUG1("send_int(%d)",i); - gras_trp_chunk_send(sock, (char*)&i, int_type->size[GRAS_THISARCH]); + DEBUG1("send_int(%d)",*i); + gras_trp_send(sock, (char*)i, int_type->size[GRAS_THISARCH], stable); } static _XBT_INLINE void @@ -54,13 +44,13 @@ gras_dd_recv_int(gras_socket_t sock, int r_arch, int *i) { } if (int_type->size[GRAS_THISARCH] >= int_type->size[r_arch]) { - gras_trp_chunk_recv(sock, (char*)i, int_type->size[r_arch]); + gras_trp_recv(sock, (char*)i, int_type->size[r_arch]); if (r_arch != GRAS_THISARCH) gras_dd_convert_elm(int_type,1,r_arch, i,i); } else { void *ptr = xbt_malloc(int_type->size[r_arch]); - gras_trp_chunk_recv(sock, (char*)ptr, int_type->size[r_arch]); + gras_trp_recv(sock, (char*)ptr, int_type->size[r_arch]); if (r_arch != GRAS_THISARCH) gras_dd_convert_elm(int_type,1,r_arch, ptr,i); free(ptr); @@ -403,7 +393,7 @@ gras_datadesc_send_rec(gras_socket_t sock, switch (type->category_code) { case e_gras_datadesc_type_cat_scalar: - gras_trp_chunk_send(sock, data, type->size[GRAS_THISARCH]); + gras_trp_send(sock, data, type->size[GRAS_THISARCH], 1); break; case e_gras_datadesc_type_cat_struct: { @@ -457,7 +447,7 @@ gras_datadesc_send_rec(gras_socket_t sock, type->name, field_num, xbt_dynar_length(union_data.fields)); /* Send the field number */ - gras_dd_send_int(sock, field_num); + gras_dd_send_int(sock, &field_num, 0 /* not stable */); /* Send the content */ field = xbt_dynar_get_as(union_data.fields, field_num, gras_dd_cat_field_t); @@ -483,7 +473,7 @@ gras_datadesc_send_rec(gras_socket_t sock, sub_type = ref_data.type; if (sub_type == NULL) { sub_type = (*ref_data.selector)(type,state,data); - gras_dd_send_int(sock, sub_type->code); + gras_dd_send_int(sock, &(sub_type->code),1 /*stable*/); } /* Send the actual value of the pointer for cycle handling */ @@ -492,8 +482,8 @@ gras_datadesc_send_rec(gras_socket_t sock, xbt_assert(pointer_type); } - gras_trp_chunk_send(sock, (char*)data, - pointer_type->size[GRAS_THISARCH]); + gras_trp_send(sock, (char*)data, + pointer_type->size[GRAS_THISARCH], 1 /*stable*/); /* Send the pointed data only if not already sent */ if (*(void**)data == NULL) { @@ -531,7 +521,7 @@ gras_datadesc_send_rec(gras_socket_t sock, case e_gras_datadesc_type_cat_array: { gras_dd_cat_array_t array_data; - long int count; + int count; char *ptr=data; long int elm_size; @@ -543,24 +533,26 @@ gras_datadesc_send_rec(gras_socket_t sock, count = array_data.dynamic_size(type,state,data); xbt_assert1(count >=0, "Invalid (negative) array size for type %s",type->name); - gras_dd_send_int(sock, count); + gras_dd_send_int(sock, &count, 0/*non-stable*/); } /* send the content */ sub_type = array_data.type; elm_size = sub_type->aligned_size[GRAS_THISARCH]; if (sub_type->category_code == e_gras_datadesc_type_cat_scalar) { - VERB1("Array of %ld scalars, send it in one shot",count); - gras_trp_chunk_send(sock, data, - sub_type->aligned_size[GRAS_THISARCH] * count); + VERB1("Array of %d scalars, send it in one shot",count); + gras_trp_send(sock, data, + sub_type->aligned_size[GRAS_THISARCH] * count, + 0 /* not stable */); } else if (sub_type->category_code == e_gras_datadesc_type_cat_array && sub_type->category.array_data.fixed_size > 0 && sub_type->category.array_data.type->category_code == e_gras_datadesc_type_cat_scalar) { - VERB1("Array of %ld fixed array of scalars, send it in one shot",count); - gras_trp_chunk_send(sock, data, - sub_type->category.array_data.type->aligned_size[GRAS_THISARCH] - * count * sub_type->category.array_data.fixed_size); + VERB1("Array of %d fixed array of scalars, send it in one shot",count); + gras_trp_send(sock, data, + sub_type->category.array_data.type->aligned_size[GRAS_THISARCH] + * count * sub_type->category.array_data.fixed_size, + 0 /* not stable */); } else { for (cpt=0; cptcategory_code) { case e_gras_datadesc_type_cat_scalar: if (type->size[GRAS_THISARCH] == type->size[r_arch]) { - gras_trp_chunk_recv(sock, (char*)l_data, type->size[r_arch]); + gras_trp_recv(sock, (char*)l_data, type->size[r_arch]); if (r_arch != GRAS_THISARCH) gras_dd_convert_elm(type,1,r_arch, l_data,l_data); } else { void *ptr = xbt_malloc(type->size[r_arch]); - gras_trp_chunk_recv(sock, (char*)ptr, type->size[r_arch]); + gras_trp_recv(sock, (char*)ptr, type->size[r_arch]); if (r_arch != GRAS_THISARCH) gras_dd_convert_elm(type,1,r_arch, ptr,l_data); free(ptr); @@ -740,8 +732,8 @@ gras_datadesc_recv_rec(gras_socket_t sock, r_ref = xbt_malloc(pointer_type->size[r_arch]); - gras_trp_chunk_recv(sock, (char*)r_ref, - pointer_type->size[r_arch]); + gras_trp_recv(sock, (char*)r_ref, + pointer_type->size[r_arch]); /* Receive the pointed data only if not already sent */ if (gras_dd_is_r_null(r_ref, pointer_type->size[r_arch])) { @@ -838,15 +830,15 @@ gras_datadesc_recv_rec(gras_socket_t sock, VERB1("Array of %d scalars, get it in one shoot", count); if (sub_type->aligned_size[GRAS_THISARCH] >= sub_type->aligned_size[r_arch]) { - gras_trp_chunk_recv(sock, (char*)l_data, - sub_type->aligned_size[r_arch] * count); + gras_trp_recv(sock, (char*)l_data, + sub_type->aligned_size[r_arch] * count); if (r_arch != GRAS_THISARCH) gras_dd_convert_elm(sub_type,count,r_arch, l_data,l_data); } else { ptr = xbt_malloc(sub_type->aligned_size[r_arch] * count); - gras_trp_chunk_recv(sock, (char*)ptr, - sub_type->size[r_arch] * count); + gras_trp_recv(sock, (char*)ptr, + sub_type->size[r_arch] * count); if (r_arch != GRAS_THISARCH) gras_dd_convert_elm(sub_type,count,r_arch, ptr,l_data); free(ptr); @@ -861,16 +853,16 @@ gras_datadesc_recv_rec(gras_socket_t sock, VERB1("Array of %d fixed array of scalars, get it in one shot",count); if (subsub_type->aligned_size[GRAS_THISARCH] >= subsub_type->aligned_size[r_arch]) { - gras_trp_chunk_recv(sock, (char*)l_data, - subsub_type->aligned_size[r_arch] * count * - array_data.fixed_size); + gras_trp_recv(sock, (char*)l_data, + subsub_type->aligned_size[r_arch] * count * + array_data.fixed_size); if (r_arch != GRAS_THISARCH) gras_dd_convert_elm(subsub_type,count*array_data.fixed_size,r_arch, l_data,l_data); } else { ptr = xbt_malloc(subsub_type->aligned_size[r_arch] * count*array_data.fixed_size); - gras_trp_chunk_recv(sock, (char*)ptr, - subsub_type->size[r_arch] * count*array_data.fixed_size); + gras_trp_recv(sock, (char*)ptr, + subsub_type->size[r_arch] * count*array_data.fixed_size); if (r_arch != GRAS_THISARCH) gras_dd_convert_elm(subsub_type,count*array_data.fixed_size,r_arch, ptr,l_data); free(ptr); diff --git a/src/gras/Msg/rl_msg.c b/src/gras/Msg/rl_msg.c index b39e7db01e..61405d0425 100644 --- a/src/gras/Msg/rl_msg.c +++ b/src/gras/Msg/rl_msg.c @@ -11,7 +11,7 @@ #include "gras/Msg/msg_private.h" #include "gras/DataDesc/datadesc_interface.h" -#include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */ +#include "gras/Transport/transport_interface.h" /* gras_trp_send/recv */ XBT_LOG_EXTERNAL_CATEGORY(gras_msg); XBT_LOG_DEFAULT_CATEGORY(gras_msg); @@ -36,7 +36,7 @@ gras_msg_send(gras_socket_t sock, DEBUG3("send '%s' to %s:%d", msgtype->name, gras_socket_peer_name(sock),gras_socket_peer_port(sock)); - gras_trp_chunk_send(sock, _GRAS_header, 6); + gras_trp_send(sock, _GRAS_header, 6, 1 /* stable */); gras_datadesc_send(sock, string_type, &msgtype->name); if (msgtype->ctn_type) @@ -67,7 +67,7 @@ gras_msg_recv(gras_socket_t sock, } TRY { - gras_trp_chunk_recv(sock, header, 6); + gras_trp_recv(sock, header, 6); } CATCH(e) { RETHROW1("Exception caught while trying to get the mesage header on socket %p: %s", sock); diff --git a/src/gras/Transport/sg_transport.c b/src/gras/Transport/sg_transport.c index 1e999eaef7..7992390e1c 100644 --- a/src/gras/Transport/sg_transport.c +++ b/src/gras/Transport/sg_transport.c @@ -136,14 +136,10 @@ gras_socket_t gras_trp_select(double timeout) { /* dummy implementations of the functions used in RL mode */ -void gras_trp_tcp_setup(gras_trp_plugin_t plug) { -} -void gras_trp_file_setup(gras_trp_plugin_t plug) { - THROW0(mismatch_error,0,NULL); -} -void gras_trp_buf_setup(gras_trp_plugin_t plug) { - THROW0(mismatch_error,0,NULL); -} -void gras_trp_buf_init_sock(gras_socket_t sock) { -} +void gras_trp_tcp_setup(gras_trp_plugin_t plug) { THROW0(mismatch_error,0,NULL); } +void gras_trp_file_setup(gras_trp_plugin_t plug){ THROW0(mismatch_error,0,NULL); } +void gras_trp_buf_setup(gras_trp_plugin_t plug) { THROW0(mismatch_error,0,NULL); } +void gras_trp_iov_setup(gras_trp_plugin_t plug) { THROW0(mismatch_error,0,NULL); } + +gras_socket_t gras_trp_buf_init_sock(gras_socket_t sock) { return sock;} diff --git a/src/gras/Transport/transport.c b/src/gras/Transport/transport.c index d88a401c1e..4b2e1befa1 100644 --- a/src/gras/Transport/transport.c +++ b/src/gras/Transport/transport.c @@ -81,7 +81,6 @@ void gras_trp_init(void){ #endif /* Add plugins */ - gras_trp_plugin_new("tcp", gras_trp_tcp_setup); gras_trp_plugin_new("file",gras_trp_file_setup); gras_trp_plugin_new("sg",gras_trp_sg_setup); @@ -315,21 +314,15 @@ void gras_socket_close(gras_socket_t sock) { } /** - * gras_trp_chunk_send: + * gras_trp_send: * * Send a bunch of bytes from on socket + * (stable if we know the storage will keep as is until the next trp_flush) */ void -gras_trp_chunk_send(gras_socket_t sd, - char *data, - long int size) { - xbt_assert1(sd->outgoing, - "Socket not suited for data send (outgoing=%c)", - sd->outgoing?'y':'n'); - xbt_assert1(sd->plugin->chunk_send, - "No function chunk_send on transport plugin %s", - sd->plugin->name); - (*sd->plugin->chunk_send)(sd,data,size); +gras_trp_send(gras_socket_t sd, char *data, long int size, int stable) { + xbt_assert0(sd->outgoing,"Socket not suited for data send"); + (*sd->plugin->send)(sd,data,size,stable); } /** * gras_trp_chunk_recv: @@ -337,15 +330,9 @@ gras_trp_chunk_send(gras_socket_t sd, * Receive a bunch of bytes from a socket */ void -gras_trp_chunk_recv(gras_socket_t sd, - char *data, - long int size) { - xbt_assert0(sd->incoming, - "Socket not suited for data receive"); - xbt_assert1(sd->plugin->chunk_recv, - "No function chunk_recv on transport plugin %s", - sd->plugin->name); - (sd->plugin->chunk_recv)(sd,data,size,size); +gras_trp_recv(gras_socket_t sd, char *data, long int size) { + xbt_assert0(sd->incoming,"Socket not suited for data receive"); + (sd->plugin->recv)(sd,data,size); } /** @@ -355,7 +342,8 @@ gras_trp_chunk_recv(gras_socket_t sd, */ void gras_trp_flush(gras_socket_t sd) { - (sd->plugin->flush)(sd); + if (sd->plugin->flush) + (sd->plugin->flush)(sd); } gras_trp_plugin_t @@ -402,12 +390,13 @@ void gras_socket_meas_send(gras_socket_t peer, XBT_IN; xbt_assert0(peer->meas,"Asked to send measurement data on a regular socket"); + xbt_assert0(peer->outgoing,"Socket not suited for data send"); for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) { CDEBUG5(trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d", exp_sofar,exp_size,msg_size, gras_socket_peer_name(peer), gras_socket_peer_port(peer)); - gras_trp_chunk_send(peer,chunk,msg_size); + (*peer->plugin->raw_send)(peer,chunk,msg_size); } CDEBUG5(trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d", exp_sofar,exp_size,msg_size, @@ -433,13 +422,15 @@ void gras_socket_meas_recv(gras_socket_t peer, XBT_IN; - xbt_assert0(peer->meas,"Asked to receive measurement data on a regular socket\n"); + xbt_assert0(peer->meas, + "Asked to receive measurement data on a regular socket"); + xbt_assert0(peer->incoming,"Socket not suited for data receive"); for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) { CDEBUG5(trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d", exp_sofar,exp_size,msg_size, gras_socket_peer_name(peer), gras_socket_peer_port(peer)); - gras_trp_chunk_recv(peer,chunk,msg_size); + (peer->plugin->raw_recv)(peer,chunk,msg_size); } CDEBUG5(trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d", exp_sofar,exp_size,msg_size, diff --git a/src/gras/Transport/transport_interface.h b/src/gras/Transport/transport_interface.h index 249db65393..5766ffda9f 100644 --- a/src/gras/Transport/transport_interface.h +++ b/src/gras/Transport/transport_interface.h @@ -15,12 +15,9 @@ /*** *** Main user functions ***/ -void gras_trp_chunk_send(gras_socket_t sd, - char *data, - long int size); -void gras_trp_chunk_recv(gras_socket_t sd, - char *data, - long int size); +/* stable if we know the storage will keep as is until the next trp_flush */ +void gras_trp_send(gras_socket_t sd, char *data, long int size, int stable); +void gras_trp_recv(gras_socket_t sd, char *data, long int size); void gras_trp_flush(gras_socket_t sd); /* Find which socket needs to be read next */ @@ -53,13 +50,21 @@ struct gras_trp_plugin_ { but should not free the socket itself (beside the specific part) */ void (*socket_close)(gras_socket_t sd); - void (*chunk_send)(gras_socket_t sd, - const char *data, - unsigned long int size); - void (*chunk_recv)(gras_socket_t sd, - char *data, - unsigned long int size, - unsigned long int bufsize); + /* send/recv may be buffered */ + void (*send)(gras_socket_t sd, + const char *data, + unsigned long int size, + int stable /* storage will survive until flush*/); + int (*recv)(gras_socket_t sd, + char *data, + unsigned long int size); + /* raw_send/raw_recv is never buffered (use it for measurement stuff) */ + void (*raw_send)(gras_socket_t sd, + const char *data, + unsigned long int size); + int (*raw_recv)(gras_socket_t sd, + char *data, + unsigned long int size); /* flush has to make sure that the pending communications are achieved */ void (*flush)(gras_socket_t sd); diff --git a/src/gras/Transport/transport_plugin_buf.c b/src/gras/Transport/transport_plugin_buf.c deleted file mode 100644 index aca9d0a812..0000000000 --- a/src/gras/Transport/transport_plugin_buf.c +++ /dev/null @@ -1,307 +0,0 @@ -/* $Id$ */ - -/* buf trp (transport) - buffered transport using the TCP one */ - -/* Copyright (c) 2004 Martin Quinson. All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -#include -#include /* memset */ - -#include "portable.h" -#include "xbt/misc.h" -#include "xbt/sysdep.h" -#include "xbt/ex.h" -#include "transport_private.h" - -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_buf,transport, - "Generic buffered transport (works on top of TCP or Files, but not SG)"); - - -static gras_trp_plugin_t _buf_super; - -/*** - *** Prototypes - ***/ -void hexa_print(const char*name, unsigned char *data, int size); /* in gras.c */ - -void gras_trp_buf_socket_client(gras_trp_plugin_t self, - gras_socket_t sock); -void gras_trp_buf_socket_server(gras_trp_plugin_t self, - gras_socket_t sock); -gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock); - -void gras_trp_buf_socket_close(gras_socket_t sd); - -void gras_trp_buf_chunk_send(gras_socket_t sd, - const char *data, - unsigned long int size); - -void gras_trp_buf_chunk_recv(gras_socket_t sd, - char *data, - unsigned long int size, - unsigned long int bufsize); -void gras_trp_buf_flush(gras_socket_t sock); - - -/*** - *** Specific plugin part - ***/ - -typedef struct { - int junk; -} gras_trp_buf_plug_data_t; - -/*** - *** Specific socket part - ***/ - -typedef struct { - int size; - char *data; - int pos; /* for receive; not exchanged over the net */ -} gras_trp_buf_t; - -struct gras_trp_bufdata_{ - gras_trp_buf_t in; - gras_trp_buf_t out; - int buffsize; -}; - -void gras_trp_buf_init_sock(gras_socket_t sock) { - gras_trp_bufdata_t *data=xbt_new(gras_trp_bufdata_t,1); - - XBT_IN; - data->buffsize = 100 * 1024 ; /* 100k */ - - data->in.size = 0; - data->in.data = xbt_malloc(data->buffsize); - data->in.pos = 0; /* useless, indeed, since size==pos */ - - /* In SG, the 4 first bytes are for the chunk size as htonl'ed, so that we can send it in one shoot. - * This is mandatory in SG because all emissions go to the same channel, so if we split them, - * they can get mixed. */ - data->out.size = 0; - data->out.data = xbt_malloc(data->buffsize); - data->out.pos = data->out.size; - - sock->bufdata = data; -} - -/*** - *** Code - ***/ -void -gras_trp_buf_setup(gras_trp_plugin_t plug) { - gras_trp_buf_plug_data_t *data =xbt_new(gras_trp_buf_plug_data_t,1); - - XBT_IN; - _buf_super = gras_trp_plugin_get_by_name("tcp"); - - DEBUG1("Derivate a buffer plugin from %s","tcp"); - - plug->socket_client = gras_trp_buf_socket_client; - plug->socket_server = gras_trp_buf_socket_server; - plug->socket_accept = gras_trp_buf_socket_accept; - plug->socket_close = gras_trp_buf_socket_close; - - plug->chunk_send = gras_trp_buf_chunk_send; - plug->chunk_recv = gras_trp_buf_chunk_recv; - - plug->flush = gras_trp_buf_flush; - - plug->data = (void*)data; - plug->exit = NULL; -} - -void gras_trp_buf_socket_client(gras_trp_plugin_t self, - /* OUT */ gras_socket_t sock){ - - XBT_IN; - _buf_super->socket_client(_buf_super,sock); - sock->plugin = self; - gras_trp_buf_init_sock(sock); -} - -/** - * gras_trp_buf_socket_server: - * - * Open a socket used to receive messages. - */ -void gras_trp_buf_socket_server(gras_trp_plugin_t self, - /* OUT */ gras_socket_t sock){ - - XBT_IN; - _buf_super->socket_server(_buf_super,sock); - sock->plugin = self; - gras_trp_buf_init_sock(sock); -} - -gras_socket_t -gras_trp_buf_socket_accept(gras_socket_t sock) { - - gras_socket_t res; - - XBT_IN; - res = _buf_super->socket_accept(sock); - res->plugin = sock->plugin; - gras_trp_buf_init_sock(res); - XBT_OUT; - return res; -} - -void gras_trp_buf_socket_close(gras_socket_t sock){ - gras_trp_bufdata_t *data=sock->bufdata; - - XBT_IN; - if (data->in.size!=data->in.pos) { - WARN3("Socket closed, but %d bytes were unread (size=%d,pos=%d)", - data->in.size - data->in.pos,data->in.size, data->in.pos); - } - - if (data->out.size!=data->out.pos) { - DEBUG2("Flush the socket before closing (in=%d,out=%d)",data->in.size, data->out.size); - gras_trp_buf_flush(sock); - } - - if (data->in.data) - free(data->in.data); - if (data->out.data) - free(data->out.data); - free(data); - - _buf_super->socket_close(sock); -} - -/** - * gras_trp_buf_chunk_send: - * - * Send data on a buffered socket - */ -void -gras_trp_buf_chunk_send(gras_socket_t sock, - const char *chunk, - unsigned long int size) { - - gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata; - int chunk_pos=0; - - XBT_IN; - /* Let underneath plugin check for direction, we work even in duplex */ - xbt_assert0(size >= 0, "Cannot send a negative amount of data"); - - while (chunk_pos < size) { - /* size of the chunck to receive in that shot */ - long int thissize = min(size-chunk_pos,data->buffsize - data->out.size); - DEBUG5("Set the chars %d..%ld into the buffer (size=%ld, ctn='%.*s')", - (int)data->out.size, - ((int)data->out.size) + thissize -1, - size, chunk_pos, chunk); - - memcpy(data->out.data + data->out.size, chunk + chunk_pos, thissize); - - data->out.size += thissize; - chunk_pos += thissize; - DEBUG5("New pos = %d; Still to send = %ld of %ld; ctn sofar='%.*s'", - data->out.size,size-chunk_pos,size,(int)chunk_pos,chunk); - - if (data->out.size == data->buffsize) /* out of space. Flush it */ - gras_trp_buf_flush(sock); - } - - XBT_OUT; -} - -/** - * gras_trp_buf_chunk_recv: - * - * Receive data on a buffered socket. - */ -void -gras_trp_buf_chunk_recv(gras_socket_t sock, - char *chunk, - unsigned long int size, - unsigned long int bufsize) { - - xbt_ex_t e; - gras_trp_bufdata_t *data=sock->bufdata; - long int chunck_pos = 0; - - /* Let underneath plugin check for direction, we work even in duplex */ - xbt_assert0(sock, "Cannot recv on an NULL socket"); - xbt_assert0(size >= 0, "Cannot receive a negative amount of data"); - - XBT_IN; - - while (chunck_pos < size) { - /* size of the chunck to receive in that shot */ - long int thissize; - - if (data->in.size == data->in.pos) { /* out of data. Get more */ - int nextsize; - DEBUG0("Recv the size"); - TRY { - _buf_super->chunk_recv(sock,(char*)&nextsize, 4,4); - } CATCH(e) { - RETHROW3("Unable to get the chunk size on %p (peer = %s:%d): %s", - sock,gras_socket_peer_name(sock),gras_socket_peer_port(sock)); - } - data->in.size = (int)ntohl(nextsize); - VERB1("Recv the chunk (size=%d)",data->in.size); - - _buf_super->chunk_recv(sock, data->in.data, data->in.size, data->in.size); - - data->in.pos=0; - } - - thissize = min(size-chunck_pos , data->in.size - data->in.pos); - DEBUG2("Get the chars %d..%ld out of the buffer", - data->in.pos, - data->in.pos + thissize - 1); - memcpy(chunk+chunck_pos, data->in.data + data->in.pos, thissize); - - data->in.pos += thissize; - chunck_pos += thissize; - DEBUG5("New pos = %d; Still to receive = %ld of %ld. Ctn so far='%.*s'", - data->in.pos,size - chunck_pos,size,(int)chunck_pos,chunk); - } - - XBT_OUT; -} - -/** - * gras_trp_buf_flush: - * - * Make sure the data is sent - */ -void -gras_trp_buf_flush(gras_socket_t sock) { - int size; - gras_trp_bufdata_t *data=sock->bufdata; - XBT_IN; - - DEBUG0("Flush"); - if (XBT_LOG_ISENABLED(trp_buf,xbt_log_priority_debug)) - hexa_print("chunck to send ",(unsigned char *) data->out.data,data->out.size); - if ((data->out.size - data->out.pos) == 0) { - DEBUG2("Nothing to flush (size=%d; pos=%d)",data->out.size,data->out.pos); - return; - } - - size = (int)data->out.size - data->out.pos; - DEBUG3("Send the size (=%d) to %s:%d",data->out.size-data->out.pos, - gras_socket_peer_name(sock),gras_socket_peer_port(sock)); - size = (int)htonl(size); - _buf_super->chunk_send(sock,(char*) &size, 4); - - - DEBUG3("Send the chunk (size=%d) to %s:%d",data->out.size, - gras_socket_peer_name(sock),gras_socket_peer_port(sock)); - _buf_super->chunk_send(sock, data->out.data, data->out.size); - VERB1("Chunk sent (size=%d)",data->out.size); - if (XBT_LOG_ISENABLED(trp_buf,xbt_log_priority_debug)) - hexa_print("chunck sent ",(unsigned char *) data->out.data,data->out.size); - data->out.size = 0; -} diff --git a/src/gras/Transport/transport_plugin_file.c b/src/gras/Transport/transport_plugin_file.c index 7e7eae0aa5..0d6f7ab0da 100644 --- a/src/gras/Transport/transport_plugin_file.c +++ b/src/gras/Transport/transport_plugin_file.c @@ -19,15 +19,17 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_file,transport, ***/ void gras_trp_file_close(gras_socket_t sd); +void gras_trp_file_chunk_send_raw(gras_socket_t sd, + const char *data, + unsigned long int size); void gras_trp_file_chunk_send(gras_socket_t sd, const char *data, - unsigned long int size); - -void gras_trp_file_chunk_recv(gras_socket_t sd, - char *data, unsigned long int size, - unsigned long int bufsize); + int stable_ignored); +int gras_trp_file_chunk_recv(gras_socket_t sd, + char *data, + unsigned long int size); /*** *** Specific plugin part @@ -54,8 +56,12 @@ gras_trp_file_setup(gras_trp_plugin_t plug) { FD_ZERO(&(file->incoming_socks)); plug->socket_close = gras_trp_file_close; - plug->chunk_send = gras_trp_file_chunk_send; - plug->chunk_recv = gras_trp_file_chunk_recv; + + plug->raw_send = gras_trp_file_chunk_send_raw; + plug->send = gras_trp_file_chunk_send; + + plug->raw_recv = plug->recv = gras_trp_file_chunk_recv; + plug->data = (void*)file; } @@ -170,7 +176,14 @@ void gras_trp_file_close(gras_socket_t sock){ void gras_trp_file_chunk_send(gras_socket_t sock, const char *data, - unsigned long int size) { + unsigned long int size, + int stable_ignored) { + gras_trp_file_chunk_send_raw(sock,data,size); +} +void +gras_trp_file_chunk_send_raw(gras_socket_t sock, + const char *data, + unsigned long int size) { xbt_assert0(sock->outgoing, "Cannot write on client file socket"); xbt_assert0(size >= 0, "Cannot send a negative amount of data"); @@ -200,36 +213,36 @@ gras_trp_file_chunk_send(gras_socket_t sock, * * Receive data on a file pseudo-socket. */ -void +int gras_trp_file_chunk_recv(gras_socket_t sock, char *data, - unsigned long int size, - unsigned long int bufsize) { + unsigned long int size) { + + int got = 0; xbt_assert0(sock, "Cannot recv on an NULL socket"); xbt_assert0(sock->incoming, "Cannot recv on client file socket"); xbt_assert0(size >= 0, "Cannot receive a negative amount of data"); - xbt_assert0(bufsize>=size,"Not enough buffer size to receive that much data"); while (size) { int status = 0; - status = read(sock->sd, data, (long int)bufsize); - DEBUG3("read(%d, %p, %ld);", sock->sd, data, size); + status = read(sock->sd, data+got, (long int)size); + DEBUG3("read(%d, %p, %ld);", sock->sd, data+got, size); if (status == -1) { THROW4(system_error,0,"read(%d,%p,%d) failed: %s", - sock->sd, data, (int)size, + sock->sd, data+got, (int)size, strerror(errno)); } if (status) { size -= status; - bufsize -= status; - data += status; + got += status; } else { THROW0(system_error,0,"file descriptor closed"); } } + return got; } diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index 8c81ca2ad3..092c9b75d3 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -24,7 +24,6 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_sg,transport,"SimGrid pseudo-transport"); /*** *** Prototypes ***/ -void hexa_print(unsigned char *data, int size); /* in gras.c */ /* retrieve the port record associated to a numerical port on an host */ static void find_port(gras_hostdata_t *hd, int port, gras_sg_portrec_t *hpd); @@ -36,14 +35,17 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, /* OUT */ gras_socket_t sock); void gras_trp_sg_socket_close(gras_socket_t sd); +void gras_trp_sg_chunk_send_raw(gras_socket_t sd, + const char *data, + unsigned long int size); void gras_trp_sg_chunk_send(gras_socket_t sd, const char *data, - unsigned long int size); + unsigned long int size, + int stable_ignored); -void gras_trp_sg_chunk_recv(gras_socket_t sd, +int gras_trp_sg_chunk_recv(gras_socket_t sd, char *data, - unsigned long int size, - unsigned long int bufsize); + unsigned long int size); /*** *** Specific plugin part @@ -84,8 +86,9 @@ gras_trp_sg_setup(gras_trp_plugin_t plug) { plug->socket_server = gras_trp_sg_socket_server; plug->socket_close = gras_trp_sg_socket_close; - plug->chunk_send = gras_trp_sg_chunk_send; - plug->chunk_recv = gras_trp_sg_chunk_recv; + plug->raw_send = gras_trp_sg_chunk_send_raw; + plug->send = gras_trp_sg_chunk_send; + plug->raw_recv = plug->recv = gras_trp_sg_chunk_recv; plug->flush = NULL; /* nothing cached */ } @@ -236,7 +239,14 @@ typedef struct { void gras_trp_sg_chunk_send(gras_socket_t sock, const char *data, - unsigned long int size) { + unsigned long int size, + int stable_ignored) { + gras_trp_sg_chunk_send_raw(sock,data,size); +} + +void gras_trp_sg_chunk_send_raw(gras_socket_t sock, + const char *data, + unsigned long int size) { m_task_t task=NULL; static unsigned int count=0; char name[256]; @@ -262,10 +272,9 @@ void gras_trp_sg_chunk_send(gras_socket_t sock, } } -void gras_trp_sg_chunk_recv(gras_socket_t sock, +int gras_trp_sg_chunk_recv(gras_socket_t sock, char *data, - unsigned long int size, - unsigned long int bufsize){ + unsigned long int size){ gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp"); m_task_t task=NULL; @@ -282,23 +291,13 @@ void gras_trp_sg_chunk_recv(gras_socket_t sock, DEBUG1("Got chuck %s",MSG_task_get_name(task)); task_data = MSG_task_get_data(task); - if (size != -1) { - if (task_data->size != size) - THROW5(mismatch_error,0, - "Got %d bytes when %ld where expected (in %s->%s:%d)", - task_data->size, size, - MSG_host_get_name(sock_data->to_host), - MSG_host_get_name(MSG_host_self()), sock_data->to_chan); - memcpy(data,task_data->data,size); - } else { - /* damn, the size is embeeded at the begining of the chunk */ - int netsize; - - memcpy((char*)&netsize,task_data->data,4); - DEBUG1("netsize embeeded = %d",netsize); - - memcpy(data,task_data->data,netsize+4); - } + if (task_data->size != size) + THROW5(mismatch_error,0, + "Got %d bytes when %ld where expected (in %s->%s:%d)", + task_data->size, size, + MSG_host_get_name(sock_data->to_host), + MSG_host_get_name(MSG_host_self()), sock_data->to_chan); + memcpy(data,task_data->data,size); free(task_data->data); free(task_data); @@ -306,5 +305,6 @@ void gras_trp_sg_chunk_recv(gras_socket_t sock, THROW0(system_error,0,"Error in MSG_task_destroy()"); XBT_OUT; + return size; } diff --git a/src/gras/Transport/transport_plugin_tcp.c b/src/gras/Transport/transport_plugin_tcp.c index 21cdf355db..256820f776 100644 --- a/src/gras/Transport/transport_plugin_tcp.c +++ b/src/gras/Transport/transport_plugin_tcp.c @@ -1,94 +1,63 @@ /* $Id$ */ -/* tcp trp (transport) - send/receive a bunch of bytes from a tcp socket */ +/* buf trp (transport) - buffered transport using the TCP one */ /* Copyright (c) 2004 Martin Quinson. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ +#include +#include /* memset */ + #include "portable.h" +#include "xbt/misc.h" +#include "xbt/sysdep.h" #include "xbt/ex.h" -#if 0 -# include /* close() pipe() read() write() */ -# include /* waitpid() */ -#endif - - #include "transport_private.h" -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport"); - -/*** - *** Prototypes - ***/ -void gras_trp_tcp_socket_client(gras_trp_plugin_t self, gras_socket_t sock); -void gras_trp_tcp_socket_server(gras_trp_plugin_t self, gras_socket_t sock); -gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t sock); - -void gras_trp_tcp_socket_close(gras_socket_t sd); - -void gras_trp_tcp_chunk_send(gras_socket_t sd, - const char *data, - unsigned long int size); - -void gras_trp_tcp_chunk_recv(gras_socket_t sd, - char *data, - unsigned long int size, - unsigned long int bufsize); - -void gras_trp_tcp_exit(gras_trp_plugin_t plug); - - -static int TcpProtoNumber(void); -/*** - *** Specific plugin part - ***/ +#ifndef MIN +#define MIN(a,b) ((a)<(b)?(a):(b)) +#endif -typedef struct { - fd_set msg_socks; - fd_set meas_socks; -} gras_trp_tcp_plug_data_t; +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport, + "TCP buffered transport"); /*** *** Specific socket part ***/ -typedef struct { - int buffsize; -} gras_trp_tcp_sock_data_t; - +typedef enum { buffering_buf, buffering_iov } buffering_kind; -/*** - *** Code - ***/ -void gras_trp_tcp_setup(gras_trp_plugin_t plug) { - - gras_trp_tcp_plug_data_t *data = xbt_new(gras_trp_tcp_plug_data_t,1); +typedef struct { + int size; + char *data; + int pos; /* for receive; not exchanged over the net */ +} gras_trp_buf_t; - FD_ZERO(&(data->msg_socks)); - FD_ZERO(&(data->meas_socks)); - plug->socket_client = gras_trp_tcp_socket_client; - plug->socket_server = gras_trp_tcp_socket_server; - plug->socket_accept = gras_trp_tcp_socket_accept; - plug->socket_close = gras_trp_tcp_socket_close; +struct gras_trp_bufdata_{ + int buffsize; + gras_trp_buf_t in_buf; + gras_trp_buf_t out_buf; - plug->chunk_send = gras_trp_tcp_chunk_send; - plug->chunk_recv = gras_trp_tcp_chunk_recv; +#ifdef HAVE_READV + xbt_dynar_t in_buf_v; + xbt_dynar_t out_buf_v; +#endif - plug->flush = NULL; /* nothing's cached */ + buffering_kind in; + buffering_kind out; +}; - plug->data = (void*)data; - plug->exit = gras_trp_tcp_exit; -} -void gras_trp_tcp_exit(gras_trp_plugin_t plug) { - DEBUG1("Exit plugin TCP (free %p)", plug->data); - free(plug->data); -} +/*****************************/ +/****[ SOCKET MANAGEMENT ]****/ +/*****************************/ +static int _gras_tcp_proto_number(void); -void gras_trp_tcp_socket_client(gras_trp_plugin_t self, gras_socket_t sock){ +static inline void gras_trp_sock_socket_client(gras_trp_plugin_t ignored, + gras_socket_t sock){ struct sockaddr_in addr; struct hostent *he; @@ -127,21 +96,21 @@ void gras_trp_tcp_socket_client(gras_trp_plugin_t self, gras_socket_t sock){ "Failed to connect socket to %s:%d (%s)", sock->peer_name, sock->peer_port, sock_errstr); } - VERB4("Connect to %s:%d (sd=%d, port %d here)",sock->peer_name, sock->peer_port, sock->sd, sock->port); + VERB4("Connect to %s:%d (sd=%d, port %d here)", + sock->peer_name, sock->peer_port, sock->sd, sock->port); } /** - * gras_trp_tcp_socket_server: + * gras_trp_sock_socket_server: * * Open a socket used to receive messages. */ -void gras_trp_tcp_socket_server(gras_trp_plugin_t self, gras_socket_t sock){ +static inline void gras_trp_sock_socket_server(gras_trp_plugin_t ignored, + gras_socket_t sock){ int size = sock->bufSize * 1024; int on = 1; struct sockaddr_in server; - gras_trp_tcp_plug_data_t *tcp=(gras_trp_tcp_plug_data_t*)self->data; - sock->outgoing = 1; /* TCP => duplex mode */ server.sin_port = htons((u_short)sock->port); @@ -171,15 +140,10 @@ void gras_trp_tcp_socket_server(gras_trp_plugin_t self, gras_socket_t sock){ THROW2(system_error,0,"Cannot listen on port %d: %s",sock->port,sock_errstr); } - if (sock->meas) - FD_SET(sock->sd, &(tcp->meas_socks)); - else - FD_SET(sock->sd, &(tcp->msg_socks)); - VERB2("Openned a server socket on port %d (sd=%d)",sock->port,sock->sd); } -gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t sock) { +static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock) { gras_socket_t res; struct sockaddr_in peer_in; @@ -205,7 +169,7 @@ gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t sock) { } if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) - || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) + || setsockopt(sd, _gras_tcp_proto_number(), TCP_NODELAY, (char *)&i, s)) THROW1(system_error,0,"setsockopt failed, cannot condition the socket: %s", sock_errstr); @@ -246,11 +210,9 @@ gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t sock) { return res; } -void gras_trp_tcp_socket_close(gras_socket_t sock){ - gras_trp_tcp_plug_data_t *tcp; +static void gras_trp_sock_socket_close(gras_socket_t sock){ if (!sock) return; /* close only once */ - tcp=sock->plugin->data; VERB1("close tcp connection %d", sock->sd); @@ -268,16 +230,6 @@ void gras_trp_tcp_socket_close(gras_socket_t sock){ } } */ -#ifndef HAVE_WINSOCK_H - /* forget about the socket - ... but not when using winsock since accept'ed socket can not fit - into the fd_set*/ - if (sock->meas){ - FD_CLR(sock->sd, &(tcp->meas_socks)); - } else { - FD_CLR(sock->sd, &(tcp->msg_socks)); - } -#endif /* close the socket */ if(tcp_close(sock->sd) < 0) { @@ -286,20 +238,20 @@ void gras_trp_tcp_socket_close(gras_socket_t sock){ } } - -/** - * gras_trp_tcp_chunk_send: - * - * Send data on a TCP socket - */ -void -gras_trp_tcp_chunk_send(gras_socket_t sock, - const char *data, - unsigned long int size) { +/************************************/ +/****[ end of SOCKET MANAGEMENT ]****/ +/************************************/ + + +/************************************/ +/****[ UNBUFFERED DATA EXCHANGE ]****/ +/************************************/ +/* Temptation to merge this with file data exchange is great, + but doesn't work on BillWare (see tcp_write() in portable.h) */ +static inline void gras_trp_tcp_send(gras_socket_t sock, + const char *data, + unsigned long int size) { - /* TCP sockets are in duplex mode, don't check direction */ - xbt_assert0(size >= 0, "Cannot send a negative amount of data"); - while (size) { int status = 0; @@ -321,52 +273,379 @@ gras_trp_tcp_chunk_send(gras_socket_t sock, } } } -/** - * gras_trp_tcp_chunk_recv: - * - * Receive data on a TCP socket. - */ -void -gras_trp_tcp_chunk_recv(gras_socket_t sock, - char *data, - unsigned long int size, - unsigned long int bufsize) { - - /* TCP sockets are in duplex mode, don't check direction */ - xbt_assert0(sock, "Cannot recv on an NULL socket"); - xbt_assert0(size >= 0, "Cannot receive a negative amount of data"); - xbt_assert0(bufsize>=size,"Not enough buffer size to receive that much data"); - - while (size) { +static inline int +gras_trp_tcp_recv_withbuffer(gras_socket_t sock, + char *data, + unsigned long int size, + unsigned long int bufsize) { + + int got = 0; + + while (size>got) { int status = 0; - DEBUG3("read(%d, %p, %ld);", sock->sd, data, size); - status = tcp_read(sock->sd, data, (size_t)bufsize); + DEBUG5("read(%d, %p, %ld) got %d so far (%s)", + sock->sd, data+got, bufsize, got, + hexa_str(data,got)); + status = tcp_read(sock->sd, data+got, (size_t)bufsize); if (status < 0) { THROW4(system_error,0,"read(%d,%p,%d) failed: %s", - sock->sd, data, (int)size, + sock->sd, data+got, (int)size, sock_errstr); } + DEBUG2("Got %d more bytes (%s)",status,hexa_str(data+got,status)); if (status) { - size -= status; bufsize -= status; - data += status; + got += status; } else { - gras_socket_close(sock); THROW0(system_error,0,"Socket closed by remote side"); } } + return got; +} + +static int gras_trp_tcp_recv(gras_socket_t sock, + char *data, + unsigned long int size) { + return gras_trp_tcp_recv_withbuffer(sock,data,size,size); + } +/*******************************************/ +/****[ end of UNBUFFERED DATA EXCHANGE ]****/ +/*******************************************/ + +/**********************************/ +/****[ BUFFERED DATA EXCHANGE ]****/ +/**********************************/ + +/* Make sure the data is sent */ +static void +gras_trp_bufiov_flush(gras_socket_t sock) { +#ifdef HAVE_READV + xbt_dynar_t vect; + int size; +#endif + gras_trp_bufdata_t *data=sock->bufdata; + XBT_IN; + + DEBUG0("Flush"); + if (data->out == buffering_buf) { + if (XBT_LOG_ISENABLED(trp_tcp,xbt_log_priority_debug)) + hexa_print("chunk to send ", + (unsigned char *) data->out_buf.data,data->out_buf.size); + if ((data->out_buf.size - data->out_buf.pos) != 0) { + DEBUG3("Send the chunk (size=%d) to %s:%d",data->out_buf.size, + gras_socket_peer_name(sock),gras_socket_peer_port(sock)); + gras_trp_tcp_send(sock, data->out_buf.data, data->out_buf.size); + VERB1("Chunk sent (size=%d)",data->out_buf.size); + data->out_buf.size = 0; + } + } + +#ifdef HAVE_READV + if (data->out == buffering_iov) { + vect = sock->bufdata->out_buf_v; + if ((size = xbt_dynar_length(vect))) { + DEBUG1("Flush %d chunks out of this socket",size); + writev(sock->sd,xbt_dynar_get_ptr(vect,0),size); + xbt_dynar_reset(vect); + } + data->out_buf.size = 0; /* reset the buffer containing non-stable data */ + } + + if (data->in == buffering_iov) { + vect = sock->bufdata->in_buf_v; + if ((size = xbt_dynar_length(vect))) { + DEBUG1("Get %d chunks from of this socket",size); + readv(sock->sd,xbt_dynar_get_ptr(vect,0),size); + xbt_dynar_reset(vect); + } + } +#endif +} +static void +gras_trp_buf_send(gras_socket_t sock, + const char *chunk, + unsigned long int size, + int stable_ignored) { + + gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata; + int chunk_pos=0; + + XBT_IN; + + while (chunk_pos < size) { + /* size of the chunk to receive in that shot */ + long int thissize = min(size-chunk_pos,data->buffsize-data->out_buf.size); + DEBUG4("Set the chars %d..%ld into the buffer; size=%ld, ctn=(%s)", + (int)data->out_buf.size, + ((int)data->out_buf.size) + thissize -1, + size, + hexa_str((char*)chunk,thissize)); + + memcpy(data->out_buf.data + data->out_buf.size, chunk + chunk_pos, thissize); + + data->out_buf.size += thissize; + chunk_pos += thissize; + DEBUG4("New pos = %d; Still to send = %ld of %ld; ctn sofar=(%s)", + data->out_buf.size,size-chunk_pos,size,hexa_str((char*)chunk,chunk_pos)); + + if (data->out_buf.size == data->buffsize) /* out of space. Flush it */ + gras_trp_bufiov_flush(sock); + } + + XBT_OUT; +} + +static int +gras_trp_buf_recv(gras_socket_t sock, + char *chunk, + unsigned long int size) { + + gras_trp_bufdata_t *data=sock->bufdata; + long int chunk_pos = 0; + + XBT_IN; + while (chunk_pos < size) { + /* size of the chunk to receive in that shot */ + long int thissize; + + if (data->in_buf.size == data->in_buf.pos) { /* out of data. Get more */ + + DEBUG2("Get more data (size=%d,bufsize=%d)", + (int)MIN(size-chunk_pos,data->buffsize), + (int)data->buffsize); + + + data->in_buf.size = + gras_trp_tcp_recv_withbuffer(sock, data->in_buf.data, + MIN(size-chunk_pos,data->buffsize), + data->buffsize); + + data->in_buf.pos=0; + } + + thissize = min(size-chunk_pos , data->in_buf.size - data->in_buf.pos); + memcpy(chunk+chunk_pos, data->in_buf.data + data->in_buf.pos, thissize); + + data->in_buf.pos += thissize; + chunk_pos += thissize; + DEBUG4("New pos = %d; Still to receive = %ld of %ld. Ctn so far=(%s)", + data->in_buf.pos,size - chunk_pos,size,hexa_str(chunk,chunk_pos)); + } + + XBT_OUT; + return chunk_pos; +} + +/*****************************************/ +/****[ end of BUFFERED DATA EXCHANGE ]****/ +/*****************************************/ + +/********************************/ +/****[ VECTOR DATA EXCHANGE ]****/ +/********************************/ +#ifdef HAVE_READV +static void +gras_trp_iov_send(gras_socket_t sock, + const char *chunk, + unsigned long int size, + int stable) { + struct iovec elm; + gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata; + + + DEBUG1("Buffer one chunk to be sent later (%s)", + hexa_str((char*)chunk,size)); + + elm.iov_len = (size_t)size; + + if (!stable) { + /* data storage won't last until flush. Save it in a buffer if we can */ + + if (size > data->buffsize-data->out_buf.size) { + /* buffer too small: + flush the socket, using data in its actual storage */ + elm.iov_base = (void*)chunk; + xbt_dynar_push(data->out_buf_v,&elm); + + gras_trp_bufiov_flush(sock); + return; + } else { + /* buffer big enough: + copy data into it, and chain it for upcoming writev */ + memcpy(data->out_buf.data + data->out_buf.size, chunk, size); + elm.iov_base = (void*)(data->out_buf.data + data->out_buf.size); + data->out_buf.size += size; + + xbt_dynar_push(data->out_buf_v,&elm); + } + + } else { + /* data storage stable. Chain it */ + + elm.iov_base = (void*)chunk; + xbt_dynar_push(data->out_buf_v,&elm); + } +} +static int +gras_trp_iov_recv(gras_socket_t sock, + char *chunk, + unsigned long int size) { + struct iovec elm; + + DEBUG0("Buffer one chunk to be received later"); + elm.iov_base = (void*)chunk; + elm.iov_len = (size_t)size; + xbt_dynar_push(sock->bufdata->in_buf_v,&elm); + + return size; +} + +#endif +/***************************************/ +/****[ end of VECTOR DATA EXCHANGE ]****/ +/***************************************/ + + +/*** + *** Prototypes of BUFFERED + ***/ + +void gras_trp_buf_socket_client(gras_trp_plugin_t self, + gras_socket_t sock); +void gras_trp_buf_socket_server(gras_trp_plugin_t self, + gras_socket_t sock); +gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock); + +void gras_trp_buf_socket_close(gras_socket_t sd); + + +gras_socket_t gras_trp_buf_init_sock(gras_socket_t sock) { + gras_trp_bufdata_t *data=xbt_new(gras_trp_bufdata_t,1); + + data->buffsize = 100 * 1024 ; /* 100k */ + + data->in_buf.size = 0; + data->in_buf.data = xbt_malloc(data->buffsize); + data->in_buf.pos = 0; /* useless, indeed, since size==pos */ + + data->out_buf.size = 0; + data->out_buf.data = xbt_malloc(data->buffsize); + data->out_buf.pos = data->out_buf.size; + + data->in_buf_v = data->out_buf_v = NULL; +#ifdef HAVE_READV + data->in_buf_v=xbt_dynar_new(sizeof(struct iovec),NULL); + data->out_buf_v=xbt_dynar_new(sizeof(struct iovec),NULL); +#endif + + data->in = buffering_buf; + data->out = buffering_iov; + /*data->out = buffering_buf;*/ + + sock->bufdata = data; + return sock; +} + +/*** + *** Code + ***/ +void +gras_trp_buf_setup(gras_trp_plugin_t plug) { + + plug->socket_client = gras_trp_buf_socket_client; + plug->socket_server = gras_trp_buf_socket_server; + plug->socket_accept = gras_trp_buf_socket_accept; + plug->socket_close = gras_trp_buf_socket_close; + + plug->send = gras_trp_iov_send; + /*plug->send = gras_trp_buf_send;*/ + plug->recv = gras_trp_buf_recv; + + plug->raw_send = gras_trp_tcp_send; + plug->raw_recv = gras_trp_tcp_recv; + + plug->flush = gras_trp_bufiov_flush; + + plug->data = NULL; + plug->exit = NULL; +} + +void gras_trp_buf_socket_client(gras_trp_plugin_t self, + /* OUT */ gras_socket_t sock){ + + gras_trp_sock_socket_client(NULL,sock); + gras_trp_buf_init_sock(sock); +} + +/** + * gras_trp_buf_socket_server: + * + * Open a socket used to receive messages. + */ +void gras_trp_buf_socket_server(gras_trp_plugin_t self, + /* OUT */ gras_socket_t sock){ + + gras_trp_sock_socket_server(NULL,sock); + gras_trp_buf_init_sock(sock); +} + +gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock) { + return gras_trp_buf_init_sock(gras_trp_sock_socket_accept(sock)); +} + +void gras_trp_buf_socket_close(gras_socket_t sock){ + gras_trp_bufdata_t *data=sock->bufdata; + + if (data->in_buf.size!=data->in_buf.pos) { + WARN3("Socket closed, but %d bytes were unread (size=%d,pos=%d)", + data->in_buf.size - data->in_buf.pos, + data->in_buf.size, data->in_buf.pos); + } + if (data->in_buf.data) + free(data->in_buf.data); + + if (data->out_buf.size!=data->out_buf.pos) { + DEBUG2("Flush the socket before closing (in=%d,out=%d)", + data->in_buf.size, data->out_buf.size); + gras_trp_bufiov_flush(sock); + } + if (data->out_buf.data) + free(data->out_buf.data); + +#ifdef HAVE_READV + if (data->in_buf_v) { + if (xbt_dynar_length(data->in_buf_v)) + WARN0("Socket closed, but some bytes were unread"); + xbt_dynar_free(&data->in_buf_v); + } + if (data->out_buf_v) { + if (xbt_dynar_length(data->out_buf_v)) { + DEBUG0("Flush the socket before closing"); + gras_trp_bufiov_flush(sock); + } + xbt_dynar_free(&data->out_buf_v); + } +#endif + + free(data); + gras_trp_sock_socket_close(sock); +} + +/****************************/ +/****[ HELPER FUNCTIONS ]****/ +/****************************/ /* * Returns the tcp protocol number from the network protocol data base. * * getprotobyname() is not thread safe. We need to lock it. */ -static int TcpProtoNumber(void) { +static int _gras_tcp_proto_number(void) { struct protoent *fetchedEntry; static int returnValue = 0; @@ -453,3 +732,7 @@ const char *gras_wsa_err2string( int err ) { return "unknown WSA error"; } #endif /* HAVE_WINSOCK_H */ + +/***********************************/ +/****[ end of HELPER FUNCTIONS ]****/ +/***********************************/ diff --git a/src/gras/Transport/transport_private.h b/src/gras/Transport/transport_private.h index 0a8d031c6c..484cac4651 100644 --- a/src/gras/Transport/transport_private.h +++ b/src/gras/Transport/transport_private.h @@ -60,6 +60,7 @@ void gras_trp_socket_new(int incomming, typedef void (*gras_trp_setup_t)(gras_trp_plugin_t dst); void gras_trp_tcp_setup(gras_trp_plugin_t plug); +void gras_trp_iov_setup(gras_trp_plugin_t plug); void gras_trp_file_setup(gras_trp_plugin_t plug); void gras_trp_sg_setup(gras_trp_plugin_t plug); void gras_trp_buf_setup(gras_trp_plugin_t plug); @@ -82,17 +83,8 @@ void gras_trp_buf_setup(gras_trp_plugin_t plug); */ -void gras_trp_buf_init_sock(gras_socket_t sock); +gras_socket_t gras_trp_buf_init_sock(gras_socket_t sock); - -/* Data exchange over measurement sockets */ /* FIXME: KILLME */ -/* -void gras_socket_meas_exchange(gras_socket_t peer, - int sender, - unsigned int timeout, - unsigned long int expSize, - unsigned long int msgSize); -*/ xbt_dynar_t gras_socketset_get(void); /* FIXME:KILLME */ #endif /* GRAS_TRP_PRIVATE_H */ -- 2.20.1