/***
*** Prototypes
***/
/* Forward declarations for the TCP transport functions of this file.
 * Lines marked '-' / '+' are the two sides of the patch: the plugin argument
 * changes from `gras_trp_plugin_t *` to `gras_trp_plugin_t`, and the chunk
 * size argument changes from `long int` to `unsigned long int`.
 * NOTE(review): the new signatures only make sense if gras_trp_plugin_t is
 * now a pointer typedef -- confirm against its declaration. */
-xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t self,
gras_socket_t sock);
-xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t self,
gras_socket_t sock);
xbt_error_t gras_trp_tcp_socket_accept(gras_socket_t sock,
gras_socket_t *dst);
void gras_trp_tcp_socket_close(gras_socket_t sd);
xbt_error_t gras_trp_tcp_chunk_send(gras_socket_t sd,
- const char *data,
- long int size);
+ const char *data,
+ unsigned long int size);
xbt_error_t gras_trp_tcp_chunk_recv(gras_socket_t sd,
- char *data,
- long int size);
+ char *data,
+ unsigned long int size);
-void gras_trp_tcp_exit(gras_trp_plugin_t *plug);
+void gras_trp_tcp_exit(gras_trp_plugin_t plug);
static int TcpProtoNumber(void);
/***
*** Code
***/
-xbt_error_t gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
+xbt_error_t gras_trp_tcp_setup(gras_trp_plugin_t plug) {
gras_trp_tcp_plug_data_t *data = xbt_new(gras_trp_tcp_plug_data_t,1);
return no_error;
}
/* Teardown of the TCP plugin: logs the address held in plug->data at debug
 * level, then frees it.
 * The patch changes only the parameter declaration (pointer to plain
 * `gras_trp_plugin_t`); the body still uses `plug->data`, so
 * gras_trp_plugin_t is presumably a pointer typedef after this change --
 * TODO confirm against its declaration.
 * NOTE(review): plug->data is not reset after free() in the lines shown. */
