X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/9189fe94c14ef9e31142d1603a1979ea7e731a0a..a9af1b0795c4a75f8919d7a87491fe7ab041b272:/src/gras/Transport/transport_plugin_tcp.c diff --git a/src/gras/Transport/transport_plugin_tcp.c b/src/gras/Transport/transport_plugin_tcp.c index 51f44fa65f..b92dccc722 100644 --- a/src/gras/Transport/transport_plugin_tcp.c +++ b/src/gras/Transport/transport_plugin_tcp.c @@ -1,98 +1,84 @@ /* $Id$ */ -/* tcp trp (transport) - send/receive a bunch of bytes from a tcp socket */ +/* buf trp (transport) - buffered transport using the TCP one */ /* Copyright (c) 2004 Martin Quinson. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ +#include +#include /* memset */ + #include "portable.h" +#include "xbt/misc.h" +#include "xbt/sysdep.h" #include "xbt/ex.h" -#if 0 -# include /* close() pipe() read() write() */ -# include /* waitpid() */ -#endif - - #include "transport_private.h" -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport"); +/* FIXME maybe READV is sometime a good thing? */ +#undef HAVE_READV -/*** - *** Prototypes - ***/ -void gras_trp_tcp_socket_client(gras_trp_plugin_t self, gras_socket_t sock); -void gras_trp_tcp_socket_server(gras_trp_plugin_t self, gras_socket_t sock); -gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t sock); - -void gras_trp_tcp_socket_close(gras_socket_t sd); - -void gras_trp_tcp_chunk_send(gras_socket_t sd, - const char *data, - unsigned long int size); - -void gras_trp_tcp_chunk_recv(gras_socket_t sd, - char *data, - unsigned long int size); +#ifdef HAVE_READV +#include +#endif -void gras_trp_tcp_exit(gras_trp_plugin_t plug); - - -static int TcpProtoNumber(void); -/*** - *** Specific plugin part - ***/ +#ifndef MIN +#define MIN(a,b) ((a)<(b)?(a):(b)) +#endif -typedef struct { - fd_set msg_socks; - fd_set meas_socks; -} gras_trp_tcp_plug_data_t; +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_tcp,gras_trp, + "TCP buffered transport"); /*** *** Specific socket part ***/ +typedef enum { buffering_buf, buffering_iov } buffering_kind; + typedef struct { - int buffsize; -} gras_trp_tcp_sock_data_t; + int size; + char *data; + int pos; /* for receive; not exchanged over the net */ +} gras_trp_buf_t; -/*** - *** Code - ***/ -void gras_trp_tcp_setup(gras_trp_plugin_t plug) { - - gras_trp_tcp_plug_data_t *data = xbt_new(gras_trp_tcp_plug_data_t,1); +struct gras_trp_bufdata_{ + int buffsize; + gras_trp_buf_t in_buf; + gras_trp_buf_t out_buf; - FD_ZERO(&(data->msg_socks)); - FD_ZERO(&(data->meas_socks)); +#ifdef HAVE_READV + xbt_dynar_t in_buf_v; + xbt_dynar_t out_buf_v; +#endif - plug->socket_client = gras_trp_tcp_socket_client; - plug->socket_server = gras_trp_tcp_socket_server; - plug->socket_accept = gras_trp_tcp_socket_accept; - plug->socket_close = gras_trp_tcp_socket_close; + buffering_kind in; + buffering_kind out; +}; - plug->chunk_send = gras_trp_tcp_chunk_send; - plug->chunk_recv = gras_trp_tcp_chunk_recv; - plug->flush = NULL; /* nothing's cached */ +/*****************************/ +/****[ SOCKET MANAGEMENT ]****/ +/*****************************/ +/* we exchange port number on client side on socket creation, + so we need to be able to talk right now. */ +static inline void gras_trp_tcp_send(gras_socket_t sock, const char *data, + unsigned long int size); +static int gras_trp_tcp_recv(gras_socket_t sock, char *data, + unsigned long int size); - plug->data = (void*)data; - plug->exit = gras_trp_tcp_exit; -} -void gras_trp_tcp_exit(gras_trp_plugin_t plug) { - DEBUG1("Exit plugin TCP (free %p)", plug->data); - free(plug->data); -} +static int _gras_tcp_proto_number(void); -void gras_trp_tcp_socket_client(gras_trp_plugin_t self, gras_socket_t sock){ +static inline void gras_trp_sock_socket_client(gras_trp_plugin_t ignored, + gras_socket_t sock){ struct sockaddr_in addr; struct hostent *he; struct in_addr *haddr; - int size = sock->bufSize * 1024; + int size = sock->buf_size; + uint32_t myport = htonl(((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport); sock->incoming = 1; /* TCP sockets are duplex'ed */ @@ -126,21 +112,25 @@ void gras_trp_tcp_socket_client(gras_trp_plugin_t self, gras_socket_t sock){ "Failed to connect socket to %s:%d (%s)", sock->peer_name, sock->peer_port, sock_errstr); } - VERB4("Connect to %s:%d (sd=%d, port %d here)",sock->peer_name, sock->peer_port, sock->sd, sock->port); + + gras_trp_tcp_send(sock,(char*)&myport,sizeof(uint32_t)); + DEBUG1("peerport sent to %d", sock->peer_port); + + VERB4("Connect to %s:%d (sd=%d, port %d here)", + sock->peer_name, sock->peer_port, sock->sd, sock->port); } /** - * gras_trp_tcp_socket_server: + * gras_trp_sock_socket_server: * * Open a socket used to receive messages. */ -void gras_trp_tcp_socket_server(gras_trp_plugin_t self, gras_socket_t sock){ - int size = sock->bufSize * 1024; +static inline void gras_trp_sock_socket_server(gras_trp_plugin_t ignored, + gras_socket_t sock){ + int size = sock->buf_size; int on = 1; struct sockaddr_in server; - gras_trp_tcp_plug_data_t *tcp=(gras_trp_tcp_plug_data_t*)self->data; - sock->outgoing = 1; /* TCP => duplex mode */ server.sin_port = htons((u_short)sock->port); @@ -170,15 +160,10 @@ void gras_trp_tcp_socket_server(gras_trp_plugin_t self, gras_socket_t sock){ THROW2(system_error,0,"Cannot listen on port %d: %s",sock->port,sock_errstr); } - if (sock->meas) - FD_SET(sock->sd, &(tcp->meas_socks)); - else - FD_SET(sock->sd, &(tcp->msg_socks)); - VERB2("Openned a server socket on port %d (sd=%d)",sock->port,sock->sd); } -gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t sock) { +static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock) { gras_socket_t res; struct sockaddr_in peer_in; @@ -190,6 +175,8 @@ gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t sock) { int i = 1; socklen_t s = sizeof(int); + + uint32_t hisport; XBT_IN; gras_trp_socket_new(1,&res); @@ -204,12 +191,12 @@ gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t sock) { } if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) - || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) + || setsockopt(sd, _gras_tcp_proto_number(), TCP_NODELAY, (char *)&i, s)) THROW1(system_error,0,"setsockopt failed, cannot condition the socket: %s", sock_errstr); - res->bufSize = sock->bufSize; - size = sock->bufSize * 1024; + res->buf_size = sock->buf_size; + size = sock->buf_size; if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) WARN1("setsockopt failed, cannot set buffer size: %s", sock_errstr); @@ -220,7 +207,10 @@ gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t sock) { res->accepting = 0; res->sd = sd; res->port = -1; - res->peer_port = peer_in.sin_port; + + gras_trp_tcp_recv(res,(char*)&hisport,sizeof(hisport)); + res->peer_port = ntohl(hisport); + DEBUG1("peerport %d received",res->peer_port); /* FIXME: Lock to protect inet_ntoa */ if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) { @@ -245,11 +235,9 @@ gras_socket_t gras_trp_tcp_socket_accept(gras_socket_t sock) { return res; } -void gras_trp_tcp_socket_close(gras_socket_t sock){ - gras_trp_tcp_plug_data_t *tcp; +static void gras_trp_sock_socket_close(gras_socket_t sock){ if (!sock) return; /* close only once */ - tcp=sock->plugin->data; VERB1("close tcp connection %d", sock->sd); @@ -267,16 +255,6 @@ void gras_trp_tcp_socket_close(gras_socket_t sock){ } } */ -#ifndef HAVE_WINSOCK_H - /* forget about the socket - ... but not when using winsock since accept'ed socket can not fit - into the fd_set*/ - if (sock->meas){ - FD_CLR(sock->sd, &(tcp->meas_socks)); - } else { - FD_CLR(sock->sd, &(tcp->msg_socks)); - } -#endif /* close the socket */ if(tcp_close(sock->sd) < 0) { @@ -285,20 +263,20 @@ void gras_trp_tcp_socket_close(gras_socket_t sock){ } } - -/** - * gras_trp_tcp_chunk_send: - * - * Send data on a TCP socket - */ -void -gras_trp_tcp_chunk_send(gras_socket_t sock, - const char *data, - unsigned long int size) { +/************************************/ +/****[ end of SOCKET MANAGEMENT ]****/ +/************************************/ + + +/************************************/ +/****[ UNBUFFERED DATA EXCHANGE ]****/ +/************************************/ +/* Temptation to merge this with file data exchange is great, + but doesn't work on BillWare (see tcp_write() in portable.h) */ +static inline void gras_trp_tcp_send(gras_socket_t sock, + const char *data, + unsigned long int size) { - /* TCP sockets are in duplex mode, don't check direction */ - xbt_assert0(size >= 0, "Cannot send a negative amount of data"); - while (size) { int status = 0; @@ -306,6 +284,13 @@ gras_trp_tcp_chunk_send(gras_socket_t sock, DEBUG3("write(%d, %p, %ld);", sock->sd, data, size); if (status < 0) { +#ifdef EWOULDBLOCK + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) +#else + if (errno == EINTR || errno == EAGAIN) +#endif + continue; + THROW4(system_error,0,"write(%d,%p,%ld) failed: %s", sock->sd, data, size, sock_errstr); @@ -320,50 +305,390 @@ gras_trp_tcp_chunk_send(gras_socket_t sock, } } } -/** - * gras_trp_tcp_chunk_recv: - * - * Receive data on a TCP socket. - */ -void -gras_trp_tcp_chunk_recv(gras_socket_t sock, - char *data, - unsigned long int size) { +static inline int +gras_trp_tcp_recv_withbuffer(gras_socket_t sock, + char *data, + unsigned long int size, + unsigned long int bufsize) { - /* TCP sockets are in duplex mode, don't check direction */ - xbt_assert0(sock, "Cannot recv on an NULL socket"); - xbt_assert0(size >= 0, "Cannot receive a negative amount of data"); - - while (size) { + int got = 0; + + while (size>got) { int status = 0; - DEBUG3("read(%d, %p, %ld);", sock->sd, data, size); - status = tcp_read(sock->sd, data, (size_t)size); + DEBUG5("read(%d, %p, %ld) got %d so far (%s)", + sock->sd, data+got, bufsize, got, + hexa_str((unsigned char*)data,got)); + status = tcp_read(sock->sd, data+got, (size_t)bufsize); if (status < 0) { - THROW4(system_error,0,"read(%d,%p,%d) failed: %s", - sock->sd, data, (int)size, - sock_errstr); + THROW7(system_error,0,"read(%d,%p,%d) from %s:%d failed: %s; got %d so far", + sock->sd, data+got, (int)size, + gras_socket_peer_name(sock),gras_socket_peer_port(sock), + sock_errstr, + got); } + DEBUG2("Got %d more bytes (%s)",status,hexa_str((unsigned char*)data+got,status)); if (status) { - size -= status; - data += status; + bufsize -= status; + got += status; + } else { + THROW1(system_error,0,"Socket closed by remote side (got %d bytes before this)", + got); + } + } + /* indicate to the gras_select function that there is more to read on this socket so that it does not actually select */ + sock->moredata = (bufsize != 0); + return got; +} + +static int gras_trp_tcp_recv(gras_socket_t sock, + char *data, + unsigned long int size) { + return gras_trp_tcp_recv_withbuffer(sock,data,size,size); + +} +/*******************************************/ +/****[ end of UNBUFFERED DATA EXCHANGE ]****/ +/*******************************************/ + +/**********************************/ +/****[ BUFFERED DATA EXCHANGE ]****/ +/**********************************/ + +/* Make sure the data is sent */ +static void +gras_trp_bufiov_flush(gras_socket_t sock) { +#ifdef HAVE_READV + xbt_dynar_t vect; + int size; +#endif + gras_trp_bufdata_t *data=sock->bufdata; + XBT_IN; + + DEBUG0("Flush"); + if (data->out == buffering_buf) { + if (XBT_LOG_ISENABLED(gras_trp_tcp,xbt_log_priority_debug)) + hexa_print("chunk to send ", + (unsigned char *) data->out_buf.data,data->out_buf.size); + if ((data->out_buf.size - data->out_buf.pos) != 0) { + DEBUG3("Send the chunk (size=%d) to %s:%d",data->out_buf.size, + gras_socket_peer_name(sock),gras_socket_peer_port(sock)); + gras_trp_tcp_send(sock, data->out_buf.data, data->out_buf.size); + VERB1("Chunk sent (size=%d)",data->out_buf.size); + data->out_buf.size = 0; + } + } + +#ifdef HAVE_READV + if (data->out == buffering_iov) { + DEBUG0("Flush out iov"); + vect = sock->bufdata->out_buf_v; + if ((size = xbt_dynar_length(vect))) { + DEBUG1("Flush %d chunks out of this socket",size); + writev(sock->sd,xbt_dynar_get_ptr(vect,0),size); + xbt_dynar_reset(vect); + } + data->out_buf.size = 0; /* reset the buffer containing non-stable data */ + } + + if (data->in == buffering_iov) { + DEBUG0("Flush in iov"); + vect = sock->bufdata->in_buf_v; + if ((size = xbt_dynar_length(vect))) { + DEBUG1("Get %d chunks from of this socket",size); + readv(sock->sd,xbt_dynar_get_ptr(vect,0),size); + xbt_dynar_reset(vect); + } + } +#endif +} +static void +gras_trp_buf_send(gras_socket_t sock, + const char *chunk, + unsigned long int size, + int stable_ignored) { + + gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata; + int chunk_pos=0; + + XBT_IN; + + while (chunk_pos < size) { + /* size of the chunk to receive in that shot */ + long int thissize = min(size-chunk_pos,data->buffsize-data->out_buf.size); + DEBUG4("Set the chars %d..%ld into the buffer; size=%ld, ctn=(%s)", + (int)data->out_buf.size, + ((int)data->out_buf.size) + thissize -1, + size, + hexa_str((unsigned char*)chunk,thissize)); + + memcpy(data->out_buf.data + data->out_buf.size, chunk + chunk_pos, thissize); + + data->out_buf.size += thissize; + chunk_pos += thissize; + DEBUG4("New pos = %d; Still to send = %ld of %ld; ctn sofar=(%s)", + data->out_buf.size,size-chunk_pos,size,hexa_str((unsigned char*)chunk,chunk_pos)); + + if (data->out_buf.size == data->buffsize) /* out of space. Flush it */ + gras_trp_bufiov_flush(sock); + } + + XBT_OUT; +} + +static int +gras_trp_buf_recv(gras_socket_t sock, + char *chunk, + unsigned long int size) { + + gras_trp_bufdata_t *data=sock->bufdata; + long int chunk_pos = 0; + + XBT_IN; + + while (chunk_pos < size) { + /* size of the chunk to receive in that shot */ + long int thissize; + + if (data->in_buf.size == data->in_buf.pos) { /* out of data. Get more */ + + DEBUG2("Get more data (size=%d,bufsize=%d)", + (int)MIN(size-chunk_pos,data->buffsize), + (int)data->buffsize); + + + data->in_buf.size = + gras_trp_tcp_recv_withbuffer(sock, data->in_buf.data, + MIN(size-chunk_pos,data->buffsize), + data->buffsize); + + data->in_buf.pos=0; + } + + thissize = min(size-chunk_pos , data->in_buf.size - data->in_buf.pos); + memcpy(chunk+chunk_pos, data->in_buf.data + data->in_buf.pos, thissize); + + data->in_buf.pos += thissize; + chunk_pos += thissize; + DEBUG4("New pos = %d; Still to receive = %ld of %ld. Ctn so far=(%s)", + data->in_buf.pos,size - chunk_pos,size,hexa_str((unsigned char*)chunk,chunk_pos)); + } + + XBT_OUT; + return chunk_pos; +} + +/*****************************************/ +/****[ end of BUFFERED DATA EXCHANGE ]****/ +/*****************************************/ + +/********************************/ +/****[ VECTOR DATA EXCHANGE ]****/ +/********************************/ +#ifdef HAVE_READV +static void +gras_trp_iov_send(gras_socket_t sock, + const char *chunk, + unsigned long int size, + int stable) { + struct iovec elm; + gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata; + + + DEBUG1("Buffer one chunk to be sent later (%s)", + hexa_str((char*)chunk,size)); + + elm.iov_len = (size_t)size; + + if (!stable) { + /* data storage won't last until flush. Save it in a buffer if we can */ + + if (size > data->buffsize-data->out_buf.size) { + /* buffer too small: + flush the socket, using data in its actual storage */ + elm.iov_base = (void*)chunk; + xbt_dynar_push(data->out_buf_v,&elm); + + gras_trp_bufiov_flush(sock); + return; } else { - THROW3(system_error,0, - "file descriptor closed (nothing read(%d, %p, %ld) on the socket)", - sock->sd, data, size); + /* buffer big enough: + copy data into it, and chain it for upcoming writev */ + memcpy(data->out_buf.data + data->out_buf.size, chunk, size); + elm.iov_base = (void*)(data->out_buf.data + data->out_buf.size); + data->out_buf.size += size; + + xbt_dynar_push(data->out_buf_v,&elm); + } + + } else { + /* data storage stable. Chain it */ + + elm.iov_base = (void*)chunk; + xbt_dynar_push(data->out_buf_v,&elm); + } +} +static int +gras_trp_iov_recv(gras_socket_t sock, + char *chunk, + unsigned long int size) { + struct iovec elm; + + DEBUG0("Buffer one chunk to be received later"); + elm.iov_base = (void*)chunk; + elm.iov_len = (size_t)size; + xbt_dynar_push(sock->bufdata->in_buf_v,&elm); + + return size; +} + +#endif +/***************************************/ +/****[ end of VECTOR DATA EXCHANGE ]****/ +/***************************************/ + + +/*** + *** Prototypes of BUFFERED + ***/ + +void gras_trp_buf_socket_client(gras_trp_plugin_t self, + gras_socket_t sock); +void gras_trp_buf_socket_server(gras_trp_plugin_t self, + gras_socket_t sock); +gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock); + +void gras_trp_buf_socket_close(gras_socket_t sd); + + +gras_socket_t gras_trp_buf_init_sock(gras_socket_t sock) { + gras_trp_bufdata_t *data=xbt_new(gras_trp_bufdata_t,1); + + data->buffsize = 100 * 1024 ; /* 100k */ + + data->in_buf.size = 0; + data->in_buf.data = xbt_malloc(data->buffsize); + data->in_buf.pos = 0; /* useless, indeed, since size==pos */ + + data->out_buf.size = 0; + data->out_buf.data = xbt_malloc(data->buffsize); + data->out_buf.pos = data->out_buf.size; + +#ifdef HAVE_READV + data->in_buf_v = data->out_buf_v = NULL; + data->in_buf_v=xbt_dynar_new(sizeof(struct iovec),NULL); + data->out_buf_v=xbt_dynar_new(sizeof(struct iovec),NULL); + data->out = buffering_iov; +#else + data->out = buffering_buf; +#endif + + data->in = buffering_buf; + + sock->bufdata = data; + return sock; +} + +/*** + *** Code + ***/ +void +gras_trp_tcp_setup(gras_trp_plugin_t plug) { + + plug->socket_client = gras_trp_buf_socket_client; + plug->socket_server = gras_trp_buf_socket_server; + plug->socket_accept = gras_trp_buf_socket_accept; + plug->socket_close = gras_trp_buf_socket_close; + +#ifdef HAVE_READV + plug->send = gras_trp_iov_send; +#else + plug->send = gras_trp_buf_send; +#endif + plug->recv = gras_trp_buf_recv; + + plug->raw_send = gras_trp_tcp_send; + plug->raw_recv = gras_trp_tcp_recv; + + plug->flush = gras_trp_bufiov_flush; + + plug->data = NULL; + plug->exit = NULL; +} + +void gras_trp_buf_socket_client(gras_trp_plugin_t self, + /* OUT */ gras_socket_t sock){ + + gras_trp_sock_socket_client(NULL,sock); + gras_trp_buf_init_sock(sock); +} + +/** + * gras_trp_buf_socket_server: + * + * Open a socket used to receive messages. + */ +void gras_trp_buf_socket_server(gras_trp_plugin_t self, + /* OUT */ gras_socket_t sock){ + + gras_trp_sock_socket_server(NULL,sock); + gras_trp_buf_init_sock(sock); +} + +gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock) { + return gras_trp_buf_init_sock(gras_trp_sock_socket_accept(sock)); +} + +void gras_trp_buf_socket_close(gras_socket_t sock){ + gras_trp_bufdata_t *data=sock->bufdata; + + if (data->in_buf.size!=data->in_buf.pos) { + WARN3("Socket closed, but %d bytes were unread (size=%d,pos=%d)", + data->in_buf.size - data->in_buf.pos, + data->in_buf.size, data->in_buf.pos); + } + if (data->in_buf.data) + free(data->in_buf.data); + + if (data->out_buf.size!=data->out_buf.pos) { + DEBUG2("Flush the socket before closing (in=%d,out=%d)", + data->in_buf.size, data->out_buf.size); + gras_trp_bufiov_flush(sock); + } + if (data->out_buf.data) + free(data->out_buf.data); + +#ifdef HAVE_READV + if (data->in_buf_v) { + if (xbt_dynar_length(data->in_buf_v)) + WARN0("Socket closed, but some bytes were unread"); + xbt_dynar_free(&data->in_buf_v); + } + if (data->out_buf_v) { + if (xbt_dynar_length(data->out_buf_v)) { + DEBUG0("Flush the socket before closing"); + gras_trp_bufiov_flush(sock); } + xbt_dynar_free(&data->out_buf_v); } +#endif + + free(data); + gras_trp_sock_socket_close(sock); } +/****************************/ +/****[ HELPER FUNCTIONS ]****/ +/****************************/ /* * Returns the tcp protocol number from the network protocol data base. * * getprotobyname() is not thread safe. We need to lock it. */ -static int TcpProtoNumber(void) { +static int _gras_tcp_proto_number(void) { struct protoent *fetchedEntry; static int returnValue = 0; @@ -450,3 +775,7 @@ const char *gras_wsa_err2string( int err ) { return "unknown WSA error"; } #endif /* HAVE_WINSOCK_H */ + +/***********************************/ +/****[ end of HELPER FUNCTIONS ]****/ +/***********************************/