X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/87468a9f6b682ee41edc3a16a100554ef48032c9..3489f102970893cd4a0462a176ff855c1a09f85c:/src/gras/Transport/transport_plugin_tcp.c diff --git a/src/gras/Transport/transport_plugin_tcp.c b/src/gras/Transport/transport_plugin_tcp.c index 256820f776..a4db1b640a 100644 --- a/src/gras/Transport/transport_plugin_tcp.c +++ b/src/gras/Transport/transport_plugin_tcp.c @@ -1,6 +1,6 @@ /* $Id$ */ -/* buf trp (transport) - buffered transport using the TCP one */ +/* buf trp (transport) - buffered transport using the TCP one */ /* Copyright (c) 2004 Martin Quinson. All rights reserved. */ @@ -16,11 +16,18 @@ #include "xbt/ex.h" #include "transport_private.h" +/* FIXME maybe READV is sometime a good thing? */ +#undef HAVE_READV + +#ifdef HAVE_READV +#include <sys/uio.h> +#endif + #ifndef MIN #define MIN(a,b) ((a)<(b)?(a):(b)) #endif -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport, +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_tcp,gras_trp, "TCP buffered transport"); /*** @@ -54,6 +61,14 @@ struct gras_trp_bufdata_{ /*****************************/ /****[ SOCKET MANAGEMENT ]****/ /*****************************/ +/* we exchange port number on client side on socket creation, + so we need to be able to talk right now. 
*/ +static inline void gras_trp_tcp_send(gras_socket_t sock, const char *data, + unsigned long int size); +static int gras_trp_tcp_recv(gras_socket_t sock, char *data, + unsigned long int size); + + static int _gras_tcp_proto_number(void); static inline void gras_trp_sock_socket_client(gras_trp_plugin_t ignored, @@ -62,7 +77,8 @@ static inline void gras_trp_sock_socket_client(gras_trp_plugin_t ignored, struct sockaddr_in addr; struct hostent *he; struct in_addr *haddr; - int size = sock->bufSize * 1024; + int size = sock->buf_size; + uint32_t myport = htonl(((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport); sock->incoming = 1; /* TCP sockets are duplex'ed */ @@ -96,6 +112,10 @@ static inline void gras_trp_sock_socket_client(gras_trp_plugin_t ignored, "Failed to connect socket to %s:%d (%s)", sock->peer_name, sock->peer_port, sock_errstr); } + + gras_trp_tcp_send(sock,(char*)&myport,sizeof(uint32_t)); + DEBUG1("peerport sent to %d", sock->peer_port); + VERB4("Connect to %s:%d (sd=%d, port %d here)", sock->peer_name, sock->peer_port, sock->sd, sock->port); } @@ -107,7 +127,7 @@ static inline void gras_trp_sock_socket_client(gras_trp_plugin_t ignored, */ static inline void gras_trp_sock_socket_server(gras_trp_plugin_t ignored, gras_socket_t sock){ - int size = sock->bufSize * 1024; + int size = sock->buf_size; int on = 1; struct sockaddr_in server; @@ -155,6 +175,8 @@ static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock) { int i = 1; socklen_t s = sizeof(int); + + uint32_t hisport; XBT_IN; gras_trp_socket_new(1,&res); @@ -173,8 +195,8 @@ static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock) { THROW1(system_error,0,"setsockopt failed, cannot condition the socket: %s", sock_errstr); - res->bufSize = sock->bufSize; - size = sock->bufSize * 1024; + res->buf_size = sock->buf_size; + size = sock->buf_size; if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char 
*)&size, sizeof(size))) WARN1("setsockopt failed, cannot set buffer size: %s", sock_errstr); @@ -185,7 +207,10 @@ static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock) { res->accepting = 0; res->sd = sd; res->port = -1; - res->peer_port = peer_in.sin_port; + + gras_trp_tcp_recv(res,(char*)&hisport,sizeof(hisport)); + res->peer_port = ntohl(hisport); + DEBUG1("peerport %d received",res->peer_port); /* FIXME: Lock to protect inet_ntoa */ if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) { @@ -259,6 +284,13 @@ static inline void gras_trp_tcp_send(gras_socket_t sock, DEBUG3("write(%d, %p, %ld);", sock->sd, data, size); if (status < 0) { +#ifdef EWOULDBLOCK + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) +#else + if (errno == EINTR || errno == EAGAIN) +#endif + continue; + THROW4(system_error,0,"write(%d,%p,%ld) failed: %s", sock->sd, data, size, sock_errstr); @@ -286,21 +318,24 @@ gras_trp_tcp_recv_withbuffer(gras_socket_t sock, DEBUG5("read(%d, %p, %ld) got %d so far (%s)", sock->sd, data+got, bufsize, got, - hexa_str(data,got)); + hexa_str((unsigned char*)data,got)); status = tcp_read(sock->sd, data+got, (size_t)bufsize); if (status < 0) { - THROW4(system_error,0,"read(%d,%p,%d) failed: %s", + THROW7(system_error,0,"read(%d,%p,%d) from %s:%d failed: %s; got %d so far", sock->sd, data+got, (int)size, - sock_errstr); + gras_socket_peer_name(sock),gras_socket_peer_port(sock), + sock_errstr, + got); } - DEBUG2("Got %d more bytes (%s)",status,hexa_str(data+got,status)); + DEBUG2("Got %d more bytes (%s)",status,hexa_str((unsigned char*)data+got,status)); if (status) { bufsize -= status; got += status; } else { - THROW0(system_error,0,"Socket closed by remote side"); + THROW1(system_error,0,"Socket closed by remote side (got %d bytes before this)", + got); } } return got; @@ -332,7 +367,7 @@ gras_trp_bufiov_flush(gras_socket_t sock) { DEBUG0("Flush"); if (data->out == buffering_buf) { - if 
(XBT_LOG_ISENABLED(trp_tcp,xbt_log_priority_debug)) + if (XBT_LOG_ISENABLED(gras_trp_tcp,xbt_log_priority_debug)) hexa_print("chunk to send ", (unsigned char *) data->out_buf.data,data->out_buf.size); if ((data->out_buf.size - data->out_buf.pos) != 0) { @@ -346,6 +381,7 @@ gras_trp_bufiov_flush(gras_socket_t sock) { #ifdef HAVE_READV if (data->out == buffering_iov) { + DEBUG0("Flush out iov"); vect = sock->bufdata->out_buf_v; if ((size = xbt_dynar_length(vect))) { DEBUG1("Flush %d chunks out of this socket",size); @@ -356,6 +392,7 @@ gras_trp_bufiov_flush(gras_socket_t sock) { } if (data->in == buffering_iov) { + DEBUG0("Flush in iov"); vect = sock->bufdata->in_buf_v; if ((size = xbt_dynar_length(vect))) { DEBUG1("Get %d chunks from of this socket",size); @@ -383,14 +420,14 @@ gras_trp_buf_send(gras_socket_t sock, (int)data->out_buf.size, ((int)data->out_buf.size) + thissize -1, size, - hexa_str((char*)chunk,thissize)); + hexa_str((unsigned char*)chunk,thissize)); memcpy(data->out_buf.data + data->out_buf.size, chunk + chunk_pos, thissize); data->out_buf.size += thissize; chunk_pos += thissize; DEBUG4("New pos = %d; Still to send = %ld of %ld; ctn sofar=(%s)", - data->out_buf.size,size-chunk_pos,size,hexa_str((char*)chunk,chunk_pos)); + data->out_buf.size,size-chunk_pos,size,hexa_str((unsigned char*)chunk,chunk_pos)); if (data->out_buf.size == data->buffsize) /* out of space. Flush it */ gras_trp_bufiov_flush(sock); @@ -434,7 +471,7 @@ gras_trp_buf_recv(gras_socket_t sock, data->in_buf.pos += thissize; chunk_pos += thissize; DEBUG4("New pos = %d; Still to receive = %ld of %ld. 
Ctn so far=(%s)", - data->in_buf.pos,size - chunk_pos,size,hexa_str(chunk,chunk_pos)); + data->in_buf.pos,size - chunk_pos,size,hexa_str((unsigned char*)chunk,chunk_pos)); } XBT_OUT; @@ -537,15 +574,16 @@ gras_socket_t gras_trp_buf_init_sock(gras_socket_t sock) { data->out_buf.data = xbt_malloc(data->buffsize); data->out_buf.pos = data->out_buf.size; - data->in_buf_v = data->out_buf_v = NULL; #ifdef HAVE_READV + data->in_buf_v = data->out_buf_v = NULL; data->in_buf_v=xbt_dynar_new(sizeof(struct iovec),NULL); data->out_buf_v=xbt_dynar_new(sizeof(struct iovec),NULL); + data->out = buffering_iov; +#else + data->out = buffering_buf; #endif data->in = buffering_buf; - data->out = buffering_iov; - /*data->out = buffering_buf;*/ sock->bufdata = data; return sock; @@ -555,15 +593,18 @@ gras_socket_t gras_trp_buf_init_sock(gras_socket_t sock) { *** Code ***/ void -gras_trp_buf_setup(gras_trp_plugin_t plug) { +gras_trp_tcp_setup(gras_trp_plugin_t plug) { plug->socket_client = gras_trp_buf_socket_client; plug->socket_server = gras_trp_buf_socket_server; plug->socket_accept = gras_trp_buf_socket_accept; plug->socket_close = gras_trp_buf_socket_close; +#ifdef HAVE_READV plug->send = gras_trp_iov_send; - /*plug->send = gras_trp_buf_send;*/ +#else + plug->send = gras_trp_buf_send; +#endif plug->recv = gras_trp_buf_recv; plug->raw_send = gras_trp_tcp_send;