X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/6511b78ff810ead55a110d42b01a08255a55b56d..2f63dce7911dd967b5fd4acadc46a6010c8c0662:/src/gras/Transport/transport_plugin_tcp.c diff --git a/src/gras/Transport/transport_plugin_tcp.c b/src/gras/Transport/transport_plugin_tcp.c index 3d0cd1625a..4b3fa1e0ec 100644 --- a/src/gras/Transport/transport_plugin_tcp.c +++ b/src/gras/Transport/transport_plugin_tcp.c @@ -26,10 +26,6 @@ GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport); -typedef struct { - int buffsize; -} gras_trp_tcp_sock_specific_t; - /*** *** Prototypes ***/ @@ -46,14 +42,14 @@ gras_error_t gras_trp_tcp_socket_accept(gras_socket_t *sock, void gras_trp_tcp_socket_close(gras_socket_t *sd); gras_error_t gras_trp_tcp_chunk_send(gras_socket_t *sd, - char *data, - size_t size); + const char *data, + long int size); gras_error_t gras_trp_tcp_chunk_recv(gras_socket_t *sd, char *data, - size_t size); + long int size); -void gras_trp_tcp_free_specific(void *s); +void gras_trp_tcp_exit(gras_trp_plugin_t *plug); static int TcpProtoNumber(void); @@ -62,46 +58,51 @@ static int TcpProtoNumber(void); ***/ typedef struct { - fd_set incoming_socks; -} gras_trp_tcp_specific_t; + fd_set msg_socks; + fd_set raw_socks; +} gras_trp_tcp_plug_data_t; /*** *** Specific socket part ***/ +typedef struct { + int buffsize; +} gras_trp_tcp_sock_data_t; + /*** *** Code ***/ gras_error_t -gras_trp_tcp_init(gras_trp_plugin_t **dst) { +gras_trp_tcp_setup(gras_trp_plugin_t *plug) { - gras_trp_plugin_t *res=malloc(sizeof(gras_trp_plugin_t)); - gras_trp_tcp_specific_t *tcp = malloc(sizeof(gras_trp_tcp_specific_t)); - if (!res || !tcp) + gras_trp_tcp_plug_data_t *data = malloc(sizeof(gras_trp_tcp_plug_data_t)); + if (!data) RAISE_MALLOC; - FD_ZERO(&(tcp->incoming_socks)); + FD_ZERO(&(data->msg_socks)); + FD_ZERO(&(data->raw_socks)); + + plug->socket_client = gras_trp_tcp_socket_client; + plug->socket_server = gras_trp_tcp_socket_server; + plug->socket_accept = gras_trp_tcp_socket_accept; + plug->socket_close = gras_trp_tcp_socket_close; - res->name = strdup("TCP"); - res->socket_client = gras_trp_tcp_socket_client; - res->socket_server = gras_trp_tcp_socket_server; - res->socket_accept = gras_trp_tcp_socket_accept; - res->socket_close = gras_trp_tcp_socket_close; + plug->chunk_send = gras_trp_tcp_chunk_send; + plug->chunk_recv = gras_trp_tcp_chunk_recv; - res->chunk_send = gras_trp_tcp_chunk_send; - res->chunk_recv = gras_trp_tcp_chunk_recv; + plug->flush = NULL; /* nothing's cached */ - res->specific = (void*)tcp; - res->free_specific = gras_trp_tcp_free_specific; + plug->data = (void*)data; + plug->exit = gras_trp_tcp_exit; - *dst = res; return no_error; } -void gras_trp_tcp_free_specific(void *s) { - gras_trp_tcp_specific_t *specific = s; - free(specific); +void gras_trp_tcp_exit(gras_trp_plugin_t *plug) { + DEBUG1("Exit plugin TCP (free %p)", plug->data); + free(plug->data); } gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self, @@ -159,7 +160,7 @@ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self, int on = 1; struct sockaddr_in server; - gras_trp_tcp_specific_t *data=(gras_trp_tcp_specific_t*)self -> specific; + gras_trp_tcp_plug_data_t *tcp=(gras_trp_tcp_plug_data_t*)self->data; sock->outgoing = 1; /* TCP => duplex mode */ @@ -186,7 +187,10 @@ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self, RAISE2(system_error,"Cannot listen to port %d: %s",port,strerror(errno)); } - FD_SET(sock->sd, &(data->incoming_socks)); + if (sock->raw) + FD_SET(sock->sd, &(tcp->raw_socks)); + else + FD_SET(sock->sd, &(tcp->msg_socks)); DEBUG2("Openned a server socket on port %d (sock %d)",port,sock->sd); @@ -197,22 +201,21 @@ gras_error_t gras_trp_tcp_socket_accept(gras_socket_t *sock, gras_socket_t **dst) { gras_socket_t *res; + gras_error_t errcode; struct sockaddr_in peer_in; socklen_t peer_in_len = sizeof(peer_in); int sd; int tmp_errno; - - res=malloc(sizeof(gras_socket_t)); - if (!res) - RAISE_MALLOC; + + TRY(gras_trp_socket_new(1,&res)); sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len); tmp_errno = errno; if(sd == -1) { - gras_socket_close(&sock); + gras_socket_close(sock); RAISE1(system_error, "Accept failed (%s). Droping server socket.", strerror(tmp_errno)); } else { @@ -221,16 +224,16 @@ gras_trp_tcp_socket_accept(gras_socket_t *sock, if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) { - WARNING0("setsockopt failed, cannot condition the accepted socket"); + WARN0("setsockopt failed, cannot condition the accepted socket"); } - /* FIXME: bufSize removed until we can have optionsets - i = ((gras_trp_tcp_sock_specific_t*)sock->specific)->buffsize; - if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&i, s) - || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&i, s)) { - WARNING0("setsockopt failed, cannot set buffsize"); - } - */ + /* FIXME: bufSize removed until we can have optionsets + i = ((gras_trp_tcp_sock_specific_t*)sock->specific)->buffsize; + if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&i, s) + || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&i, s)) { + WARNING0("setsockopt failed, cannot set buffsize"); + } + */ res->plugin = sock->plugin; res->incoming = sock->incoming; @@ -257,7 +260,7 @@ gras_trp_tcp_socket_accept(gras_socket_t *sock, } } - VERB3("accepted socket %d to %s:%d\n", sd, res->peer_name,res->peer_port); + VERB3("accepted socket %d to %s:%d", sd, res->peer_name,res->peer_port); *dst = res; @@ -266,12 +269,12 @@ gras_trp_tcp_socket_accept(gras_socket_t *sock, } void gras_trp_tcp_socket_close(gras_socket_t *sock){ - gras_trp_tcp_specific_t *tcp; + gras_trp_tcp_plug_data_t *tcp; if (!sock) return; /* close only once */ - tcp=sock->plugin->specific; + tcp=sock->plugin->data; - DEBUG1("close tcp connection %d\n", sock->sd); + DEBUG1("close tcp connection %d", sock->sd); /* FIXME: no pipe in GRAS so far if(!FD_ISSET(sd, &connectedPipes)) { @@ -288,11 +291,14 @@ void gras_trp_tcp_socket_close(gras_socket_t *sock){ } */ /* forget about the socket */ - FD_CLR(sock->sd, &(tcp->incoming_socks)); + if (sock->raw) + FD_CLR(sock->sd, &(tcp->raw_socks)); + else + FD_CLR(sock->sd, &(tcp->msg_socks)); /* close the socket */ if(close(sock->sd) < 0) { - WARNING3("error while closing tcp socket %d: %d (%s)\n", + WARN3("error while closing tcp socket %d: %d (%s)\n", sock->sd, errno, strerror(errno)); } } @@ -304,8 +310,8 @@ void gras_trp_tcp_socket_close(gras_socket_t *sock){ */ gras_error_t gras_trp_tcp_chunk_send(gras_socket_t *sock, - char *data, - size_t size) { + const char *data, + long int size) { /* TCP sockets are in duplex mode, don't check direction */ gras_assert0(size >= 0, "Cannot send a negative amount of data"); @@ -314,11 +320,11 @@ gras_trp_tcp_chunk_send(gras_socket_t *sock, int status = 0; status = write(sock->sd, data, (size_t)size); - DEBUG3("write(%d, %p, %ld);\n", sock->sd, data, size); + DEBUG3("write(%d, %p, %ld);", sock->sd, data, size); if (status == -1) { - RAISE4(system_error,"write(%d,%p,%d) failed: %s", - sock->sd, data, (int)size, + RAISE4(system_error,"write(%d,%p,%ld) failed: %s", + sock->sd, data, size, strerror(errno)); } @@ -340,7 +346,7 @@ gras_trp_tcp_chunk_send(gras_socket_t *sock, gras_error_t gras_trp_tcp_chunk_recv(gras_socket_t *sock, char *data, - size_t size) { + long int size) { /* TCP sockets are in duplex mode, don't check direction */ gras_assert0(sock, "Cannot recv on an NULL socket"); @@ -350,7 +356,7 @@ gras_trp_tcp_chunk_recv(gras_socket_t *sock, int status = 0; status = read(sock->sd, data, (size_t)size); - DEBUG3("read(%d, %p, %ld);\n", sock->sd, data, size); + DEBUG3("read(%d, %p, %ld);", sock->sd, data, size); if (status == -1) { RAISE4(system_error,"read(%d,%p,%d) failed: %s",