return timeout_error;
}
-xbt_error_t gras_trp_sg_setup(gras_trp_plugin_t *plug) {
+xbt_error_t gras_trp_sg_setup(gras_trp_plugin_t plug) {
return mismatch_error;
}
double startTime=gras_os_time();
gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp");
gras_trp_sg_sock_data_t *sockdata;
- gras_trp_plugin_t *trp;
+ gras_trp_plugin_t trp;
gras_socket_t sock_iter; /* iterating over all sockets */
int cursor,cpt;
/* dummy implementations of the functions used in RL mode */
-xbt_error_t gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
+xbt_error_t gras_trp_tcp_setup(gras_trp_plugin_t plug) {
return mismatch_error;
}
-xbt_error_t gras_trp_file_setup(gras_trp_plugin_t *plug) {
+xbt_error_t gras_trp_file_setup(gras_trp_plugin_t plug) {
return mismatch_error;
}
gras_trp_plugin_new(const char *name, gras_trp_setup_t setup) {
xbt_error_t errcode;
- gras_trp_plugin_t *plug = xbt_new0(gras_trp_plugin_t, 1);
+ gras_trp_plugin_t plug = xbt_new0(s_gras_trp_plugin_t, 1);
DEBUG1("Create plugin %s",name);
void gras_trp_plugin_free(void *p) {
- gras_trp_plugin_t *plug = p;
+ gras_trp_plugin_t plug = p;
if (plug) {
if (plug->exit) {
DEBUG1("Create a new socket (%p)", (void*)sock);
sock->plugin = NULL;
- sock->sd = -1;
- sock->data = NULL;
sock->incoming = incoming ? 1:0;
sock->outgoing = incoming ? 0:1;
sock->accepting = incoming ? 1:0;
+ sock->meas = 0;
+ sock->sd = -1;
sock->port = -1;
sock->peer_port = -1;
sock->peer_name = NULL;
- sock->meas = 0;
+ sock->data = NULL;
+ sock->bufdata = NULL;
+
*dst = sock;
xbt_dynar_push(gras_socketset_get(),dst);
/* OUT */ gras_socket_t *dst) {
xbt_error_t errcode;
- gras_trp_plugin_t *trp;
+ gras_trp_plugin_t trp;
gras_socket_t sock;
*dst = NULL;
/* OUT */ gras_socket_t *dst) {
xbt_error_t errcode;
- gras_trp_plugin_t *trp;
+ gras_trp_plugin_t trp;
gras_socket_t sock;
*dst = NULL;
xbt_error_t
gras_trp_plugin_get_by_name(const char *name,
- gras_trp_plugin_t **dst){
+ gras_trp_plugin_t *dst){
return xbt_dict_get(_gras_trp_plugins,name,(void**)dst);
}
return sock->peer_name;
}
+/** \brief Check if the provided socket is a measurement one (or a regular one)
+ *
+ * @param sock socket to inspect; it is dereferenced without any NULL check
+ * @return the value of the socket's meas field (presumably non-zero for a
+ *         measurement socket and 0 for a regular one -- TODO confirm)
+ */
+int gras_socket_is_meas(gras_socket_t sock) {
+ return sock->meas;
+}
+
+/** \brief Send a chunk of (random) data over a measurement socket
+ *
+ * @param peer measurement socket to use for the experiment
+ * @param timeout timeout (in seconds)
+ * @param exp_size total amount of data to send (in bytes).
+ * @param msg_size size of each chunk sent over the socket (in bytes).
+ *
+ * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
+ * each side of the socket should be paired.
+ *
+ * The exchanged data is zeroed to make sure it's initialized, but
+ * there is no way to control what is sent (i.e., you cannot use these
+ * functions to exchange data out of band).
+ */
xbt_error_t gras_socket_meas_send(gras_socket_t peer,
unsigned int timeout,
unsigned long int exp_size,
return no_error;
}
+/** \brief Receive a chunk of data over a measurement socket
+ *
+ * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
+ * each side of the socket should be paired.
+ */
xbt_error_t gras_socket_meas_recv(gras_socket_t peer,
unsigned int timeout,
unsigned long int exp_size,
* \brief Something similar to the good old accept system call.
*
* Make sure that there is someone speaking to the provided server socket.
- * In RL, it does an accept(2), close the master socket, and put the accepted
- * socket in place of the provided one. In SG, it does not do anything for
+ * In RL, it does an accept(2) and returns the result through the last argument.
+ * In SG, as accepts are useless, it returns the provided socket itself as result.
+ * You should thus test whether (peer != accepted) before closing both of them.
*
* You should only call this on measurement sockets. It is automatically
* done for regular sockets, but you usually want more control about
* what's going on with measurement sockets.
- *
- *
*/
xbt_error_t gras_socket_meas_accept(gras_socket_t peer, gras_socket_t *accepted){
xbt_error_t errcode;
***/
/* A plugin type */
-typedef struct gras_trp_plugin_ gras_trp_plugin_t;
+typedef struct gras_trp_plugin_ s_gras_trp_plugin_t, *gras_trp_plugin_t;
/* A plugin type */
struct gras_trp_plugin_ {
/* dst pointers are created and initialized with default values
before call to socket_client/server.
Retrieve the info you need from there. */
- xbt_error_t (*socket_client)(gras_trp_plugin_t *self,
- gras_socket_t dst);
- xbt_error_t (*socket_server)(gras_trp_plugin_t *self,
- gras_socket_t dst);
+ xbt_error_t (*socket_client)(gras_trp_plugin_t self,
+ gras_socket_t dst);
+ xbt_error_t (*socket_server)(gras_trp_plugin_t self,
+ gras_socket_t dst);
xbt_error_t (*socket_accept)(gras_socket_t sock,
gras_socket_t *dst);
/* exit is responsible for freeing data and telling the OS this plugin goes */
/* exit=NULL, data gets freed. (ie exit function needed only when data contains pointers) */
- void (*exit)(gras_trp_plugin_t *);
+ void (*exit)(gras_trp_plugin_t);
};
xbt_error_t
gras_trp_plugin_get_by_name(const char *name,
- gras_trp_plugin_t **dst);
+ gras_trp_plugin_t *dst);
/* Data of this module specific to each process
* (used by sg_process.c to cleanup the SG channel cruft)
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_buf,transport,
"Generic buffered transport (works on top of TCP or SG)");
+
+static gras_trp_plugin_t _buf_super;
+
/***
*** Prototypes
***/
void hexa_print(const char*name, unsigned char *data, int size); /* in gras.c */
-xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t self,
gras_socket_t sock);
-xbt_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_buf_socket_server(gras_trp_plugin_t self,
gras_socket_t sock);
xbt_error_t gras_trp_buf_socket_accept(gras_socket_t sock,
gras_socket_t *dst);
***/
typedef struct {
- gras_trp_plugin_t *super;
+ int junk;
} gras_trp_buf_plug_data_t;
/***
sock->bufdata = data;
}
-
/***
*** Code
***/
xbt_error_t
-gras_trp_buf_setup(gras_trp_plugin_t *plug) {
+gras_trp_buf_setup(gras_trp_plugin_t plug) {
xbt_error_t errcode;
gras_trp_buf_plug_data_t *data =xbt_new(gras_trp_buf_plug_data_t,1);
XBT_IN;
TRY(gras_trp_plugin_get_by_name(gras_if_RL() ? "tcp" : "sg",
- &(data->super)));
+ &_buf_super));
DEBUG1("Derivate a buffer plugin from %s",gras_if_RL() ? "tcp" : "sg");
plug->socket_client = gras_trp_buf_socket_client;
return no_error;
}
-xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t self,
/* OUT */ gras_socket_t sock){
xbt_error_t errcode;
- gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super;
XBT_IN;
- TRY(super->socket_client(super,sock));
+ TRY(_buf_super->socket_client(_buf_super,sock));
sock->plugin = self;
gras_trp_buf_init_sock(sock);
*
* Open a socket used to receive messages.
*/
-xbt_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_buf_socket_server(gras_trp_plugin_t self,
/* OUT */ gras_socket_t sock){
xbt_error_t errcode;
- gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super;
XBT_IN;
- TRY(super->socket_server(super,sock));
+ TRY(_buf_super->socket_server(_buf_super,sock));
sock->plugin = self;
gras_trp_buf_init_sock(sock);
return no_error;
gras_trp_buf_socket_accept(gras_socket_t sock,
gras_socket_t *dst) {
xbt_error_t errcode;
- gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
XBT_IN;
- TRY(super->socket_accept(sock,dst));
+ TRY(_buf_super->socket_accept(sock,dst));
(*dst)->plugin = sock->plugin;
gras_trp_buf_init_sock(*dst);
XBT_OUT;
}
void gras_trp_buf_socket_close(gras_socket_t sock){
- gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
+ xbt_error_t errcode;
gras_trp_bufdata_t *data=sock->bufdata;
XBT_IN;
free(data->out.data);
free(data);
- super->socket_close(sock);
+ _buf_super->socket_close(sock);
}
/**
unsigned long int size) {
xbt_error_t errcode;
- gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
gras_trp_bufdata_t *data=sock->bufdata;
long int chunck_pos = 0;
-
+
/* Let underneath plugin check for direction, we work even in duplex */
xbt_assert0(sock, "Cannot recv on an NULL socket");
xbt_assert0(size >= 0, "Cannot receive a negative amount of data");
int nextsize;
if (gras_if_RL()) {
DEBUG0("Recv the size");
- TRY(super->chunk_recv(sock,(char*)&nextsize, 4));
+ errcode=_buf_super->chunk_recv(sock,(char*)&nextsize, 4);
+ if (errcode!=no_error)
+ RAISE4(errcode,"Got '%s' while trying to get the chunk size on %p (peer = %s:%d)",
+ xbt_error_name(errcode),sock,gras_socket_peer_name(sock),gras_socket_peer_port(sock));
data->in.size = (int)ntohl(nextsize);
VERB1("Recv the chunk (size=%d)",data->in.size);
} else {
data->in.size = -1;
}
- TRY(super->chunk_recv(sock, data->in.data, data->in.size));
+ TRY(_buf_super->chunk_recv(sock, data->in.data, data->in.size));
if (gras_if_RL()) {
data->in.pos=0;
gras_trp_buf_flush(gras_socket_t sock) {
xbt_error_t errcode;
int size;
- gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
gras_trp_bufdata_t *data=sock->bufdata;
-
- XBT_IN;
+ XBT_IN;
+
DEBUG0("Flush");
if (XBT_LOG_ISENABLED(trp_buf,xbt_log_priority_debug))
hexa_print("chunck to send ",data->out.data,data->out.size);
gras_socket_peer_name(sock),gras_socket_peer_port(sock));
if (gras_if_RL()) {
size = (int)htonl(size);
- TRY(super->chunk_send(sock,(char*) &size, 4));
+ TRY(_buf_super->chunk_send(sock,(char*) &size, 4));
} else {
memcpy(data->out.data, &size, 4);
}
DEBUG3("Send the chunk (size=%d) to %s:%d",data->out.size,
gras_socket_peer_name(sock),gras_socket_peer_port(sock));
- TRY(super->chunk_send(sock, data->out.data, data->out.size));
+ TRY(_buf_super->chunk_send(sock, data->out.data, data->out.size));
VERB1("Chunk sent (size=%d)",data->out.size);
if (XBT_LOG_ISENABLED(trp_buf,xbt_log_priority_debug))
hexa_print("chunck sent ",data->out.data,data->out.size);
*** Code
***/
xbt_error_t
-gras_trp_file_setup(gras_trp_plugin_t *plug) {
+gras_trp_file_setup(gras_trp_plugin_t plug) {
gras_trp_file_plug_data_t *file = xbt_new(gras_trp_file_plug_data_t,1);
gras_socket_client_from_file(const char*path,
/* OUT */ gras_socket_t *dst) {
xbt_error_t errcode;
- gras_trp_plugin_t *trp;
+ gras_trp_plugin_t trp;
xbt_assert0(gras_if_RL(),
"Cannot use file as socket in the simulator");
gras_socket_server_from_file(const char*path,
/* OUT */ gras_socket_t *dst) {
xbt_error_t errcode;
- gras_trp_plugin_t *trp;
+ gras_trp_plugin_t trp;
xbt_assert0(gras_if_RL(),
"Cannot use file as socket in the simulator");
gras_sg_portrec_t *hpd);
-xbt_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_sg_socket_client(gras_trp_plugin_t self,
/* OUT */ gras_socket_t sock);
-xbt_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_sg_socket_server(gras_trp_plugin_t self,
/* OUT */ gras_socket_t sock);
void gras_trp_sg_socket_close(gras_socket_t sd);
xbt_error_t
-gras_trp_sg_setup(gras_trp_plugin_t *plug) {
+gras_trp_sg_setup(gras_trp_plugin_t plug) {
gras_trp_sg_plug_data_t *data=xbt_new(gras_trp_sg_plug_data_t,1);
return no_error;
}
-xbt_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_sg_socket_client(gras_trp_plugin_t self,
/* OUT */ gras_socket_t sock){
xbt_error_t errcode;
return no_error;
}
-xbt_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_sg_socket_server(gras_trp_plugin_t self,
gras_socket_t sock){
xbt_error_t errcode;
/***
*** Prototypes
***/
-xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t self,
gras_socket_t sock);
-xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t self,
gras_socket_t sock);
xbt_error_t gras_trp_tcp_socket_accept(gras_socket_t sock,
gras_socket_t *dst);
char *data,
unsigned long int size);
-void gras_trp_tcp_exit(gras_trp_plugin_t *plug);
+void gras_trp_tcp_exit(gras_trp_plugin_t plug);
static int TcpProtoNumber(void);
/***
*** Code
***/
-xbt_error_t gras_trp_tcp_setup(gras_trp_plugin_t *plug) {
+xbt_error_t gras_trp_tcp_setup(gras_trp_plugin_t plug) {
gras_trp_tcp_plug_data_t *data = xbt_new(gras_trp_tcp_plug_data_t,1);
return no_error;
}
-void gras_trp_tcp_exit(gras_trp_plugin_t *plug) {
+void gras_trp_tcp_exit(gras_trp_plugin_t plug) {
DEBUG1("Exit plugin TCP (free %p)", plug->data);
free(plug->data);
}
-xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_tcp_socket_client(gras_trp_plugin_t self,
gras_socket_t sock){
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons (sock->peer_port);
- DEBUG2("Connect to %s:%d",sock->peer_name, sock->peer_port);
if (connect (sock->sd, (struct sockaddr*) &addr, sizeof (addr)) < 0) {
tcp_close(sock->sd);
RAISE3(system_error,
"Failed to connect socket to %s:%d (%s)",
sock->peer_name, sock->peer_port, sock_errstr);
}
-
+ VERB4("Connect to %s:%d (sd=%d, port %d here)",sock->peer_name, sock->peer_port, sock->sd, sock->port);
+
return no_error;
}
*
* Open a socket used to receive messages.
*/
-xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t *self,
+xbt_error_t gras_trp_tcp_socket_server(gras_trp_plugin_t self,
/* OUT */ gras_socket_t sock){
int size = sock->bufSize * 1024;
int on = 1;
RAISE2(system_error,"Cannot bind to port %d: %s",sock->port, sock_errstr);
}
- DEBUG1("Listen on port %d",sock->port);
+ DEBUG2("Listen on port %d (sd=%d)",sock->port, sock->sd);
if (listen(sock->sd, 5) < 0) {
tcp_close(sock->sd);
RAISE2(system_error,"Cannot listen on port %d: %s",sock->port,sock_errstr);
else
FD_SET(sock->sd, &(tcp->msg_socks));
- DEBUG2("Openned a server socket on port %d (sock %d)",sock->port,sock->sd);
+ VERB2("Openned a server socket on port %d (sd=%d)",sock->port,sock->sd);
return no_error;
}
}
}
- VERB3("Accepted socket %d to %s:%d", sd, res->peer_name,res->peer_port);
+ VERB3("Accepted from %s:%d (sd=%d)", res->peer_name,res->peer_port,sd);
*dst = res;
if (!sock) return; /* close only once */
tcp=sock->plugin->data;
- DEBUG1("close tcp connection %d", sock->sd);
+ VERB1("close tcp connection %d", sock->sd);
/* FIXME: no pipe in GRAS so far
if(!FD_ISSET(sd, &connectedPipes)) {
size -= status;
data += status;
} else {
- RAISE0(system_error,"file descriptor closed (nothing read on the socket)");
+ RAISE3(system_error,"file descriptor closed (nothing read(%d, %p, %ld) on the socket)",
+ sock->sd, data, size);
}
}
return returnValue;
}
-#if 0 /* KILLME */
-/* Data exchange over measurement sockets. Placing this in there is a kind of crude hack.
- It means that the only possible measurement sockets are TCP where we may want to do UDP for them.
- But I fail to find a good internal organization for now. We may want to split
- meas and regular sockets more efficiently.
-*/
-xbt_error_t gras_socket_meas_exchange(gras_socket_t peer,
- int sender,
- unsigned int timeout,
- unsigned long int exp_size,
- unsigned long int msg_size) {
- char *chunk;
- int res_last, msg_sofar, exp_sofar;
-
- fd_set rd_set;
-/* int rv; */
-
- struct timeval timeOut;
-
- chunk = xbt_malloc(msg_size);
-
- for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
- for(msg_sofar=0; msg_sofar < msg_size; msg_sofar += res_last) {
-
- if(sender) {
- res_last = send(peer->sd, chunk, msg_size - msg_sofar, 0);
- } else {
- res_last = 0;
- FD_ZERO(&rd_set);
- FD_SET(peer->sd,&rd_set);
- timeOut.tv_sec = timeout;
- timeOut.tv_usec = 0;
-
- if (0 < select(peer->sd+1,&rd_set,NULL,NULL,&timeOut))
- res_last = recv(peer->sd, chunk, msg_size-msg_sofar, 0);
-
- }
- if (res_last == 0) {
- /* No progress done, bail out */
- free(chunk);
- RAISE0(unknown_error,"Not exchanged a single byte, bailing out");
- }
- }
- }
-
- free(chunk);
- return no_error;
-}
-#endif
-
#ifdef HAVE_WINSOCK_H
#define RETSTR( x ) case x: return #x
typedef struct gras_trp_bufdata_ gras_trp_bufdata_t;
typedef struct s_gras_socket {
- gras_trp_plugin_t *plugin;
+ gras_trp_plugin_t plugin;
int incoming :1; /* true if we can read from this sock */
int outgoing :1; /* true if we can write on this sock */
gras_socket_t *dst);
/* The drivers */
-typedef xbt_error_t (*gras_trp_setup_t)(gras_trp_plugin_t *dst);
+typedef xbt_error_t (*gras_trp_setup_t)(gras_trp_plugin_t dst);
-xbt_error_t gras_trp_tcp_setup(gras_trp_plugin_t *plug);
-xbt_error_t gras_trp_file_setup(gras_trp_plugin_t *plug);
-xbt_error_t gras_trp_sg_setup(gras_trp_plugin_t *plug);
-xbt_error_t gras_trp_buf_setup(gras_trp_plugin_t *plug);
+xbt_error_t gras_trp_tcp_setup(gras_trp_plugin_t plug);
+xbt_error_t gras_trp_file_setup(gras_trp_plugin_t plug);
+xbt_error_t gras_trp_sg_setup(gras_trp_plugin_t plug);
+xbt_error_t gras_trp_buf_setup(gras_trp_plugin_t plug);
/*