-void gras_trp_tcp_exit(gras_trp_plugin_t *plug) {
+void gras_trp_tcp_exit(gras_trp_plugin_t plug) {
DEBUG1("Exit plugin TCP (free %p)", plug->data);
free(plug->data);
}
-xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t self,
gras_socket_t sock){
struct sockaddr_in addr;
"Failed to connect socket to %s:%d (%s)",
sock->peer_name, sock->peer_port, sock_errstr);
}
-
+ VERB4("Connect to %s:%d (sd=%d, port %d here)",sock->peer_name, sock->peer_port, sock->sd, sock->port);
+
return no_error;
}
*
* Open a socket used to receive messages.
*/
-xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t self,
/* OUT */ gras_socket_t sock){
int size = sock->bufSize * 1024;
int on = 1;
RAISE2(system_error,"Cannot bind to port %d: %s",sock->port, sock_errstr);
}
+ DEBUG2("Listen on port %d (sd=%d)",sock->port, sock->sd);
if (listen(sock->sd, 5) < 0) {
tcp_close(sock->sd);
- RAISE2(system_error,"Cannot listen to port %d: %s",sock->port,sock_errstr);
+ RAISE2(system_error,"Cannot listen on port %d: %s",sock->port,sock_errstr);
}
if (sock->meas)
else
FD_SET(sock->sd, &(tcp->msg_socks));
- DEBUG2("Openned a server socket on port %d (sock %d)",sock->port,sock->sd);
+ VERB2("Openned a server socket on port %d (sd=%d)",sock->port,sock->sd);
return no_error;
}
sd = accept(sock->sd, (struct sockaddr *)&peer_in, &peer_in_len);
tmp_errno = errno;
- if(sd == -1) {
+ if (sd == -1) {
gras_socket_close(sock);
RAISE1(system_error,
"Accept failed (%s). Droping server socket.", sock_errstr);
}
}
- VERB3("accepted socket %d to %s:%d", sd, res->peer_name,res->peer_port);
+ VERB3("Accepted from %s:%d (sd=%d)", res->peer_name,res->peer_port,sd);
*dst = res;
if (!sock) return; /* close only once */
tcp=sock->plugin->data;
- DEBUG1("close tcp connection %d", sock->sd);
+ VERB1("close tcp connection %d", sock->sd);
/* FIXME: no pipe in GRAS so far
if(!FD_ISSET(sd, &connectedPipes)) {
xbt_error_t
gras_trp_tcp_chunk_send(gras_socket_t sock,
const char *data,
- long int size) {
+ unsigned long int size) {
/* TCP sockets are in duplex mode, don't check direction */
xbt_assert0(size >= 0, "Cannot send a negative amount of data");
status = tcp_write(sock->sd, data, (size_t)size);
DEBUG3("write(%d, %p, %ld);", sock->sd, data, size);
- if (status <= 0) {
+ if (status < 0) {
RAISE4(system_error,"write(%d,%p,%ld) failed: %s",
sock->sd, data, size,
sock_errstr);
size -= status;
data += status;
} else {
- RAISE0(system_error,"file descriptor closed");
+ RAISE1(system_error,"file descriptor closed (%s)",
+ sock_errstr);
}
}
xbt_error_t
gras_trp_tcp_chunk_recv(gras_socket_t sock,
char *data,
- long int size) {
+ unsigned long int size) {
/* TCP sockets are in duplex mode, don't check direction */
xbt_assert0(sock, "Cannot recv on an NULL socket");
while (size) {
int status = 0;
- status = tcp_read(sock->sd, data, (size_t)size);
DEBUG3("read(%d, %p, %ld);", sock->sd, data, size);
+ status = tcp_read(sock->sd, data, (size_t)size);
- if (status <= 0) {
+ if (status < 0) {
RAISE4(system_error,"read(%d,%p,%d) failed: %s",
sock->sd, data, (int)size,
sock_errstr);
size -= status;
data += status;
} else {
- RAISE0(system_error,"file descriptor closed");
+ RAISE3(system_error,"file descriptor closed (nothing read(%d, %p, %ld) on the socket)",
+ sock->sd, data, size);
}
}
return returnValue;
}
-#if 0 /* KILLME */
-/* Data exchange over measurement sockets. Placing this in there is a kind of crude hack.
- It means that the only possible measurement sockets are TCP where we may want to do UDP for them.
- But I fail to find a good internal organization for now. We may want to split
- meas and regular sockets more efficiently.
-*/
-xbt_error_t gras_socket_meas_exchange(gras_socket_t peer,
- int sender,
- unsigned int timeout,
- unsigned long int exp_size,
- unsigned long int msg_size) {
- char *chunk;
- int res_last, msg_sofar, exp_sofar;
-
- fd_set rd_set;
-/* int rv; */
-
- struct timeval timeOut;
-
- chunk = xbt_malloc(msg_size);
-
- for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
- for(msg_sofar=0; msg_sofar < msg_size; msg_sofar += res_last) {
-
- if(sender) {
- res_last = send(peer->sd, chunk, msg_size - msg_sofar, 0);
- } else {
- res_last = 0;
- FD_ZERO(&rd_set);
- FD_SET(peer->sd,&rd_set);
- timeOut.tv_sec = timeout;
- timeOut.tv_usec = 0;
-
- if (0 < select(peer->sd+1,&rd_set,NULL,NULL,&timeOut))
- res_last = recv(peer->sd, chunk, msg_size-msg_sofar, 0);
-
- }
- if (res_last == 0) {
- /* No progress done, bail out */
- free(chunk);
- RAISE0(unknown_error,"Not exchanged a single byte, bailing out");
- }
- }
- }
-
- free(chunk);
- return no_error;
-}
-#endif
-
#ifdef HAVE_WINSOCK_H
#define RETSTR( x ) case x: return #x