From: mquinson Date: Wed, 29 Sep 2004 09:28:42 +0000 (+0000) Subject: Reintroduce raw sockets. X-Git-Tag: v3.3~4913 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/fbae123576a9e383ca4eb0ff2c6d78278f94b233 Reintroduce raw sockets. Created by gras_socket_{client,server}_ext; used with gras_raw_{send,recv}. It should allow us to kill the last bits of the first version of gras. This is not completely satisfactory yet (duplicated code with chunk_{send,recv}; a bit out of the plugin mechanism), but it should work. Simplify transport plugin interface by not passing any argument to _server and _client, but embedding them in the socket struct directly. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@437 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/src/gras/Transport/sg_transport.c b/src/gras/Transport/sg_transport.c index 93dc5c798d..b79eafc645 100644 --- a/src/gras/Transport/sg_transport.c +++ b/src/gras/Transport/sg_transport.c @@ -137,7 +137,7 @@ gras_trp_select(double timeout, MSG_host_get_name(MSG_host_self())); */ /* MSG_process_sleep(1); */ - MSG_process_sleep(0.01); + MSG_process_sleep(0.001); } } while (gras_os_time()-startTime < timeout || MSG_task_Iprobe((m_channel_t) pd->chan)); @@ -155,3 +155,6 @@ gras_error_t gras_trp_tcp_setup(gras_trp_plugin_t *plug) { gras_error_t gras_trp_file_setup(gras_trp_plugin_t *plug) { return mismatch_error; } + + + diff --git a/src/gras/Transport/transport.c b/src/gras/Transport/transport.c index 252efee019..d5d62b4d4c 100644 --- a/src/gras/Transport/transport.c +++ b/src/gras/Transport/transport.c @@ -130,14 +130,18 @@ gras_error_t gras_trp_socket_new(int incoming, /** - * gras_socket_server: + * gras_socket_server_ext: * * Opens a server socket and make it ready to be listened to. * In real life, you'll get a TCP socket. 
*/ gras_error_t -gras_socket_server(unsigned short port, - /* OUT */ gras_socket_t **dst) { +gras_socket_server_ext(unsigned short port, + + unsigned long int bufSize, + int raw, + + /* OUT */ gras_socket_t **dst) { gras_error_t errcode; gras_trp_plugin_t *trp; @@ -152,9 +156,11 @@ gras_socket_server(unsigned short port, TRY(gras_trp_socket_new(1,&sock)); sock->plugin= trp; sock->port=port; + sock->bufSize = bufSize; + sock->raw = raw; /* Call plugin socket creation function */ - errcode = trp->socket_server(trp, port, sock); + errcode = trp->socket_server(trp, sock); DEBUG3("in=%c out=%c accept=%c", sock->incoming?'y':'n', sock->outgoing?'y':'n', @@ -169,17 +175,21 @@ gras_socket_server(unsigned short port, return no_error; } - + /** - * gras_socket_client: + * gras_socket_client_ext: * * Opens a client socket to a remote host. * In real life, you'll get a TCP socket. */ gras_error_t -gras_socket_client(const char *host, - unsigned short port, - /* OUT */ gras_socket_t **dst) { +gras_socket_client_ext(const char *host, + unsigned short port, + + unsigned long int bufSize, + int raw, + + /* OUT */ gras_socket_t **dst) { gras_error_t errcode; gras_trp_plugin_t *trp; @@ -195,11 +205,11 @@ gras_socket_client(const char *host, sock->plugin= trp; sock->peer_port = port; sock->peer_name = (char*)strdup(host?host:"localhost"); + sock->bufSize = bufSize; + sock->raw = raw; /* plugin-specific */ - errcode= (*trp->socket_client)(trp, - host ? host : "localhost", port, - sock); + errcode= (*trp->socket_client)(trp, sock); DEBUG3("in=%c out=%c accept=%c", sock->incoming?'y':'n', sock->outgoing?'y':'n', @@ -215,6 +225,32 @@ gras_socket_client(const char *host, return no_error; } +/** + * gras_socket_server: + * + * Opens a server socket and make it ready to be listened to. + * In real life, you'll get a TCP socket. 
+ */ +gras_error_t +gras_socket_server(unsigned short port, + /* OUT */ gras_socket_t **dst) { + return gras_socket_server_ext(port,32,0,dst); +} + +/** + * gras_socket_client: + * + * Opens a client socket to a remote host. + * In real life, you'll get a TCP socket. + */ +gras_error_t +gras_socket_client(const char *host, + unsigned short port, + /* OUT */ gras_socket_t **dst) { + return gras_socket_client_ext(host,port,32,0,dst); +} + + void gras_socket_close(gras_socket_t *sock) { gras_dynar_t *sockets = gras_socketset_get(); gras_socket_t *sock_iter; @@ -299,3 +335,21 @@ int gras_socket_peer_port(gras_socket_t *sock) { char *gras_socket_peer_name(gras_socket_t *sock) { return sock->peer_name; } + +gras_error_t gras_socket_raw_send(gras_socket_t *peer, + unsigned int timeout, + unsigned long int expSize, + unsigned long int msgSize) { + + gras_assert0(peer->raw,"Asked to send raw data on a regular socket\n"); + return gras_socket_raw_exchange(peer,1,timeout,expSize,msgSize); +} + +gras_error_t gras_socket_raw_recv(gras_socket_t *peer, + unsigned int timeout, + unsigned long int expSize, + unsigned long int msgSize){ + + gras_assert0(peer->raw,"Asked to recveive raw data on a regular socket\n"); + return gras_socket_raw_exchange(peer,0,timeout,expSize,msgSize); +} diff --git a/src/gras/Transport/transport_interface.h b/src/gras/Transport/transport_interface.h index 78421a5ba4..63e3429407 100644 --- a/src/gras/Transport/transport_interface.h +++ b/src/gras/Transport/transport_interface.h @@ -41,14 +41,12 @@ typedef struct gras_trp_plugin_ gras_trp_plugin_t; struct gras_trp_plugin_ { char *name; - /* dst pointers are created and initialized with default values - before call to socket_client/server*/ + /* dst pointers are created and initialized with default values + before call to socket_client/server. + Retrive the info you need from there. 
*/ gras_error_t (*socket_client)(gras_trp_plugin_t *self, - const char *host, - unsigned short port, /* OUT */ gras_socket_t *dst); gras_error_t (*socket_server)(gras_trp_plugin_t *self, - unsigned short port, /* OUT */ gras_socket_t *dst); gras_error_t (*socket_accept)(gras_socket_t *sock, diff --git a/src/gras/Transport/transport_plugin_buf.c b/src/gras/Transport/transport_plugin_buf.c index 8ed2e025b5..46638bec97 100644 --- a/src/gras/Transport/transport_plugin_buf.c +++ b/src/gras/Transport/transport_plugin_buf.c @@ -22,11 +22,8 @@ GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_buf,transport, *** Prototypes ***/ gras_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self, - const char *host, - unsigned short port, /* OUT */ gras_socket_t *sock); gras_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self, - unsigned short port, /* OUT */ gras_socket_t *sock); gras_error_t gras_trp_buf_socket_accept(gras_socket_t *sock, gras_socket_t **dst); @@ -120,14 +117,12 @@ gras_trp_buf_setup(gras_trp_plugin_t *plug) { } gras_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self, - const char *host, - unsigned short port, /* OUT */ gras_socket_t *sock){ gras_error_t errcode; gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super; GRAS_IN; - TRY(super->socket_client(super,host,port,sock)); + TRY(super->socket_client(super,sock)); sock->plugin = self; TRY(gras_trp_buf_init_sock(sock)); @@ -140,13 +135,12 @@ gras_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self, * Open a socket used to receive messages. 
*/ gras_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self, - unsigned short port, /* OUT */ gras_socket_t *sock){ gras_error_t errcode; gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super; GRAS_IN; - TRY(super->socket_server(super,port,sock)); + TRY(super->socket_server(super,sock)); sock->plugin = self; TRY(gras_trp_buf_init_sock(sock)); return no_error; diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index c0df9da63b..ae0ca84228 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -30,11 +30,8 @@ static gras_error_t find_port(gras_hostdata_t *hd, int port, gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self, - const char *host, - unsigned short port, /* OUT */ gras_socket_t *sock); gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self, - unsigned short port, /* OUT */ gras_socket_t *sock); void gras_trp_sg_socket_close(gras_socket_t *sd); @@ -97,8 +94,6 @@ gras_trp_sg_setup(gras_trp_plugin_t *plug) { } gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self, - const char *host, - unsigned short port, /* OUT */ gras_socket_t *sock){ gras_error_t errcode; @@ -109,34 +104,34 @@ gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self, gras_sg_portrec_t pr; /* make sure this socket will reach someone */ - if (!(peer=MSG_get_host_by_name(host))) { - fprintf(stderr,"GRAS: can't connect to %s: no such host.\n",host); + if (!(peer=MSG_get_host_by_name(sock->peer_name))) { + fprintf(stderr,"GRAS: can't connect to %s: no such host.\n",sock->peer_name); return mismatch_error; } if (!(hd=(gras_hostdata_t *)MSG_host_get_data(peer))) { RAISE1(mismatch_error, "can't connect to %s: no process on this host", - host); + sock->peer_name); } - errcode = find_port(hd,port,&pr); + errcode = find_port(hd,sock->peer_port,&pr); if (errcode != no_error && errcode != mismatch_error) return errcode; if (errcode == mismatch_error) { 
RAISE2(mismatch_error, "can't connect to %s:%d, no process listen on this port", - host,port); + sock->peer_name,sock->peer_port); } if (pr.raw && !sock->raw) { RAISE2(mismatch_error, "can't connect to %s:%d in regular mode, the process listen " - "in raw mode on this port",host,port); + "in raw mode on this port",sock->peer_name,sock->peer_port); } if (!pr.raw && sock->raw) { RAISE2(mismatch_error, "can't connect to %s:%d in raw mode, the process listen " - "in regular mode on this port",host,port); + "in regular mode on this port",sock->peer_name,sock->peer_port); } /* create the socket */ @@ -153,13 +148,13 @@ gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self, DEBUG6("%s (PID %d) connects in %s mode to %s:%d (to_PID=%d)", MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(), - sock->raw?"RAW":"regular",host,port,data->to_PID); + sock->raw?"RAW":"regular", + sock->peer_name,sock->peer_port,data->to_PID); return no_error; } gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self, - unsigned short port, gras_socket_t *sock){ gras_error_t errcode; @@ -175,16 +170,16 @@ gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self, sock->accepting = 0; /* no such nuisance in SG */ - errcode = find_port(hd,port,&pr); + errcode = find_port(hd,sock->port,&pr); switch (errcode) { case no_error: /* Port already used... */ RAISE2(mismatch_error, "can't listen on address %s:%d: port already in use\n.", - host,port); + host,sock->port); case mismatch_error: /* Port not used so far. Do it */ pr.tochan = sock->raw ? pd->rawChan : pd->chan; - pr.port = port; + pr.port = sock->port; pr.raw = sock->raw; TRY(gras_dynar_push(hd->ports,&pr)); @@ -205,7 +200,7 @@ gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self, INFO6("'%s' (%d) ears on %s:%d%s (%p)", MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(), - host,port,sock->raw? " (mode RAW)":"",sock); + host,sock->port,sock->raw? 
" (mode RAW)":"",sock); return no_error; } @@ -246,11 +241,9 @@ gras_error_t gras_trp_sg_chunk_send(gras_socket_t *sock, m_task_t task=NULL; static unsigned int count=0; char name[256]; - gras_trp_sg_sock_data_t *sock_data; + gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)sock->data; sg_task_data_t *task_data; - sock_data = (gras_trp_sg_sock_data_t *)sock->data; - sprintf(name,"Chunk[%d]",count++); if (!(task_data=gras_new(sg_task_data_t,1))) @@ -305,3 +298,75 @@ gras_error_t gras_trp_sg_chunk_recv(gras_socket_t *sock, GRAS_OUT; return no_error; } + +/* Data exchange over raw sockets */ +gras_error_t gras_socket_raw_exchange(gras_socket_t *peer, + int sender, + unsigned int timeout, + unsigned long int expSize, + unsigned long int msgSize) { + unsigned int bytesTotal; + static unsigned int count=0; + m_task_t task=NULL; + char name[256]; + gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)peer->data; + + gras_procdata_t *pd=gras_procdata_get(); + double startTime; + + startTime=gras_os_time(); /* used only in sender mode */ + + for(bytesTotal = 0; bytesTotal < expSize; bytesTotal += msgSize) { + + if (sender) { + + sprintf(name,"Raw data[%d]",count++); + + task=MSG_task_create(name,0,((double)msgSize)/(1024.0*1024.0),NULL); + + DEBUG5("%f:%s: gras_socket_raw_send(%f %s -> %s) BEGIN", + gras_os_time(), MSG_process_get_name(MSG_process_self()), + ((double)msgSize)/(1024.0*1024.0), + MSG_host_get_name( MSG_host_self()), peer->peer_name); + + if (MSG_task_put(task, sock_data->to_host,sock_data->to_chan) != MSG_OK) + RAISE0(system_error,"Problem during the MSG_task_put()"); + + DEBUG5("%f:%s: gras_socket_raw_send(%f %s -> %s) END", + gras_os_time(), MSG_process_get_name(MSG_process_self()), + ((double)msgSize)/(1024.0*1024.0), + MSG_host_get_name( MSG_host_self()), peer->peer_name); + + } else { /* we are receiver, simulate a select */ + + task=NULL; + DEBUG2("%f:%s: gras_socket_raw_recv() BEGIN\n", + gras_os_time(), 
MSG_process_get_name(MSG_process_self())); + do { + if (MSG_task_Iprobe((m_channel_t) pd->rawChan)) { + if (MSG_task_get(&task, (m_channel_t) pd->rawChan) != MSG_OK) { + fprintf(stderr,"GRAS: Error in MSG_task_get()\n"); + return unknown_error; + } + + if (MSG_task_destroy(task) != MSG_OK) { + fprintf(stderr,"GRAS: Error in MSG_task_destroy()\n"); + return unknown_error; + } + + DEBUG2("%f:%s: gras_socket_raw_recv() END\n", + gras_os_time(), MSG_process_get_name(MSG_process_self())); + break; + } else { + MSG_process_sleep(0.0001); + } + + } while (gras_os_time() - startTime < timeout); + + if (gras_os_time() - startTime > timeout) + return timeout_error; + } /* receiver part */ + } /* foreach msg */ + + return no_error; +} diff --git a/src/gras/Transport/transport_plugin_tcp.c b/src/gras/Transport/transport_plugin_tcp.c index 988b6e7a9f..189c2cfcc6 100644 --- a/src/gras/Transport/transport_plugin_tcp.c +++ b/src/gras/Transport/transport_plugin_tcp.c @@ -30,11 +30,8 @@ GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_tcp,transport,"TCP transport"); *** Prototypes ***/ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self, - const char *host, - unsigned short port, /* OUT */ gras_socket_t *sock); gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self, - unsigned short port, /* OUT */ gras_socket_t *sock); gras_error_t gras_trp_tcp_socket_accept(gras_socket_t *sock, gras_socket_t **dst); @@ -106,13 +103,12 @@ void gras_trp_tcp_exit(gras_trp_plugin_t *plug) { } gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self, - const char *host, - unsigned short port, /* OUT */ gras_socket_t *sock){ struct sockaddr_in addr; struct hostent *he; struct in_addr *haddr; + int size = sock->bufSize * 1024; sock->incoming = 1; /* TCP sockets are duplex'ed */ @@ -123,12 +119,18 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self, "Failed to create socket: %s", strerror (errno)); } + + if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, 
sizeof(size)) || + setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) { + WARN1("setsockopt failed, cannot set buffer size: %s", + strerror(errno)); + } - he = gethostbyname (host); + he = gethostbyname (sock->peer_name); if (he == NULL) { RAISE2(system_error, "Failed to lookup hostname %s: %s", - host, strerror (errno)); + sock->peer_name, strerror (errno)); } haddr = ((struct in_addr *) (he->h_addr_list)[0]); @@ -136,13 +138,13 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self, memset(&addr, 0, sizeof(struct sockaddr_in)); memcpy (&addr.sin_addr, haddr, sizeof(struct in_addr)); addr.sin_family = AF_INET; - addr.sin_port = htons (port); + addr.sin_port = htons (sock->peer_port); if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) { close(sock->sd); RAISE3(system_error, "Failed to connect socket to %s:%d (%s)", - host, port, strerror (errno)); + sock->peer_name, sock->peer_port, strerror (errno)); } return no_error; @@ -154,9 +156,8 @@ gras_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self, * Open a socket used to receive messages. 
*/ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self, - unsigned short port, /* OUT */ gras_socket_t *sock){ -/* int size = bufSize * 1024; */ + int size = sock->bufSize * 1024; int on = 1; struct sockaddr_in server; @@ -164,27 +165,32 @@ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self, sock->outgoing = 1; /* TCP => duplex mode */ - server.sin_port = htons((u_short)port); + server.sin_port = htons((u_short)sock->port); server.sin_addr.s_addr = INADDR_ANY; server.sin_family = AF_INET; if((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - RAISE1(system_error,"socket allocation failed: %s", strerror(errno)); + RAISE1(system_error,"Socket allocation failed: %s", strerror(errno)); } - (void)setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, - (char *)&on, sizeof(on)); - /* - (void)setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)); - (void)setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size)); - */ + if (setsockopt(sock->sd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on))) { + RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s", + strerror(errno)); + } + + if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) || + setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) { + WARN1("setsockopt failed, cannot set buffer size: %s", + strerror(errno)); + } + if (bind(sock->sd, (struct sockaddr *)&server, sizeof(server)) == -1) { close(sock->sd); - RAISE2(system_error,"Cannot bind to port %d: %s",port, strerror(errno)); + RAISE2(system_error,"Cannot bind to port %d: %s",sock->port, strerror(errno)); } if (listen(sock->sd, 5) < 0) { close(sock->sd); - RAISE2(system_error,"Cannot listen to port %d: %s",port,strerror(errno)); + RAISE2(system_error,"Cannot listen to port %d: %s",sock->port,strerror(errno)); } if (sock->raw) @@ -192,7 +198,7 @@ gras_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self, else FD_SET(sock->sd, &(tcp->msg_socks)); - 
DEBUG2("Openned a server socket on port %d (sock %d)",port,sock->sd); + DEBUG2("Openned a server socket on port %d (sock %d)",sock->port,sock->sd); return no_error; } @@ -208,6 +214,7 @@ gras_trp_tcp_socket_accept(gras_socket_t *sock, int sd; int tmp_errno; + int size; TRY(gras_trp_socket_new(1,&res)); @@ -224,16 +231,17 @@ gras_trp_tcp_socket_accept(gras_socket_t *sock, if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *)&i, s) || setsockopt(sd, TcpProtoNumber(), TCP_NODELAY, (char *)&i, s)) { - WARN0("setsockopt failed, cannot condition the accepted socket"); + RAISE1(system_error,"setsockopt failed, cannot condition the socket: %s", + strerror(errno)); } - /* FIXME: bufSize removed until we can have optionsets - i = ((gras_trp_tcp_sock_specific_t*)sock->specific)->buffsize; - if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&i, s) - || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&i, s)) { - WARNING0("setsockopt failed, cannot set buffsize"); - } - */ + (*dst)->bufSize = sock->bufSize; + size = sock->bufSize * 1024; + if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&size, sizeof(size)) + || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&size, sizeof(size))) { + WARN1("setsockopt failed, cannot set buffer size: %s", + strerror(errno)); + } res->plugin = sock->plugin; res->incoming = sock->incoming; @@ -245,7 +253,7 @@ gras_trp_tcp_socket_accept(gras_socket_t *sock, /* FIXME: Lock to protect inet_ntoa */ if (((struct sockaddr *)&peer_in)->sa_family != AF_INET) { - res->peer_name = strdup("unknown"); + res->peer_name = (char*)strdup("unknown"); } else { struct in_addr addrAsInAddr; char *tmp; @@ -254,9 +262,9 @@ gras_trp_tcp_socket_accept(gras_socket_t *sock, tmp = inet_ntoa(addrAsInAddr); if (tmp != NULL) { - res->peer_name = strdup(tmp); + res->peer_name = (char*)strdup(tmp); } else { - res->peer_name = strdup("unknown"); + res->peer_name = (char*)strdup("unknown"); } } @@ -393,3 +401,52 @@ static int TcpProtoNumber(void) { return returnValue; } + +/* Data 
exchange over raw sockets. Placing this in there is a kind of crude hack. + It means that the only possible raw are TCP where we may want to do UDP for them. + But I fail to find a good internal organization for now. We may want to split + raw and regular sockets more efficiently. +*/ +gras_error_t gras_socket_raw_exchange(gras_socket_t *peer, + int sender, + unsigned int timeout, + unsigned long int exp_size, + unsigned long int msg_size) { + char *chunk; + int res_last, msg_sofar, exp_sofar; + + fd_set rd_set; + int rv; + + struct timeval timeOut; + + if (!(chunk = (char *)gras_malloc(msg_size))) + RAISE_MALLOC; + + for (exp_sofar=0; exp_sofar < exp_size; exp_size += msg_sofar) { + for(msg_sofar=0; msg_sofar < msg_size; msg_size += res_last) { + + if(sender) { + res_last = send(peer->sd, chunk, msg_size - msg_sofar, 0); + } else { + res_last = 0; + FD_ZERO(&rd_set); + FD_SET(peer->sd,&rd_set); + timeOut.tv_sec = timeout; + timeOut.tv_usec = 0; + + if (0 < select(peer->sd+1,&rd_set,NULL,NULL,&timeOut)) + res_last = recv(peer->sd, chunk, msg_size-msg_sofar, 0); + + } + if (res_last == 0) { + /* No progress done, bail out */ + gras_free(chunk); + RAISE0(unknown_error,"Not exchanged a single byte, bailing out"); + } + } + } + + gras_free(chunk); + return no_error; +} diff --git a/src/gras/Transport/transport_private.h b/src/gras/Transport/transport_private.h index 3f22f44d98..22a8539695 100644 --- a/src/gras/Transport/transport_private.h +++ b/src/gras/Transport/transport_private.h @@ -31,6 +31,8 @@ struct s_gras_socket { int outgoing :1; /* true if we can write on this sock */ int accepting :1; /* true if master incoming sock in tcp */ int raw :1; /* true if this is an experiment socket instead of messaging */ + + unsigned long int bufSize; /* what to say to the OS. 
field here to remember it when accepting */ int sd; int port; /* port on this side */ @@ -75,4 +77,11 @@ gras_error_t gras_trp_buf_setup(gras_trp_plugin_t *plug); gras_error_t gras_trp_buf_init_sock(gras_socket_t *sock); +/* Data exchange over raw sockets */ +gras_error_t gras_socket_raw_exchange(gras_socket_t *peer, + int sender, + unsigned int timeout, + unsigned long int expSize, + unsigned long int msgSize); + #endif /* GRAS_TRP_PRIVATE_H */