X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/a7729514e277aabe9422a2626541f889ad1af08b..31d626536ff0cdfa875075e092bf8c86a5e43710:/src/gras/Transport/transport_tcp.c diff --git a/src/gras/Transport/transport_tcp.c b/src/gras/Transport/transport_tcp.c index 967e907413..18eb5fb227 100644 --- a/src/gras/Transport/transport_tcp.c +++ b/src/gras/Transport/transport_tcp.c @@ -8,71 +8,373 @@ /* This program is free software; you can redistribute it and/or modify it under the terms of the license (GNU LGPL) which comes with this package. */ +#include /* close() pipe() read() write() */ +#include /* close() pipe() read() write() */ +#include /* sometimes required for #include */ +#include /* TCP_NODELAY */ +#include /* inet_ntoa() */ +#include /* getprotobyname() */ +#include /* struct timeval */ +#include /* errno */ +#include /* waitpid() */ +#include /* getpeername() socket() */ +#include + + #include "gras_private.h" #include "transport_private.h" GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport); typedef struct { - int dummy; + int buffsize; +} gras_trp_tcp_sock_specific_t; + +/*** + *** Prototypes + ***/ +gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self, + const char *host, + unsigned short port, + unsigned int bufSize, + /* OUT */ gras_socket_t **dst); +gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self, + unsigned short port, + unsigned int bufSize, + /* OUT */ gras_socket_t **dst); +gras_error_t gras_trp_tcp_socket_accept(gras_socket_t *sock, + gras_socket_t **dst); + +void gras_trp_tcp_socket_close(gras_socket_t *sd); + +gras_error_t gras_trp_tcp_bloc_send(gras_socket_t *sd, + char *data, + size_t size); + +gras_error_t gras_trp_tcp_bloc_recv(gras_socket_t *sd, + char *data, + size_t size); + +void gras_trp_tcp_free_specific(void *s); + + +static int TcpProtoNumber(void); +/*** + *** Specific plugin part + ***/ + +typedef struct { + fd_set incoming_socks; } gras_trp_tcp_specific_t; +/*** + *** Specific socket part + ***/ + + +/*** + *** Code + ***/ gras_error_t -gras_trp_tcp_init(void) { +gras_trp_tcp_init(gras_trp_plugin_t **dst) { - gras_trp_tcp_specific_t *specific = malloc(sizeof(gras_trp_tcp_specific_t)); - if (!specific) + gras_trp_plugin_t *res=malloc(sizeof(gras_trp_plugin_t)); + gras_trp_tcp_specific_t *tcp = malloc(sizeof(gras_trp_tcp_specific_t)); + if (!res || !tcp) RAISE_MALLOC; + FD_ZERO(&(tcp->incoming_socks)); + + res->socket_client = gras_trp_tcp_socket_client; + res->socket_server = gras_trp_tcp_socket_server; + res->socket_accept = gras_trp_tcp_socket_accept; + res->socket_close = gras_trp_tcp_socket_close; + + res->bloc_send = gras_trp_tcp_bloc_send; + res->bloc_recv = gras_trp_tcp_bloc_recv; + + res->specific = (void*)tcp; + res->free_specific = gras_trp_tcp_free_specific; + + *dst = res; return no_error; } -void -gras_trp_tcp_exit(gras_trp_plugin_t *plugin) { - gras_trp_tcp_specific_t *specific = (gras_trp_tcp_specific_t*)plugin->specific; +void gras_trp_tcp_free_specific(void *s) { + gras_trp_tcp_specific_t *specific = s; free(specific); } -gras_error_t gras_trp_tcp_socket_client(const char *host, +gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self, + const char *host, unsigned short port, - int raw, unsigned int bufSize, - /* OUT */ gras_trp_sock_t **dst){ + /* OUT */ gras_socket_t **dst){ + /* + int addrCount; + IPAddress addresses[10]; + int i; + int sd; + + if (!(*sock=malloc(sizeof(gras_socket_t)))) { + fprintf(stderr,"Malloc error\n"); + return malloc_error; + } + (*sock)->peer_addr=NULL; + + if (!(addrCount = IPAddressValues(host, addresses, 10))) { + fprintf(stderr,"grasOpenClientSocket: address retrieval of '%s' failed\n",host); + return system_error; + } + + for(i = 0; i < addrCount && i<10 ; i++) { + if(CallAddr(addresses[i], port, &sd, -1)) { + (*sock)->sock = sd; + (*sock)->port = port; + return no_error; + } + } + free(*sock); + fprintf(stderr,"grasOpenClientSocket: something wicked happenned while connecting to %s:%d", + host,port); + return system_error; + */ RAISE_UNIMPLEMENTED; } -gras_error_t gras_trp_tcp_socket_server(unsigned short port, - int raw, +/** + * gras_trp_tcp_socket_server: + * + * Open a socket used to receive messages. bufSize is in ko. + */ +gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self, + unsigned short port, unsigned int bufSize, - /* OUT */ gras_trp_sock_t **dst){ - RAISE_UNIMPLEMENTED; -} + /* OUT */ gras_socket_t **dst){ + int size = bufSize * 1024; + int on = 1; + int sd = -1; + struct sockaddr_in server; + + gras_socket_t *res; + gras_trp_tcp_specific_t *data=(gras_trp_tcp_specific_t*)self -> specific; + + res=malloc(sizeof(gras_socket_t)); + if (!res) + RAISE_MALLOC; + + server.sin_port = htons((u_short)port); + server.sin_addr.s_addr = INADDR_ANY; + server.sin_family = AF_INET; + if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + free(res); + RAISE0(system_error,"socket allocation failed"); + } + + (void)setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on)); + (void)setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)); + (void)setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size)); + if (bind(sd, (struct sockaddr *)&server, sizeof(server)) == -1) { + free(res); + close(sd); + RAISE1(system_error,"Cannot bind to port %d",port); + } + + if (listen(sd, 5) != -1) { + free(res); + close(sd); + RAISE1(system_error,"Cannot listen to port %d",port); + } + + FD_SET(sd, &(data->incoming_socks)); + + *dst=res; + res->plugin = self; + res->incoming = 1; + res->sd = sd; + res->port=port; + res->peer_port=-1; + res->peer_name=NULL; -void gras_trp_tcp_socket_close(gras_trp_sock_t **sd){ - ERROR1("%s not implemented",__FUNCTION__); - abort(); + DEBUG2("Openned a server socket on port %d (sock %d)",port,sd); + + return no_error; } -gras_error_t gras_trp_tcp_select(double timeOut, - gras_trp_sock_t **sd){ - RAISE_UNIMPLEMENTED; +gras_error_t +gras_trp_tcp_socket_accept(gras_socket_t *sock, + gras_socket_t **dst) { + gras_socket_t *res; + + struct sockaddr_in peer_in; + socklen_t peer_in_len = sizeof(peer_in); + + int sd; + int tmp_errno; + + res=malloc(sizeof(gras_socket_t)); + if (!res) + RAISE_MALLOC; + + sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len); + tmp_errno = errno; + + if(sd == -1) { + gras_socket_close(sock); + RAISE1(system_error, + "Accept failed (%s). Droping server socket.", strerror(tmp_errno)); + } else { + int i = 1; + socklen_t s = sizeof(int); + + if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) + || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) { + WARNING0("setsockopt failed, cannot condition the accepted socket"); + } + + i = ((gras_trp_tcp_sock_specific_t*)sock->specific)->buffsize; + if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&i, s) + || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&i, s)) { + WARNING0("setsockopt failed, cannot set buffsize"); + } + + res->plugin = sock->plugin; + res->incoming = 1; + res->sd = sd; + res->port= -1; + res->peer_port= peer_in.sin_port; + + if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) { + res->peer_name = strdup("unknown"); + } else { + struct in_addr addrAsInAddr; + char *tmp; + + addrAsInAddr.s_addr = peer_in.sin_addr.s_addr; + + tmp = inet_ntoa(addrAsInAddr); + if (tmp != NULL) { + res->peer_name = strdup(inet_ntoa(addrAsInAddr)); + } else { + res->peer_name = strdup("unknown"); + } + } + + VERB3("accepted socket %d to %s:%d\n", sd, res->peer_name,res->peer_port); + + *dst = res; + + return no_error; + } } + +void gras_trp_tcp_socket_close(gras_socket_t *sock){ + gras_trp_tcp_specific_t *tcp; -gras_error_t gras_trp_tcp_bloc_send(gras_trp_sock_t *sd, - void *data, - size_t size, - double timeOut){ - RAISE_UNIMPLEMENTED; + if (!sock) return; /* close only once */ + tcp=sock->plugin->specific; + + DEBUG1("close tcp connection %d\n", sock->sd); + + /* FIXME: no pipe in GRAS so far + if(!FD_ISSET(sd, &connectedPipes)) { + if(shutdown(sd, 2) < 0) { + GetNWSLock(&lock); + tmp_errno = errno; + ReleaseNWSLock(&lock); + + / * The other side may have beaten us to the reset. * / + if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) { + WARN1("CloseSocket: shutdown error %d\n", tmp_errno); + } + } + } */ + + /* close the socket */ + if(close(sock->sd) < 0) { + WARNING3("error while closing tcp socket %d: %d (%s)\n", sock->sd, errno, strerror(errno)); + } + + /* forget about it */ + FD_CLR(sock->sd, &(tcp->incoming_socks)); + } -gras_error_t gras_trp_tcp_bloc_recv(gras_trp_sock_t *sd, - void *data, - size_t size, - double timeOut){ - RAISE_UNIMPLEMENTED; +gras_error_t gras_trp_tcp_bloc_send(gras_socket_t *sock, + char *data, + size_t size){ + + gras_assert0(sock && !sock->incoming, "Ascked to send stuff on an incomming socket"); + gras_assert0(size >= 0, "Cannot send a negative amount of data"); + + while (size) { + int status = 0; + + status = write(sock->sd, data, (size_t)size); + DEBUG3("write(%d, %p, %ld);\n", sock->sd, data, size); + + if (status == -1) { + RAISE4(system_error,"write(%d,%p,%d) failed: %s", + sock->sd, data, (int)size, + strerror(errno)); + } + + if (status) { + size -= status; + data += status; + } else { + RAISE0(system_error,"file descriptor closed"); + } + } + + return no_error; } -gras_error_t gras_trp_tcp_flush(gras_trp_sock_t *sd){ - RAISE_UNIMPLEMENTED; +gras_error_t gras_trp_tcp_bloc_recv(gras_socket_t *sock, + char *data, + size_t size){ + + gras_assert0(sock && !sock->incoming, "Ascked to receive stuff on an outcomming socket"); + gras_assert0(size >= 0, "Cannot receive a negative amount of data"); + + while (size) { + int status = 0; + + status = read(sock->sd, data, (size_t)size); + DEBUG3("read(%d, %p, %ld);\n", sock->sd, data, size); + + if (status == -1) { + RAISE4(system_error,"read(%d,%p,%d) failed: %s", + sock->sd, data, (int)size, + strerror(errno)); + } + + if (status) { + size -= status; + data += status; + } else { + RAISE0(system_error,"file descriptor closed"); + } + } + + return no_error; } + +/* + * Returns the tcp protocol number from the network protocol data base. + * + * getprotobyname() is not thread safe. We need to lock it. + */ +static int TcpProtoNumber(void) { + struct protoent *fetchedEntry; + static int returnValue = 0; + + if(returnValue == 0) { + fetchedEntry = getprotobyname("tcp"); + gras_assert0(fetchedEntry, "getprotobyname(tcp) gave NULL"); + returnValue = fetchedEntry->p_proto; + } + + return returnValue; +}