X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/93413c0a2359997287c5051024cdd74266bd5c55..fbae123576a9e383ca4eb0ff2c6d78278f94b233:/src/gras/Transport/transport_plugin_tcp.c diff --git a/src/gras/Transport/transport_plugin_tcp.c b/src/gras/Transport/transport_plugin_tcp.c index 988b6e7a9f..189c2cfcc6 100644 --- a/src/gras/Transport/transport_plugin_tcp.c +++ b/src/gras/Transport/transport_plugin_tcp.c @@ -30,11 +30,8 @@ GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport"); *** Prototypes ***/ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self, - const char *host, - unsigned short port, /* OUT */ gras_socket_t *sock); gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self, - unsigned short port, /* OUT */ gras_socket_t *sock); gras_error_t gras_trp_tcp_socket_accept(gras_socket_t *sock, gras_socket_t **dst); @@ -106,13 +103,12 @@ void gras_trp_tcp_exit(gras_trp_plugin_t *plug) { } gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self, - const char *host, - unsigned short port, /* OUT */ gras_socket_t *sock){ struct sockaddr_in addr; struct hostent *he; struct in_addr *haddr; + int size = sock->bufSize * 1024; sock->incoming = 1; /* TCP sockets are duplex'ed */ @@ -123,12 +119,18 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self, "Failed to create socket: %s", strerror (errno)); } + + if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) || + setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) { + WARN1("setsockopt failed, cannot set buffer size: %s", + strerror(errno)); + } - he = gethostbyname (host); + he = gethostbyname (sock->peer_name); if (he == NULL) { RAISE2(system_error, "Failed to lookup hostname %s: %s", - host, strerror (errno)); + sock->peer_name, strerror (errno)); } haddr = ((struct in_addr *) (he->h_addr_list)[0]); @@ -136,13 +138,13 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self, memset(&addr, 0, sizeof(struct sockaddr_in)); memcpy (&addr.sin_addr, haddr, sizeof(struct in_addr)); addr.sin_family = AF_INET; - addr.sin_port = htons (port); + addr.sin_port = htons (sock->peer_port); if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) { close(sock->sd); RAISE3(system_error, "Failed to connect socket to %s:%d (%s)", - host, port, strerror (errno)); + sock->peer_name, sock->peer_port, strerror (errno)); } return no_error; @@ -154,9 +156,8 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self, * Open a socket used to receive messages. */ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self, - unsigned short port, /* OUT */ gras_socket_t *sock){ -/* int size = bufSize * 1024; */ + int size = sock->bufSize * 1024; int on = 1; struct sockaddr_in server; @@ -164,27 +165,32 @@ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self, sock->outgoing = 1; /* TCP => duplex mode */ - server.sin_port = htons((u_short)port); + server.sin_port = htons((u_short)sock->port); server.sin_addr.s_addr = INADDR_ANY; server.sin_family = AF_INET; if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - RAISE1(system_error,"socket allocation failed: %s", strerror(errno)); + RAISE1(system_error,"Socket allocation failed: %s", strerror(errno)); } - (void)setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, - (char *)&on, sizeof(on)); - /* - (void)setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)); - (void)setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size)); - */ + if (setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on))) { + RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s", + strerror(errno)); + } + + if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) || + setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) { + WARN1("setsockopt failed, cannot set buffer size: %s", + strerror(errno)); + } + if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) { close(sock->sd); - RAISE2(system_error,"Cannot bind to port %d: %s",port, strerror(errno)); + RAISE2(system_error,"Cannot bind to port %d: %s",sock->port, strerror(errno)); } if (listen(sock->sd, 5) < 0) { close(sock->sd); - RAISE2(system_error,"Cannot listen to port %d: %s",port,strerror(errno)); + RAISE2(system_error,"Cannot listen to port %d: %s",sock->port,strerror(errno)); } if (sock->raw) @@ -192,7 +198,7 @@ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self, else FD_SET(sock->sd, &(tcp->msg_socks)); - DEBUG2("Openned a server socket on port %d (sock %d)",port,sock->sd); + DEBUG2("Openned a server socket on port %d (sock %d)",sock->port,sock->sd); return no_error; } @@ -208,6 +214,7 @@ gras_trp_tcp_socket_accept(gras_socket_t *sock, int sd; int tmp_errno; + int size; TRY(gras_trp_socket_new(1,&res)); @@ -224,16 +231,17 @@ gras_trp_tcp_socket_accept(gras_socket_t *sock, if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) { - WARN0("setsockopt failed, cannot condition the accepted socket"); + RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s", + strerror(errno)); } - /* FIXME: bufSize removed until we can have optionsets - i = ((gras_trp_tcp_sock_specific_t*)sock->specific)->buffsize; - if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&i, s) - || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&i, s)) { - WARNING0("setsockopt failed, cannot set buffsize"); - } - */ + (*dst)->bufSize = sock->bufSize; + size = sock->bufSize * 1024; + if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) + || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) { + WARN1("setsockopt failed, cannot set buffer size: %s", + strerror(errno)); + } res->plugin = sock->plugin; res->incoming = sock->incoming; @@ -245,7 +253,7 @@ gras_trp_tcp_socket_accept(gras_socket_t *sock, /* FIXME: Lock to protect inet_ntoa */ if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) { - res->peer_name = strdup("unknown"); + res->peer_name = (char*)strdup("unknown"); } else { struct in_addr addrAsInAddr; char *tmp; @@ -254,9 +262,9 @@ gras_trp_tcp_socket_accept(gras_socket_t *sock, tmp = inet_ntoa(addrAsInAddr); if (tmp != NULL) { - res->peer_name = strdup(tmp); + res->peer_name = (char*)strdup(tmp); } else { - res->peer_name = strdup("unknown"); + res->peer_name = (char*)strdup("unknown"); } } @@ -393,3 +401,52 @@ static int TcpProtoNumber(void) { return returnValue; } + +/* Data exchange over raw sockets. Placing this in there is a kind of crude hack. + It means that the only possible raw are TCP where we may want to do UDP for them. + But I fail to find a good internal organization for now. We may want to split + raw and regular sockets more efficiently. +*/ +gras_error_t gras_socket_raw_exchange(gras_socket_t *peer, + int sender, + unsigned int timeout, + unsigned long int exp_size, + unsigned long int msg_size) { + char *chunk; + int res_last, msg_sofar, exp_sofar; + + fd_set rd_set; + int rv; + + struct timeval timeOut; + + if (!(chunk = (char *)gras_malloc(msg_size))) + RAISE_MALLOC; + + for (exp_sofar=0; exp_sofar < exp_size; exp_size += msg_sofar) { + for(msg_sofar=0; msg_sofar < msg_size; msg_size += res_last) { + + if(sender) { + res_last = send(peer->sd, chunk, msg_size - msg_sofar, 0); + } else { + res_last = 0; + FD_ZERO(&rd_set); + FD_SET(peer->sd,&rd_set); + timeOut.tv_sec = timeout; + timeOut.tv_usec = 0; + + if (0 < select(peer->sd+1,&rd_set,NULL,NULL,&timeOut)) + res_last = recv(peer->sd, chunk, msg_size-msg_sofar, 0); + + } + if (res_last == 0) { + /* No progress done, bail out */ + gras_free(chunk); + RAISE0(unknown_error,"Not exchanged a single byte, bailing out"); + } + } + } + + gras_free(chunk); + return no_error; +}