X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/a394dea56ed45a707acff2552343114a4d5a95fb..1a2592601e29b027872580cbfc364194f8b01ba6:/src/gras/Transport/transport_plugin_tcp.c diff --git a/src/gras/Transport/transport_plugin_tcp.c b/src/gras/Transport/transport_plugin_tcp.c index 8b58f553c5..6c22cb7ace 100644 --- a/src/gras/Transport/transport_plugin_tcp.c +++ b/src/gras/Transport/transport_plugin_tcp.c @@ -1,49 +1,56 @@ -/* $Id$ */ - /* buf trp (transport) - buffered transport using the TCP one */ -/* Copyright (c) 2004 Martin Quinson. All rights reserved. */ +/* Copyright (c) 2004, 2005, 2006, 2007, 2009, 2010. The SimGrid Team. + * All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ #include -#include /* memset */ +#include /* memset */ #include "portable.h" #include "xbt/misc.h" #include "xbt/sysdep.h" #include "xbt/ex.h" #include "gras/Transport/transport_private.h" +#include "gras/Msg/msg_interface.h" /* listener_close_socket */ /* FIXME maybe READV is sometime a good thing? */ #undef HAVE_READV #ifdef HAVE_READV #include -#endif +#endif #ifndef MIN #define MIN(a,b) ((a)<(b)?(a):(b)) #endif -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_tcp,gras_trp, - "TCP buffered transport"); +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_tcp, gras_trp, + "TCP buffered transport"); /*** *** Specific socket part ***/ +typedef struct { + int port; /* port on this side */ + int peer_port; /* port on the other side */ + char *peer_name; /* hostname of the other side */ + char *peer_proc; /* process on the other side */ +} s_gras_trp_tcp_sock_data_t, *gras_trp_tcp_sock_data_t; + typedef enum { buffering_buf, buffering_iov } buffering_kind; typedef struct { int size; char *data; - int pos; /* for receive; not exchanged over the net */ + int pos; /* for receive; not exchanged over the net */ } gras_trp_buf_t; -struct gras_trp_bufdata_{ +struct gras_trp_bufdata_ { int buffsize; gras_trp_buf_t in_buf; gras_trp_buf_t out_buf; @@ -63,61 +70,78 @@ struct gras_trp_bufdata_{ /*****************************/ /* we exchange port number on client side on socket creation, so we need to be able to talk right now. */ -static XBT_INLINE void gras_trp_tcp_send(gras_socket_t sock, const char *data, - unsigned long int size); +static XBT_INLINE void gras_trp_tcp_send(gras_socket_t sock, + const char *data, + unsigned long int size); static int gras_trp_tcp_recv(gras_socket_t sock, char *data, - unsigned long int size); + unsigned long int size); static int _gras_tcp_proto_number(void); -static XBT_INLINE void gras_trp_sock_socket_client(gras_trp_plugin_t ignored, - gras_socket_t sock){ - +static XBT_INLINE +void gras_trp_sock_socket_client(gras_trp_plugin_t ignored, + const char *host, + int port, + /*OUT*/gras_socket_t sock) +{ + gras_trp_tcp_sock_data_t sockdata = xbt_new(s_gras_trp_tcp_sock_data_t,1); + sockdata->port = port; + sockdata->peer_proc = NULL; + sockdata->peer_port = port; + sockdata->peer_name = (char *) strdup(host ? host : "localhost"); + sock->data = sockdata; + struct sockaddr_in addr; struct hostent *he; struct in_addr *haddr; - int size = sock->buf_size; - uint32_t myport = htonl(((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport); + int size = sock->buf_size; + uint32_t myport = htonl(((gras_trp_procdata_t) + gras_libdata_by_id + (gras_trp_libdata_id))->myport); + + sock->incoming = 1; /* TCP sockets are duplex'ed */ - sock->incoming = 1; /* TCP sockets are duplex'ed */ + sock->sd = socket(AF_INET, SOCK_STREAM, 0); - sock->sd = socket (AF_INET, SOCK_STREAM, 0); - if (sock->sd < 0) { - THROW1(system_error,0, "Failed to create socket: %s", sock_errstr(sock_errno)); + THROW1(system_error, 0, "Failed to create socket: %s", + sock_errstr(sock_errno)); } - if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) || - setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) { - VERB1("setsockopt failed, cannot set buffer size: %s",sock_errstr(sock_errno)); + if (setsockopt + (sock->sd, SOL_SOCKET, SO_RCVBUF, (char *) &size, sizeof(size)) + || setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *) &size, + sizeof(size))) { + VERB1("setsockopt failed, cannot set buffer size: %s", + sock_errstr(sock_errno)); } - - he = gethostbyname (sock->peer_name); + + he = gethostbyname(sockdata->peer_name); if (he == NULL) { - THROW2(system_error,0, "Failed to lookup hostname %s: %s", - sock->peer_name, sock_errstr(sock_errno)); + THROW2(system_error, 0, "Failed to lookup hostname %s: %s", + sockdata->peer_name, sock_errstr(sock_errno)); } - + haddr = ((struct in_addr *) (he->h_addr_list)[0]); - + memset(&addr, 0, sizeof(struct sockaddr_in)); - memcpy (&addr.sin_addr, haddr, sizeof(struct in_addr)); + memcpy(&addr.sin_addr, haddr, sizeof(struct in_addr)); addr.sin_family = AF_INET; - addr.sin_port = htons (sock->peer_port); + addr.sin_port = htons(sockdata->peer_port); - if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) { + if (connect(sock->sd, (struct sockaddr *) &addr, sizeof(addr)) < 0) { tcp_close(sock->sd); - THROW3(system_error,0, - "Failed to connect socket to %s:%d (%s)", - sock->peer_name, sock->peer_port, sock_errstr(sock_errno)); + THROW3(system_error, 0, + "Failed to connect socket to %s:%d (%s)", + sockdata->peer_name, sockdata->peer_port, sock_errstr(sock_errno)); } - gras_trp_tcp_send(sock,(char*)&myport,sizeof(uint32_t)); - DEBUG1("peerport sent to %d", sock->peer_port); + gras_trp_tcp_send(sock, (char *) &myport, sizeof(uint32_t)); + DEBUG1("peerport sent to %d", sockdata->peer_port); VERB4("Connect to %s:%d (sd=%d, port %d here)", - sock->peer_name, sock->peer_port, sock->sd, sock->port); + sockdata->peer_name, sockdata->peer_port, sock->sd, sockdata->port); } /** @@ -125,54 +149,68 @@ static XBT_INLINE void gras_trp_sock_socket_client(gras_trp_plugin_t ignored, * * Open a socket used to receive messages. */ -static XBT_INLINE void gras_trp_sock_socket_server(gras_trp_plugin_t ignored, - gras_socket_t sock){ - int size = sock->buf_size; +static XBT_INLINE +void gras_trp_sock_socket_server(gras_trp_plugin_t ignored, + int port, + gras_socket_t sock) +{ + int size = sock->buf_size; int on = 1; struct sockaddr_in server; - sock->outgoing = 1; /* TCP => duplex mode */ + gras_trp_tcp_sock_data_t sockdata = xbt_new(s_gras_trp_tcp_sock_data_t,1); + sockdata->port = port; + sockdata->peer_port = -1; + sockdata->peer_name = NULL; + sockdata->peer_proc = NULL; + sock->data=sockdata; + + sock->outgoing = 1; /* TCP => duplex mode */ - server.sin_port = htons((u_short)sock->port); + server.sin_port = htons((u_short) sockdata->port); server.sin_addr.s_addr = INADDR_ANY; server.sin_family = AF_INET; - if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) - THROW1(system_error,0,"Socket allocation failed: %s", sock_errstr(sock_errno)); - - if (setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on))) - THROW1(system_error,0, - "setsockopt failed, cannot condition the socket: %s", - sock_errstr(sock_errno)); - - if ( setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, - (char *)&size, sizeof(size)) - || setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, - (char *)&size, sizeof(size))) { - VERB1("setsockopt failed, cannot set buffer size: %s", - sock_errstr(sock_errno)); + if ((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) + THROW1(system_error, 0, "Socket allocation failed: %s", + sock_errstr(sock_errno)); + + if (setsockopt + (sock->sd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on))) + THROW1(system_error, 0, + "setsockopt failed, cannot condition the socket: %s", + sock_errstr(sock_errno)); + + if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, + (char *) &size, sizeof(size)) + || setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, + (char *) &size, sizeof(size))) { + VERB1("setsockopt failed, cannot set buffer size: %s", + sock_errstr(sock_errno)); } - - if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) { + + if (bind(sock->sd, (struct sockaddr *) &server, sizeof(server)) == -1) { tcp_close(sock->sd); - THROW2(system_error,0, - "Cannot bind to port %d: %s", - sock->port, sock_errstr(sock_errno)); + THROW2(system_error, 0, + "Cannot bind to port %d: %s", sockdata->port, + sock_errstr(sock_errno)); } - DEBUG2("Listen on port %d (sd=%d)",sock->port, sock->sd); + DEBUG2("Listen on port %d (sd=%d)", sockdata->port, sock->sd); if (listen(sock->sd, 5) < 0) { tcp_close(sock->sd); - THROW2(system_error,0, - "Cannot listen on port %d: %s", - sock->port,sock_errstr(sock_errno)); + THROW2(system_error, 0, + "Cannot listen on port %d: %s", + sockdata->port, sock_errstr(sock_errno)); } - VERB2("Openned a server socket on port %d (sd=%d)",sock->port,sock->sd); + VERB2("Openned a server socket on port %d (sd=%d)", sockdata->port, + sock->sd); } -static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock) { +static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock) +{ gras_socket_t res; - + struct sockaddr_in peer_in; socklen_t peer_in_len = sizeof(peer_in); @@ -184,94 +222,90 @@ static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock) { socklen_t s = sizeof(int); uint32_t hisport; - + XBT_IN; - gras_trp_socket_new(1,&res); + gras_trp_socket_new(1, &res); - sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len); + sd = accept(sock->sd, (struct sockaddr *) &peer_in, &peer_in_len); tmp_errno = sock_errno; if (sd == -1) { gras_socket_close(sock); - THROW1(system_error,0, - "Accept failed (%s). Droping server socket.", sock_errstr(tmp_errno)); + THROW1(system_error, 0, + "Accept failed (%s). Droping server socket.", + sock_errstr(tmp_errno)); } - - if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) - || setsockopt(sd, _gras_tcp_proto_number(), TCP_NODELAY, (char *)&i, s)) - THROW1(system_error,0,"setsockopt failed, cannot condition the socket: %s", - sock_errstr(tmp_errno)); + + if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *) &i, s) + || setsockopt(sd, _gras_tcp_proto_number(), TCP_NODELAY, (char *) &i, + s)) + THROW1(system_error, 0, + "setsockopt failed, cannot condition the socket: %s", + sock_errstr(tmp_errno)); res->buf_size = sock->buf_size; size = sock->buf_size; - if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) - || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) - VERB1("setsockopt failed, cannot set buffer size: %s", sock_errstr(tmp_errno)); - - res->plugin = sock->plugin; - res->incoming = sock->incoming; - res->outgoing = sock->outgoing; + if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *) &size, sizeof(size)) + || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *) &size, + sizeof(size))) + VERB1("setsockopt failed, cannot set buffer size: %s", + sock_errstr(tmp_errno)); + + res->plugin = sock->plugin; + res->incoming = sock->incoming; + res->outgoing = sock->outgoing; res->accepting = 0; - res->sd = sd; - res->port = -1; + res->sd = sd; + gras_trp_tcp_sock_data_t sockdata = xbt_new(s_gras_trp_tcp_sock_data_t,1); + sockdata->port = -1; + res->data=sockdata; + - gras_trp_tcp_recv(res,(char*)&hisport,sizeof(hisport)); - res->peer_port = ntohl(hisport); - DEBUG1("peerport %d received",res->peer_port); + gras_trp_tcp_recv(res, (char *) &hisport, sizeof(hisport)); + sockdata->peer_port = ntohl(hisport); + DEBUG1("peerport %d received", sockdata->peer_port); /* FIXME: Lock to protect inet_ntoa */ - if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) { - res->peer_name = (char*)strdup("unknown"); + if (((struct sockaddr *) &peer_in)->sa_family != AF_INET) { + sockdata->peer_name = (char *) strdup("unknown"); } else { struct in_addr addrAsInAddr; char *tmp; - + addrAsInAddr.s_addr = peer_in.sin_addr.s_addr; - + tmp = inet_ntoa(addrAsInAddr); if (tmp != NULL) { - res->peer_name = (char*)strdup(tmp); + sockdata->peer_name = (char *) strdup(tmp); } else { - res->peer_name = (char*)strdup("unknown"); + sockdata->peer_name = (char *) strdup("unknown"); } } - - VERB3("Accepted from %s:%d (sd=%d)", res->peer_name,res->peer_port,sd); + + VERB3("Accepted from %s:%d (sd=%d)", sockdata->peer_name, sockdata->peer_port, sd); xbt_dynar_push(((gras_trp_procdata_t) - gras_libdata_by_id(gras_trp_libdata_id))->sockets,&res); - + gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res); + XBT_OUT; return res; } -static void gras_trp_sock_socket_close(gras_socket_t sock){ - - if (!sock) return; /* close only once */ +static void gras_trp_sock_socket_close(gras_socket_t sock) +{ - VERB1("close tcp connection %d", sock->sd); + if (!sock) + return; /* close only once */ - /* FIXME: no pipe in GRAS so far - if(!FD_ISSET(sd, &connectedPipes)) { - if(shutdown(sd, 2) < 0) { - GetNWSLock(&lock); - tmp_errno = errno; - ReleaseNWSLock(&lock); - - / * The other side may have beaten us to the reset. * / - if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) { - WARN1("CloseSocket: shutdown error %d\n", tmp_errno); - } - } - } */ + if (((gras_trp_tcp_sock_data_t)sock->data)->peer_name) + free(((gras_trp_tcp_sock_data_t)sock->data)->peer_name); + free(sock->data); - - /* close the socket */ - if(tcp_close(sock->sd) < 0) { - WARN3("error while closing tcp socket %d: %d (%s)\n", - sock->sd, sock_errno, sock_errstr(sock_errno)); - } + VERB1("close tcp connection %d", sock->sd); + /* ask the listener to close the socket */ + gras_msg_listener_close_socket(sock->sd); } + /************************************/ /****[ end of SOCKET MANAGEMENT ]****/ /************************************/ @@ -283,75 +317,78 @@ static void gras_trp_sock_socket_close(gras_socket_t sock){ /* Temptation to merge this with file data exchange is great, but doesn't work on BillWare (see tcp_write() in portable.h) */ static XBT_INLINE void gras_trp_tcp_send(gras_socket_t sock, - const char *data, - unsigned long int size) { - + const char *data, + unsigned long int size) +{ + while (size) { int status = 0; - - status = tcp_write(sock->sd, data, (size_t)size); + + status = tcp_write(sock->sd, data, (size_t) size); DEBUG3("write(%d, %p, %ld);", sock->sd, data, size); - + if (status < 0) { #ifdef EWOULDBLOCK - if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) #else - if (errno == EINTR || errno == EAGAIN) + if (errno == EINTR || errno == EAGAIN) #endif - continue; - - THROW4(system_error,0,"write(%d,%p,%ld) failed: %s", - sock->sd, data, size, - sock_errstr(sock_errno)); + continue; + + THROW4(system_error, 0, "write(%d,%p,%ld) failed: %s", + sock->sd, data, size, sock_errstr(sock_errno)); } - + if (status) { - size -= status; - data += status; + size -= status; + data += status; } else { - THROW1(system_error,0,"file descriptor closed (%s)", + THROW1(system_error, 0, "file descriptor closed (%s)", sock_errstr(sock_errno)); } } } -static XBT_INLINE int + +static XBT_INLINE int gras_trp_tcp_recv_withbuffer(gras_socket_t sock, - char *data, - unsigned long int size, - unsigned long int bufsize) { + char *data, + unsigned long int size, + unsigned long int bufsize) +{ int got = 0; if (sock->recvd) { - data[0] = sock->recvd_val; - sock->recvd = 0; - got++; - bufsize--; - } + data[0] = sock->recvd_val; + sock->recvd = 0; + got++; + bufsize--; + } - while (size>got) { + while (size > got) { int status = 0; - + DEBUG5("read(%d, %p, %ld) got %d so far (%s)", - sock->sd, data+got, bufsize, got, - hexa_str((unsigned char*)data,got,0)); - status = tcp_read(sock->sd, data+got, (size_t)bufsize); - + sock->sd, data + got, bufsize, got, + hexa_str((unsigned char *) data, got, 0)); + status = tcp_read(sock->sd, data + got, (size_t) bufsize); + if (status < 0) { - THROW7(system_error,0,"read(%d,%p,%d) from %s:%d failed: %s; got %d so far", - sock->sd, data+got, (int)size, - gras_socket_peer_name(sock),gras_socket_peer_port(sock), - sock_errstr(sock_errno), - got); + THROW7(system_error, 0, + "read(%d,%p,%d) from %s:%d failed: %s; got %d so far", + sock->sd, data + got, (int) size, gras_socket_peer_name(sock), + gras_socket_peer_port(sock), sock_errstr(sock_errno), got); } - DEBUG2("Got %d more bytes (%s)",status,hexa_str((unsigned char*)data+got,status,0)); - + DEBUG2("Got %d more bytes (%s)", status, + hexa_str((unsigned char *) data + got, status, 0)); + if (status) { bufsize -= status; - got += status; + got += status; } else { - THROW1(system_error,errno,"Socket closed by remote side (got %d bytes before this)", - got); + THROW1(system_error, errno, + "Socket closed by remote side (got %d bytes before this)", + got); } } @@ -359,11 +396,12 @@ gras_trp_tcp_recv_withbuffer(gras_socket_t sock, } static int gras_trp_tcp_recv(gras_socket_t sock, - char *data, - unsigned long int size) { - return gras_trp_tcp_recv_withbuffer(sock,data,size,size); + char *data, unsigned long int size) +{ + return gras_trp_tcp_recv_withbuffer(sock, data, size, size); } + /*******************************************/ /****[ end of UNBUFFERED DATA EXCHANGE ]****/ /*******************************************/ @@ -373,80 +411,82 @@ static int gras_trp_tcp_recv(gras_socket_t sock, /**********************************/ /* Make sure the data is sent */ -static void -gras_trp_bufiov_flush(gras_socket_t sock) { +static void gras_trp_bufiov_flush(gras_socket_t sock) +{ #ifdef HAVE_READV xbt_dynar_t vect; int size; #endif - gras_trp_bufdata_t *data=sock->bufdata; - XBT_IN; - + gras_trp_bufdata_t *data = sock->bufdata; + XBT_IN; + DEBUG0("Flush"); if (data->out == buffering_buf) { - if (XBT_LOG_ISENABLED(gras_trp_tcp,xbt_log_priority_debug)) + if (XBT_LOG_ISENABLED(gras_trp_tcp, xbt_log_priority_debug)) hexa_print("chunk to send ", - (unsigned char *) data->out_buf.data,data->out_buf.size); - if ((data->out_buf.size - data->out_buf.pos) != 0) { - DEBUG3("Send the chunk (size=%d) to %s:%d",data->out_buf.size, - gras_socket_peer_name(sock),gras_socket_peer_port(sock)); + (unsigned char *) data->out_buf.data, data->out_buf.size); + if ((data->out_buf.size - data->out_buf.pos) != 0) { + DEBUG3("Send the chunk (size=%d) to %s:%d", data->out_buf.size, + gras_socket_peer_name(sock), gras_socket_peer_port(sock)); gras_trp_tcp_send(sock, data->out_buf.data, data->out_buf.size); - VERB1("Chunk sent (size=%d)",data->out_buf.size); + VERB1("Chunk sent (size=%d)", data->out_buf.size); data->out_buf.size = 0; } } - #ifdef HAVE_READV if (data->out == buffering_iov) { DEBUG0("Flush out iov"); vect = sock->bufdata->out_buf_v; if ((size = xbt_dynar_length(vect))) { - DEBUG1("Flush %d chunks out of this socket",size); - writev(sock->sd,xbt_dynar_get_ptr(vect,0),size); + DEBUG1("Flush %d chunks out of this socket", size); + writev(sock->sd, xbt_dynar_get_ptr(vect, 0), size); xbt_dynar_reset(vect); } - data->out_buf.size = 0; /* reset the buffer containing non-stable data */ + data->out_buf.size = 0; /* reset the buffer containing non-stable data */ } if (data->in == buffering_iov) { DEBUG0("Flush in iov"); vect = sock->bufdata->in_buf_v; if ((size = xbt_dynar_length(vect))) { - DEBUG1("Get %d chunks from of this socket",size); - readv(sock->sd,xbt_dynar_get_ptr(vect,0),size); + DEBUG1("Get %d chunks from of this socket", size); + readv(sock->sd, xbt_dynar_get_ptr(vect, 0), size); xbt_dynar_reset(vect); } } #endif } + static void gras_trp_buf_send(gras_socket_t sock, - const char *chunk, - unsigned long int size, - int stable_ignored) { + const char *chunk, + unsigned long int size, int stable_ignored) +{ - gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata; - int chunk_pos=0; + gras_trp_bufdata_t *data = (gras_trp_bufdata_t *) sock->bufdata; + int chunk_pos = 0; XBT_IN; while (chunk_pos < size) { /* size of the chunk to receive in that shot */ - long int thissize = min(size-chunk_pos,data->buffsize-data->out_buf.size); + long int thissize = + min(size - chunk_pos, data->buffsize - data->out_buf.size); DEBUG4("Set the chars %d..%ld into the buffer; size=%ld, ctn=(%s)", - (int)data->out_buf.size, - ((int)data->out_buf.size) + thissize -1, - size, - hexa_str((unsigned char*)chunk,thissize,0)); + (int) data->out_buf.size, + ((int) data->out_buf.size) + thissize - 1, size, + hexa_str((unsigned char *) chunk, thissize, 0)); - memcpy(data->out_buf.data + data->out_buf.size, chunk + chunk_pos, thissize); + memcpy(data->out_buf.data + data->out_buf.size, chunk + chunk_pos, + thissize); data->out_buf.size += thissize; - chunk_pos += thissize; + chunk_pos += thissize; DEBUG4("New pos = %d; Still to send = %ld of %ld; ctn sofar=(%s)", - data->out_buf.size,size-chunk_pos,size,hexa_str((unsigned char*)chunk,chunk_pos,0)); + data->out_buf.size, size - chunk_pos, size, + hexa_str((unsigned char *) chunk, chunk_pos, 0)); - if (data->out_buf.size == data->buffsize) /* out of space. Flush it */ + if (data->out_buf.size == data->buffsize) /* out of space. Flush it */ gras_trp_bufiov_flush(sock); } @@ -454,46 +494,48 @@ gras_trp_buf_send(gras_socket_t sock, } static int -gras_trp_buf_recv(gras_socket_t sock, - char *chunk, - unsigned long int size) { +gras_trp_buf_recv(gras_socket_t sock, char *chunk, unsigned long int size) +{ - gras_trp_bufdata_t *data=sock->bufdata; + gras_trp_bufdata_t *data = sock->bufdata; long int chunk_pos = 0; - + XBT_IN; while (chunk_pos < size) { /* size of the chunk to receive in that shot */ long int thissize; - if (data->in_buf.size == data->in_buf.pos) { /* out of data. Get more */ + if (data->in_buf.size == data->in_buf.pos) { /* out of data. Get more */ DEBUG2("Get more data (size=%d,bufsize=%d)", - (int)MIN(size-chunk_pos,data->buffsize), - (int)data->buffsize); - - - data->in_buf.size = - gras_trp_tcp_recv_withbuffer(sock, data->in_buf.data, - MIN(size-chunk_pos,data->buffsize), - data->buffsize); - - data->in_buf.pos=0; + (int) MIN(size - chunk_pos, data->buffsize), + (int) data->buffsize); + + + data->in_buf.size = + gras_trp_tcp_recv_withbuffer(sock, data->in_buf.data, + MIN(size - chunk_pos, + data->buffsize), + data->buffsize); + + data->in_buf.pos = 0; } - - thissize = min(size-chunk_pos , data->in_buf.size - data->in_buf.pos); - memcpy(chunk+chunk_pos, data->in_buf.data + data->in_buf.pos, thissize); + + thissize = min(size - chunk_pos, data->in_buf.size - data->in_buf.pos); + memcpy(chunk + chunk_pos, data->in_buf.data + data->in_buf.pos, + thissize); data->in_buf.pos += thissize; - chunk_pos += thissize; + chunk_pos += thissize; DEBUG4("New pos = %d; Still to receive = %ld of %ld. Ctn so far=(%s)", - data->in_buf.pos,size - chunk_pos,size,hexa_str((unsigned char*)chunk,chunk_pos,0)); + data->in_buf.pos, size - chunk_pos, size, + hexa_str((unsigned char *) chunk, chunk_pos, 0)); } /* indicate on need to the gras_select function that there is more to read on this socket so that it does not actually select */ sock->moredata = (data->in_buf.size > data->in_buf.pos); - DEBUG1("There is %smore data",(sock->moredata?"":"no ")); - + DEBUG1("There is %smore data", (sock->moredata ? "" : "no ")); + XBT_OUT; return chunk_pos; } @@ -508,56 +550,55 @@ gras_trp_buf_recv(gras_socket_t sock, #ifdef HAVE_READV static void gras_trp_iov_send(gras_socket_t sock, - const char *chunk, - unsigned long int size, - int stable) { + const char *chunk, unsigned long int size, int stable) +{ struct iovec elm; - gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata; - + gras_trp_bufdata_t *data = (gras_trp_bufdata_t *) sock->bufdata; + DEBUG1("Buffer one chunk to be sent later (%s)", - hexa_str((char*)chunk,size,0)); + hexa_str((char *) chunk, size, 0)); - elm.iov_len = (size_t)size; + elm.iov_len = (size_t) size; if (!stable) { /* data storage won't last until flush. Save it in a buffer if we can */ - if (size > data->buffsize-data->out_buf.size) { + if (size > data->buffsize - data->out_buf.size) { /* buffer too small: - flush the socket, using data in its actual storage */ - elm.iov_base = (void*)chunk; - xbt_dynar_push(data->out_buf_v,&elm); + flush the socket, using data in its actual storage */ + elm.iov_base = (void *) chunk; + xbt_dynar_push(data->out_buf_v, &elm); - gras_trp_bufiov_flush(sock); + gras_trp_bufiov_flush(sock); return; } else { /* buffer big enough: - copy data into it, and chain it for upcoming writev */ + copy data into it, and chain it for upcoming writev */ memcpy(data->out_buf.data + data->out_buf.size, chunk, size); - elm.iov_base = (void*)(data->out_buf.data + data->out_buf.size); + elm.iov_base = (void *) (data->out_buf.data + data->out_buf.size); data->out_buf.size += size; - xbt_dynar_push(data->out_buf_v,&elm); + xbt_dynar_push(data->out_buf_v, &elm); } } else { /* data storage stable. Chain it */ - - elm.iov_base = (void*)chunk; - xbt_dynar_push(data->out_buf_v,&elm); + + elm.iov_base = (void *) chunk; + xbt_dynar_push(data->out_buf_v, &elm); } } + static int -gras_trp_iov_recv(gras_socket_t sock, - char *chunk, - unsigned long int size) { +gras_trp_iov_recv(gras_socket_t sock, char *chunk, unsigned long int size) +{ struct iovec elm; DEBUG0("Buffer one chunk to be received later"); - elm.iov_base = (void*)chunk; - elm.iov_len = (size_t)size; - xbt_dynar_push(sock->bufdata->in_buf_v,&elm); + elm.iov_base = (void *) chunk; + elm.iov_len = (size_t) size; + xbt_dynar_push(sock->bufdata->in_buf_v, &elm); return size; } @@ -571,54 +612,89 @@ gras_trp_iov_recv(gras_socket_t sock, /*** *** Prototypes of BUFFERED ***/ - + void gras_trp_buf_socket_client(gras_trp_plugin_t self, - gras_socket_t sock); + const char *host, + int port, + gras_socket_t sock); void gras_trp_buf_socket_server(gras_trp_plugin_t self, - gras_socket_t sock); + int port, + gras_socket_t sock); gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock); -void gras_trp_buf_socket_close(gras_socket_t sd); - +void gras_trp_buf_socket_close(gras_socket_t sd); + + +gras_socket_t gras_trp_buf_init_sock(gras_socket_t sock) +{ + gras_trp_bufdata_t *data = xbt_new(gras_trp_bufdata_t, 1); -gras_socket_t gras_trp_buf_init_sock(gras_socket_t sock) { - gras_trp_bufdata_t *data=xbt_new(gras_trp_bufdata_t,1); - - data->buffsize = 100 * 1024 ; /* 100k */ + data->buffsize = 100 * 1024; /* 100k */ + + data->in_buf.size = 0; + data->in_buf.data = xbt_malloc(data->buffsize); + data->in_buf.pos = 0; /* useless, indeed, since size==pos */ - data->in_buf.size = 0; - data->in_buf.data = xbt_malloc(data->buffsize); - data->in_buf.pos = 0; /* useless, indeed, since size==pos */ - data->out_buf.size = 0; data->out_buf.data = xbt_malloc(data->buffsize); - data->out_buf.pos = data->out_buf.size; + data->out_buf.pos = data->out_buf.size; #ifdef HAVE_READV data->in_buf_v = data->out_buf_v = NULL; - data->in_buf_v=xbt_dynar_new(sizeof(struct iovec),NULL); - data->out_buf_v=xbt_dynar_new(sizeof(struct iovec),NULL); + data->in_buf_v = xbt_dynar_new(sizeof(struct iovec), NULL); + data->out_buf_v = xbt_dynar_new(sizeof(struct iovec), NULL); data->out = buffering_iov; #else data->out = buffering_buf; #endif - + data->in = buffering_buf; sock->bufdata = data; return sock; } +/*** + *** Info about who's speaking + ***/ +static int gras_trp_tcp_my_port(gras_socket_t s) { + gras_trp_tcp_sock_data_t sockdata = s->data; + return sockdata->port; +} +static int gras_trp_tcp_peer_port(gras_socket_t s) { + gras_trp_tcp_sock_data_t sockdata = s->data; + return sockdata->peer_port; +} +static const char* gras_trp_tcp_peer_name(gras_socket_t s) { + gras_trp_tcp_sock_data_t sockdata = s->data; + return sockdata->peer_name; +} +static const char* gras_trp_tcp_peer_proc(gras_socket_t s) { + gras_trp_tcp_sock_data_t sockdata = s->data; + return sockdata->peer_proc; +} +static void gras_trp_tcp_peer_proc_set(gras_socket_t s,char *name) { + gras_trp_tcp_sock_data_t sockdata = s->data; + sockdata->peer_proc = xbt_strdup(name); +} + /*** *** Code ***/ -void -gras_trp_tcp_setup(gras_trp_plugin_t plug) { +void gras_trp_tcp_setup(gras_trp_plugin_t plug) +{ + + plug->my_port = gras_trp_tcp_my_port; + plug->peer_port = gras_trp_tcp_peer_port; + plug->peer_name = gras_trp_tcp_peer_name; + plug->peer_proc = gras_trp_tcp_peer_proc; + plug->peer_proc_set = gras_trp_tcp_peer_proc_set; + plug->socket_client = gras_trp_buf_socket_client; plug->socket_server = gras_trp_buf_socket_server; plug->socket_accept = gras_trp_buf_socket_accept; - plug->socket_close = gras_trp_buf_socket_close; + plug->socket_close = gras_trp_buf_socket_close; #ifdef HAVE_READV plug->send = gras_trp_iov_send; @@ -627,19 +703,22 @@ gras_trp_tcp_setup(gras_trp_plugin_t plug) { #endif plug->recv = gras_trp_buf_recv; - plug->raw_send = gras_trp_tcp_send; - plug->raw_recv = gras_trp_tcp_recv; + plug->raw_send = gras_trp_tcp_send; + plug->raw_recv = gras_trp_tcp_recv; - plug->flush = gras_trp_bufiov_flush; + plug->flush = gras_trp_bufiov_flush; plug->data = NULL; plug->exit = NULL; } void gras_trp_buf_socket_client(gras_trp_plugin_t self, - /* OUT */ gras_socket_t sock){ + const char *host, + int port, + /* OUT */ gras_socket_t sock) +{ - gras_trp_sock_socket_client(NULL,sock); + gras_trp_sock_socket_client(NULL, host,port,sock); gras_trp_buf_init_sock(sock); } @@ -649,38 +728,42 @@ void gras_trp_buf_socket_client(gras_trp_plugin_t self, * Open a socket used to receive messages. */ void gras_trp_buf_socket_server(gras_trp_plugin_t self, - /* OUT */ gras_socket_t sock){ + int port, + /* OUT */ gras_socket_t sock) +{ - gras_trp_sock_socket_server(NULL,sock); + gras_trp_sock_socket_server(NULL, port, sock); gras_trp_buf_init_sock(sock); } -gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock) { +gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock) +{ return gras_trp_buf_init_sock(gras_trp_sock_socket_accept(sock)); } -void gras_trp_buf_socket_close(gras_socket_t sock){ - gras_trp_bufdata_t *data=sock->bufdata; +void gras_trp_buf_socket_close(gras_socket_t sock) +{ + gras_trp_bufdata_t *data = sock->bufdata; - if (data->in_buf.size!=data->in_buf.pos) { - WARN3("Socket closed, but %d bytes were unread (size=%d,pos=%d)", - data->in_buf.size - data->in_buf.pos, - data->in_buf.size, data->in_buf.pos); + if (data->in_buf.size != data->in_buf.pos) { + WARN3("Socket closed, but %d bytes were unread (size=%d,pos=%d)", + data->in_buf.size - data->in_buf.pos, + data->in_buf.size, data->in_buf.pos); } if (data->in_buf.data) free(data->in_buf.data); - - if (data->out_buf.size!=data->out_buf.pos) { + + if (data->out_buf.size != data->out_buf.pos) { DEBUG2("Flush the socket before closing (in=%d,out=%d)", - data->in_buf.size, data->out_buf.size); + data->in_buf.size, data->out_buf.size); gras_trp_bufiov_flush(sock); - } + } if (data->out_buf.data) free(data->out_buf.data); #ifdef HAVE_READV if (data->in_buf_v) { - if (xbt_dynar_length(data->in_buf_v)) + if (xbt_dynar_length(data->in_buf_v)) WARN0("Socket closed, but some bytes were unread"); xbt_dynar_free(&data->in_buf_v); } @@ -706,93 +789,95 @@ void gras_trp_buf_socket_close(gras_socket_t sock){ * * getprotobyname() is not thread safe. We need to lock it. */ -static int _gras_tcp_proto_number(void) { +static int _gras_tcp_proto_number(void) +{ struct protoent *fetchedEntry; static int returnValue = 0; - - if(returnValue == 0) { + + if (returnValue == 0) { fetchedEntry = getprotobyname("tcp"); xbt_assert0(fetchedEntry, "getprotobyname(tcp) gave NULL"); returnValue = fetchedEntry->p_proto; } - + return returnValue; } #ifdef HAVE_WINSOCK_H #define RETSTR( x ) case x: return #x -const char *gras_wsa_err2string( int err ) { - switch( err ) { - RETSTR( WSAEINTR ); - RETSTR( WSAEBADF ); - RETSTR( WSAEACCES ); - RETSTR( WSAEFAULT ); - RETSTR( WSAEINVAL ); - RETSTR( WSAEMFILE ); - RETSTR( WSAEWOULDBLOCK ); - RETSTR( WSAEINPROGRESS ); - RETSTR( WSAEALREADY ); - RETSTR( WSAENOTSOCK ); - RETSTR( WSAEDESTADDRREQ ); - RETSTR( WSAEMSGSIZE ); - RETSTR( WSAEPROTOTYPE ); - RETSTR( WSAENOPROTOOPT ); - RETSTR( WSAEPROTONOSUPPORT ); - RETSTR( WSAESOCKTNOSUPPORT ); - RETSTR( WSAEOPNOTSUPP ); - RETSTR( WSAEPFNOSUPPORT ); - RETSTR( WSAEAFNOSUPPORT ); - RETSTR( WSAEADDRINUSE ); - RETSTR( WSAEADDRNOTAVAIL ); - RETSTR( WSAENETDOWN ); - RETSTR( WSAENETUNREACH ); - RETSTR( WSAENETRESET ); - RETSTR( WSAECONNABORTED ); - RETSTR( WSAECONNRESET ); - RETSTR( WSAENOBUFS ); - RETSTR( WSAEISCONN ); - RETSTR( WSAENOTCONN ); - RETSTR( WSAESHUTDOWN ); - RETSTR( WSAETOOMANYREFS ); - RETSTR( WSAETIMEDOUT ); - RETSTR( WSAECONNREFUSED ); - RETSTR( WSAELOOP ); - RETSTR( WSAENAMETOOLONG ); - RETSTR( WSAEHOSTDOWN ); - RETSTR( WSAEHOSTUNREACH ); - RETSTR( WSAENOTEMPTY ); - RETSTR( WSAEPROCLIM ); - RETSTR( WSAEUSERS ); - RETSTR( WSAEDQUOT ); - RETSTR( WSAESTALE ); - RETSTR( WSAEREMOTE ); - RETSTR( WSASYSNOTREADY ); - RETSTR( WSAVERNOTSUPPORTED ); - RETSTR( WSANOTINITIALISED ); - RETSTR( WSAEDISCON ); - +const char *gras_wsa_err2string(int err) +{ + switch (err) { + RETSTR(WSAEINTR); + RETSTR(WSAEBADF); + RETSTR(WSAEACCES); + RETSTR(WSAEFAULT); + RETSTR(WSAEINVAL); + RETSTR(WSAEMFILE); + RETSTR(WSAEWOULDBLOCK); + RETSTR(WSAEINPROGRESS); + RETSTR(WSAEALREADY); + RETSTR(WSAENOTSOCK); + RETSTR(WSAEDESTADDRREQ); + RETSTR(WSAEMSGSIZE); + RETSTR(WSAEPROTOTYPE); + RETSTR(WSAENOPROTOOPT); + RETSTR(WSAEPROTONOSUPPORT); + RETSTR(WSAESOCKTNOSUPPORT); + RETSTR(WSAEOPNOTSUPP); + RETSTR(WSAEPFNOSUPPORT); + RETSTR(WSAEAFNOSUPPORT); + RETSTR(WSAEADDRINUSE); + RETSTR(WSAEADDRNOTAVAIL); + RETSTR(WSAENETDOWN); + RETSTR(WSAENETUNREACH); + RETSTR(WSAENETRESET); + RETSTR(WSAECONNABORTED); + RETSTR(WSAECONNRESET); + RETSTR(WSAENOBUFS); + RETSTR(WSAEISCONN); + RETSTR(WSAENOTCONN); + RETSTR(WSAESHUTDOWN); + RETSTR(WSAETOOMANYREFS); + RETSTR(WSAETIMEDOUT); + RETSTR(WSAECONNREFUSED); + RETSTR(WSAELOOP); + RETSTR(WSAENAMETOOLONG); + RETSTR(WSAEHOSTDOWN); + RETSTR(WSAEHOSTUNREACH); + RETSTR(WSAENOTEMPTY); + RETSTR(WSAEPROCLIM); + RETSTR(WSAEUSERS); + RETSTR(WSAEDQUOT); + RETSTR(WSAESTALE); + RETSTR(WSAEREMOTE); + RETSTR(WSASYSNOTREADY); + RETSTR(WSAVERNOTSUPPORTED); + RETSTR(WSANOTINITIALISED); + RETSTR(WSAEDISCON); + #ifdef HAVE_WINSOCK2 - RETSTR( WSAENOMORE ); - RETSTR( WSAECANCELLED ); - RETSTR( WSAEINVALIDPROCTABLE ); - RETSTR( WSAEINVALIDPROVIDER ); - RETSTR( WSASYSCALLFAILURE ); - RETSTR( WSASERVICE_NOT_FOUND ); - RETSTR( WSATYPE_NOT_FOUND ); - RETSTR( WSA_E_NO_MORE ); - RETSTR( WSA_E_CANCELLED ); - RETSTR( WSAEREFUSED ); -#endif /* HAVE_WINSOCK2 */ - - RETSTR( WSAHOST_NOT_FOUND ); - RETSTR( WSATRY_AGAIN ); - RETSTR( WSANO_RECOVERY ); - RETSTR( WSANO_DATA ); - } - return "unknown WSA error"; + RETSTR(WSAENOMORE); + RETSTR(WSAECANCELLED); + RETSTR(WSAEINVALIDPROCTABLE); + RETSTR(WSAEINVALIDPROVIDER); + RETSTR(WSASYSCALLFAILURE); + RETSTR(WSASERVICE_NOT_FOUND); + RETSTR(WSATYPE_NOT_FOUND); + RETSTR(WSA_E_NO_MORE); + RETSTR(WSA_E_CANCELLED); + RETSTR(WSAEREFUSED); +#endif /* HAVE_WINSOCK2 */ + + RETSTR(WSAHOST_NOT_FOUND); + RETSTR(WSATRY_AGAIN); + RETSTR(WSANO_RECOVERY); + RETSTR(WSANO_DATA); + } + return "unknown WSA error"; } -#endif /* HAVE_WINSOCK_H */ +#endif /* HAVE_WINSOCK_H */ /***********************************/ /****[ end of HELPER FUNCTIONS ]****/