whole_payload_size = gras_datadesc_copy(msgtype->ctn_type,
payload, msg->payl);
}
- /* put message on msg_queue */
- msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
- xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg);
-
- /* wake-up the receiver */
- trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
- xbt_fifo_push(trp_remote_proc->active_socket,sock);
-
- SIMIX_cond_signal(trp_remote_proc->cond);
-
- /* wait for the receiver */
- SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
-
- /* creates simix action and waits its ends, waits in the sender host condition*/
- act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host,msgtype->name, (double)whole_payload_size, -1);
- SIMIX_register_action_to_condition(act,sock_data->cond);
- SIMIX_register_condition_to_action(act,sock_data->cond);
-
+ /* put message on msg_queue */
+ msg_remote_proc = (gras_msg_procdata_t)
+ gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
+ xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg);
+
+ /* wake-up the receiver */
+ trp_remote_proc = (gras_trp_procdata_t)
+ gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
+
+ xbt_fifo_push(trp_remote_proc->msg_selectable_sockets,sock);
+ SIMIX_cond_signal(trp_remote_proc->msg_select_cond);
+
+ /* wait for the receiver */
+ SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
+
+ /* creates simix action and waits its ends, waits in the sender host
+ condition*/
+ act = SIMIX_action_communicate(SIMIX_host_self(),
+ sock_data->to_host,msgtype->name,
+ (double)whole_payload_size, -1);
+ SIMIX_register_action_to_condition(act,sock_data->cond);
+ SIMIX_register_condition_to_action(act,sock_data->cond);
+
VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu",
- SIMIX_host_get_name(sock_data->to_host),SIMIX_process_get_name(sock_data->to_process),
- msg->type->name,e_gras_msg_kind_names[msg->kind], msg->ID);
+ SIMIX_host_get_name(sock_data->to_host),
+ SIMIX_process_get_name(sock_data->to_process),
+ msg->type->name,e_gras_msg_kind_names[msg->kind], msg->ID);
- SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
- /* error treatmeant */
-
- /* cleanup structures */
- SIMIX_action_destroy(act);
- SIMIX_mutex_unlock(sock_data->mutex);
+ SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
+ /* error treatmeant (FIXME)*/
- VERB0("Message sent");
+ /* cleanup structures */
+ SIMIX_action_destroy(act);
+ SIMIX_mutex_unlock(sock_data->mutex);
+
+ VERB0("Message sent");
}
/*
void
gras_msg_recv(gras_socket_t sock,
gras_msg_t msg) {
-
- gras_trp_sg_sock_data_t *sock_data;
- gras_trp_sg_sock_data_t *remote_sock_data;
- gras_hostdata_t *remote_hd;
+
+ gras_trp_sg_sock_data_t *sock_data;
+ gras_trp_sg_sock_data_t *remote_sock_data;
+ gras_hostdata_t *remote_hd;
gras_msg_t msg_got;
- gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
+ gras_msg_procdata_t msg_procdata =
+ (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
xbt_assert1(!gras_socket_is_meas(sock),
"Asked to receive a message on the measurement socket %p", sock);
xbt_assert0(msg,"msg is an out parameter of gras_msg_recv...");
-
- sock_data = (gras_trp_sg_sock_data_t *)sock->data;
- remote_sock_data = ((gras_trp_sg_sock_data_t *)sock->data)->to_socket->data;
- DEBUG3("Remote host %s, Remote Port: %d Local port %d", SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port);
- remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host);
-
- if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) {
- THROW_IMPOSSIBLE;
- }
- DEBUG1("Size msg_to_receive buffer: %d", xbt_fifo_size(msg_procdata->msg_to_receive_queue));
+
+ sock_data = (gras_trp_sg_sock_data_t *)sock->data;
+ remote_sock_data = ((gras_trp_sg_sock_data_t *)sock->data)->to_socket->data;
+ DEBUG3("Remote host %s, Remote Port: %d Local port %d",
+ SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port);
+ remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host);
+
+ if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) {
+ THROW_IMPOSSIBLE;
+ }
+ DEBUG1("Size msg_to_receive buffer: %d",
+ xbt_fifo_size(msg_procdata->msg_to_receive_queue));
msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue);
-
- SIMIX_mutex_lock(remote_sock_data->mutex);
-/* ok, I'm here, you can continuate the communication */
- SIMIX_cond_signal(remote_sock_data->cond);
-
-/* wait for communication end */
- SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex);
-
- msg_got->expe= msg->expe;
+
+ SIMIX_mutex_lock(remote_sock_data->mutex);
+ /* ok, I'm here, you can continuate the communication */
+ SIMIX_cond_signal(remote_sock_data->cond);
+
+ /* wait for communication end */
+ SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex);
+
+ msg_got->expe= msg->expe;
memcpy(msg,msg_got,sizeof(s_gras_msg_t));
- xbt_free(msg_got);
- SIMIX_mutex_unlock(remote_sock_data->mutex);
-
- VERB3("Received a message type '%s' kind '%s' ID %lu",// from %s",
+ xbt_free(msg_got);
+ SIMIX_mutex_unlock(remote_sock_data->mutex);
+
+ VERB3("Received a message type '%s' kind '%s' ID %lu",// from %s",
msg->type->name,
e_gras_msg_kind_names[msg->kind],
msg->ID);
* if timeout>0 and no message there, wait at most that amount of time before giving up.
*/
gras_socket_t gras_trp_select(double timeout) {
- gras_socket_t res;
- gras_trp_procdata_t pd = (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
- gras_trp_sg_sock_data_t *sockdata;
- gras_trp_plugin_t trp;
- gras_socket_t active_socket;
- gras_socket_t sock_iter; /* iterating over all sockets */
- int cursor;
-
- DEBUG0("Trying to get the lock pd, trp_select");
- SIMIX_mutex_lock(pd->mutex);
- DEBUG3("select on %s@%s with timeout=%f",
- SIMIX_process_get_name(SIMIX_process_self()),
- SIMIX_host_get_name(SIMIX_host_self()),
- timeout);
-
- if (xbt_fifo_size(pd->active_socket) == 0) {
- /* message didn't arrive yet, wait */
- SIMIX_cond_wait_timeout(pd->cond,pd->mutex,timeout);
- }
-
- if (xbt_fifo_size(pd->active_socket) == 0) {
- DEBUG0("TIMEOUT");
- SIMIX_mutex_unlock(pd->mutex);
- THROW0(timeout_error,0,"Timeout");
- }
- active_socket = xbt_fifo_shift(pd->active_socket);
-
- /* Ok, got something. Open a socket back to the expeditor */
-
- /* Try to reuse an already openned socket to that expeditor */
- DEBUG1("Open sockets size %lu",xbt_dynar_length(pd->sockets));
- xbt_dynar_foreach(pd->sockets,cursor,sock_iter) {
- DEBUG1("Consider %p as outgoing socket to expeditor",sock_iter);
-
- if (sock_iter->meas || !sock_iter->outgoing)
- continue;
- /*
- if ((sock_iter->peer_port == active_socket->port) &&
- (((gras_trp_sg_sock_data_t*)sock_iter->data)->to_host == SIMIX_process_get_host(((gras_trp_sg_sock_data_t*)active_socket->data)->from_process))) {
- */
- if ( (((gras_trp_sg_sock_data_t*)sock_iter->data)->to_socket == active_socket) && (((gras_trp_sg_sock_data_t*)sock_iter->data)->to_host == SIMIX_process_get_host(((gras_trp_sg_sock_data_t*)active_socket->data)->from_process)) ) {
- SIMIX_mutex_unlock(pd->mutex);
- return sock_iter;
- }
- }
-
- /* Socket to expeditor not created yet */
- DEBUG0("Create a socket to the expeditor");
-
- trp = gras_trp_plugin_get_by_name("sg");
-
- gras_trp_socket_new(1,&res);
- res->plugin = trp;
-
- res->incoming = 1;
- res->outgoing = 1;
- res->accepting = 0;
- res->sd = -1;
-
- res->port = -1;
-
- /* initialize the ports */
- //res->peer_port = active_socket->port;
- res->port = active_socket->peer_port;
-
- /* create sockdata */
- sockdata = xbt_new(gras_trp_sg_sock_data_t,1);
- sockdata->from_process = SIMIX_process_self();
- sockdata->to_process = ((gras_trp_sg_sock_data_t*)(active_socket->data))->from_process;
+ gras_socket_t res;
+ gras_trp_procdata_t pd =
+ (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
+ gras_trp_sg_sock_data_t *sockdata;
+ gras_trp_plugin_t trp;
+ gras_socket_t active_socket;
+ gras_trp_sg_sock_data_t *active_socket_data;
+ gras_socket_t sock_iter; /* iterating over all sockets */
+ int cursor;
+
+ DEBUG0("Trying to get the lock pd, trp_select");
+ SIMIX_mutex_lock(pd->msg_select_mutex);
+ DEBUG3("select on %s@%s with timeout=%f",
+ SIMIX_process_get_name(SIMIX_process_self()),
+ SIMIX_host_get_name(SIMIX_host_self()),
+ timeout);
+
+ if (xbt_fifo_size(pd->msg_selectable_sockets) == 0) {
+ /* message didn't arrive yet, wait */
+ SIMIX_cond_wait_timeout(pd->msg_select_cond,pd->msg_select_mutex,timeout);
+ }
+
+ if (xbt_fifo_size(pd->msg_selectable_sockets) == 0) {
+ DEBUG0("TIMEOUT");
+ SIMIX_mutex_unlock(pd->msg_select_mutex);
+ THROW0(timeout_error,0,"Timeout");
+ }
+ active_socket = xbt_fifo_shift(pd->msg_selectable_sockets);
+ active_socket_data = (gras_trp_sg_sock_data_t*)active_socket->data;
+
+ /* Ok, got something. Open a socket back to the expeditor */
+
+ /* Try to reuse an already openned socket to that expeditor */
+ DEBUG1("Open sockets size %lu",xbt_dynar_length(pd->sockets));
+ xbt_dynar_foreach(pd->sockets,cursor,sock_iter) {
+ gras_trp_sg_sock_data_t *sock_data;
+ DEBUG1("Consider %p as outgoing socket to expeditor",sock_iter);
+
+ if (sock_iter->meas || !sock_iter->outgoing)
+ continue;
+ sock_data = ((gras_trp_sg_sock_data_t*)sock_iter->data);
+
+ if ( (sock_data->to_socket == active_socket) &&
+ (sock_data->to_host == SIMIX_process_get_host(active_socket_data->from_process)) ) {
+ SIMIX_mutex_unlock(pd->msg_select_mutex);
+ return sock_iter;
+ }
+ }
+
+ /* Socket to expeditor not created yet */
+ DEBUG0("Create a socket to the expeditor");
+
+ trp = gras_trp_plugin_get_by_name("sg");
+
+ gras_trp_socket_new(1,&res);
+ res->plugin = trp;
+
+ res->incoming = 1;
+ res->outgoing = 1;
+ res->accepting = 0;
+ res->sd = -1;
+
+ res->port = -1;
+
+ /* initialize the ports */
+ //res->peer_port = active_socket->port;
+ res->port = active_socket->peer_port;
+
+ /* create sockdata */
+ sockdata = xbt_new(gras_trp_sg_sock_data_t,1);
+ sockdata->from_process = SIMIX_process_self();
+ sockdata->to_process = active_socket_data->from_process;
- res->peer_port = ((gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sockdata->to_process))->myport;
- sockdata->to_socket = active_socket;
- /*update the peer to_socket variable */
- ((gras_trp_sg_sock_data_t*)active_socket->data)->to_socket = res;
- sockdata->cond = SIMIX_cond_init();
- sockdata->mutex = SIMIX_mutex_init();
+ res->peer_port =
+ ((gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sockdata->to_process))->myport;
+ sockdata->to_socket = active_socket;
+ /*update the peer to_socket variable */
+ active_socket_data->to_socket = res;
+ sockdata->cond = SIMIX_cond_init();
+ sockdata->mutex = SIMIX_mutex_init();
- sockdata->to_host = SIMIX_process_get_host(((gras_trp_sg_sock_data_t*)(active_socket->data))->from_process);
+ sockdata->to_host = SIMIX_process_get_host(active_socket_data->from_process);
- res->data = sockdata;
- res->peer_name = strdup(SIMIX_host_get_name(sockdata->to_host));
+ res->data = sockdata;
+ res->peer_name = strdup(SIMIX_host_get_name(sockdata->to_host));
- gras_trp_buf_init_sock(res);
+ gras_trp_buf_init_sock(res);
- DEBUG4("Create socket to process:%s(Port %d) from process: %s(Port %d)",SIMIX_process_get_name(sockdata->from_process),res->peer_port, SIMIX_process_get_name(sockdata->to_process), res->port);
+ DEBUG4("Create socket to process:%s(Port %d) from process: %s(Port %d)",
+ SIMIX_process_get_name(sockdata->from_process),
+ res->peer_port,
+ SIMIX_process_get_name(sockdata->to_process), res->port);
- SIMIX_mutex_unlock(pd->mutex);
- return res;
+ SIMIX_mutex_unlock(pd->msg_select_mutex);
+ return res;
}
*** Main user functions
***/
/* stable if we know the storage will keep as is until the next trp_flush */
-XBT_PUBLIC(void) gras_trp_send(gras_socket_t sd, char *data, long int size, int stable);
+XBT_PUBLIC(void) gras_trp_send(gras_socket_t sd, char *data, long int size,
+ int stable);
XBT_PUBLIC(void) gras_trp_recv(gras_socket_t sd, char *data, long int size);
XBT_PUBLIC(void) gras_trp_flush(gras_socket_t sd);
void *data; /* plugin-specific data */
- /* exit is responsible for freeing data and telling the OS this plugin goes */
- /* exit=NULL, data gets freed. (ie exit function needed only when data contains pointers) */
+ /* exit is responsible for freeing data and telling to the OS that
+ this plugin is gone */
+ /* exit=NULL, data gets brutally free()d by the generic interface.
+ (ie exit function needed only when data contains pointers) */
void (*exit)(gras_trp_plugin_t);
};
char *name;
unsigned int name_len;
- xbt_dynar_t sockets; /* all sockets known to this process */
int myport; /* Port on which I listen myself */
- fd_set *fdset;
+
+ xbt_dynar_t sockets; /* all sockets known to this process */
+ fd_set *fdset; /* idem, in another formalism */
/* SG only elements. In RL, they are part of the OS ;) */
- smx_cond_t cond;
- smx_mutex_t mutex;
- smx_cond_t cond_meas;
- smx_mutex_t mutex_meas;
- xbt_fifo_t active_socket;
- xbt_fifo_t active_socket_meas;
+
+ /* List of sockets ready to be select()ed */
+ xbt_fifo_t msg_selectable_sockets; /* regular sockets */
+ xbt_fifo_t meas_selectable_sockets;/* measurement ones */
+
+ /* Synchronisation on msg_selectable_sockets */
+ smx_cond_t msg_select_cond;
+ smx_mutex_t msg_select_mutex;
+ /* Synchronisation on meas_selectable_sockets */
+ smx_cond_t meas_select_cond;
+ smx_mutex_t meas_select_mutex;
} s_gras_trp_procdata_t,*gras_trp_procdata_t;
/* make sure this socket will reach someone */
if (!(peer=SIMIX_host_get_by_name(sock->peer_name)))
- THROW1(mismatch_error,0,"Can't connect to %s: no such host.\n",sock->peer_name);
+ THROW1(mismatch_error,0,
+ "Can't connect to %s: no such host.\n",sock->peer_name);
if (!(hd=(gras_hostdata_t *)SIMIX_host_get_data(peer)))
THROW1(mismatch_error,0,
if (pr.meas && !sock->meas) {
THROW2(mismatch_error,0,
"can't connect to %s:%d in regular mode, the process listen "
- "in meas mode on this port",sock->peer_name,sock->peer_port);
+ "in measurement mode on this port",sock->peer_name,sock->peer_port);
}
if (!pr.meas && sock->meas) {
THROW2(mismatch_error,0,
- "can't connect to %s:%d in meas mode, the process listen "
+ "can't connect to %s:%d in measurement mode, the process listen "
"in regular mode on this port",sock->peer_name,sock->peer_port);
}
/* create the socket */
data = xbt_new(gras_trp_sg_sock_data_t,1);
data->from_process = SIMIX_process_self();
- data->to_process = pr.process;
+ data->to_process = pr.process;
data->to_host = peer;
- /* initialize mutex and condition of the socket */
- data->mutex = SIMIX_mutex_init();
- data->cond = SIMIX_cond_init();
- data->to_socket = pr.socket;
+ /* initialize mutex and condition of the socket */
+ data->mutex = SIMIX_mutex_init();
+ data->cond = SIMIX_cond_init();
+ data->to_socket = pr.socket;
sock->data = data;
sock->incoming = 1;
DEBUG5("%s (PID %d) connects in %s mode to %s:%d",
- SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
- sock->meas?"meas":"regular",
+ SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
+ sock->meas?"meas":"regular",
sock->peer_name,sock->peer_port);
}
"can't listen on address %s:%d: port already in use.",
host,sock->port);
- pr.port = sock->port;
- pr.meas = sock->meas;
- pr.socket = sock;
- pr.process = SIMIX_process_self();
+ pr.port = sock->port;
+ pr.meas = sock->meas;
+ pr.socket = sock;
+ pr.process = SIMIX_process_self();
xbt_dynar_push(hd->ports,&pr);
/* Create the socket */
data = xbt_new(gras_trp_sg_sock_data_t,1);
data->from_process = SIMIX_process_self();
data->to_process = NULL;
- data->to_host = SIMIX_host_self();
+ data->to_host = SIMIX_host_self();
- data->cond = SIMIX_cond_init();
- data->mutex = SIMIX_mutex_init();
+ data->cond = SIMIX_cond_init();
+ data->mutex = SIMIX_mutex_init();
sock->data = data;
VERB6("'%s' (%d) ears on %s:%d%s (%p)",
- SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
- host,sock->port,sock->meas? " (mode meas)":"",sock);
+ SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
+ host,sock->port,sock->meas? " (mode meas)":"",sock);
}
xbt_assert0(hd,"Please run gras_process_init on each process");
if (sock->data) {
- SIMIX_cond_destroy(((gras_trp_sg_sock_data_t*)sock->data)->cond);
- SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t*)sock->data)->mutex);
- free(sock->data);
- }
+ SIMIX_cond_destroy(((gras_trp_sg_sock_data_t*)sock->data)->cond);
+ SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t*)sock->data)->mutex);
+ free(sock->data);
+ }
if (sock->incoming && !sock->outgoing && sock->port >= 0) {
/* server mode socket. Unregister it from 'OS' tables */
xbt_dynar_foreach(hd->ports, cpt, pr) {
DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports));
if (pr.port == sock->port) {
- xbt_dynar_cursor_rm(hd->ports, &cpt);
- XBT_OUT;
- return;
+ xbt_dynar_cursor_rm(hd->ports, &cpt);
+ XBT_OUT;
+ return;
}
}
WARN2("socket_close called on the unknown incoming socket %p (port=%d)",
sock,sock->port);
}
- XBT_OUT;
-
+ XBT_OUT;
}
typedef struct {
char name[256];
static unsigned int count=0;
- smx_action_t act; /* simix action */
- gras_trp_sg_sock_data_t *sock_data;
- gras_trp_procdata_t trp_remote_proc;
- gras_msg_procdata_t msg_remote_proc;
- gras_msg_t msg; /* message to send */
+ smx_action_t act; /* simix action */
+ gras_trp_sg_sock_data_t *sock_data;
+ gras_trp_procdata_t trp_remote_proc;
+ gras_msg_procdata_t msg_remote_proc;
+ gras_msg_t msg; /* message to send */
- sock_data = (gras_trp_sg_sock_data_t *)sock->data;
+ sock_data = (gras_trp_sg_sock_data_t *)sock->data;
- xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
+ xbt_assert0(sock->meas,
+ "SG chunk exchange shouldn't be used on non-measurement sockets");
- SIMIX_mutex_lock(sock_data->mutex);
+ SIMIX_mutex_lock(sock_data->mutex);
sprintf(name,"Chunk[%d]",count++);
- /*initialize gras message */
- msg = xbt_new(s_gras_msg_t,1);
- msg->expe = sock;
- msg->payl_size=size;
+ /*initialize gras message */
+ msg = xbt_new(s_gras_msg_t,1);
+ msg->expe = sock;
+ msg->payl_size=size;
if (data) {
msg->payl=(void*)xbt_malloc(size);
} else {
msg->payl = NULL;
}
+
+ /* put message on msg_queue */
+ msg_remote_proc = (gras_msg_procdata_t)
+ gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
- /* put message on msg_queue */
- msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
- xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg);
- /* put his socket on the active_socket list */
- trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
- xbt_fifo_push(trp_remote_proc->active_socket_meas,sock);
- /* wake-up the receiver */
+ xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg);
- SIMIX_cond_signal(trp_remote_proc->cond_meas);
+ /* put his socket on the selectable socket list */
+ trp_remote_proc = (gras_trp_procdata_t)
+ gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
- /* wait for the receiver */
- SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
+ xbt_fifo_push(trp_remote_proc->meas_selectable_sockets,sock);
- /* creates simix action and waits its ends, waits in the sender host condition*/
+ /* wake-up the receiver */
+ SIMIX_cond_signal(trp_remote_proc->meas_select_cond);
+
+ /* wait for the receiver */
+ SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
+
+ /* creates simix action and waits its ends, waits in the sender host
+ condition*/
DEBUG5("send chunk %s from %s to %s:%d (size=%ld)",
name, SIMIX_host_get_name(SIMIX_host_self()),
SIMIX_host_get_name(sock_data->to_host), sock->peer_port,size);
- act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host, name, size, -1);
- SIMIX_register_action_to_condition(act,sock_data->cond);
- SIMIX_register_condition_to_action(act,sock_data->cond);
+ act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host,
+ name, size, -1);
+ SIMIX_register_action_to_condition(act,sock_data->cond);
+ SIMIX_register_condition_to_action(act,sock_data->cond);
- SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
- /* error treatmeant */
+ SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
+ /* error treatmeant (FIXME)*/
- /* cleanup structures */
- SIMIX_action_destroy(act);
+ /* cleanup structures */
+ SIMIX_action_destroy(act);
- SIMIX_mutex_unlock(sock_data->mutex);
+ SIMIX_mutex_unlock(sock_data->mutex);
}
int gras_trp_sg_chunk_recv(gras_socket_t sock,
- char *data,
- unsigned long int size){
- gras_trp_sg_sock_data_t *sock_data;
- gras_trp_sg_sock_data_t *remote_sock_data;
- gras_socket_t remote_socket;
+ char *data,
+ unsigned long int size){
+ gras_trp_sg_sock_data_t *sock_data;
+ gras_trp_sg_sock_data_t *remote_sock_data;
+ gras_socket_t remote_socket;
gras_msg_t msg_got;
- gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
- gras_trp_procdata_t trp_proc=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
+ gras_msg_procdata_t msg_procdata =
+ (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
+ gras_trp_procdata_t trp_proc =
+ (gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
- xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
+ xbt_assert0(sock->meas,
+ "SG chunk exchange shouldn't be used on non-measurement sockets");
- SIMIX_mutex_lock(trp_proc->mutex_meas);
- if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
- SIMIX_cond_wait_timeout(trp_proc->cond_meas,trp_proc->mutex_meas,60);
- }
- if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
- SIMIX_mutex_unlock(trp_proc->mutex_meas);
- THROW0(timeout_error,0,"Timeout");
- }
- SIMIX_mutex_unlock(trp_proc->mutex_meas);
-
- remote_socket = xbt_fifo_shift(trp_proc->active_socket_meas);
- remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data;
+ SIMIX_mutex_lock(trp_proc->meas_select_mutex);
+ if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
+ SIMIX_cond_wait_timeout(trp_proc->meas_select_cond,
+ trp_proc->meas_select_mutex,60);
+ }
+ if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
+ SIMIX_mutex_unlock(trp_proc->meas_select_mutex);
+ THROW0(timeout_error,0,"Timeout");
+ }
+ SIMIX_mutex_unlock(trp_proc->meas_select_mutex);
+
+ remote_socket = xbt_fifo_shift(trp_proc->meas_selectable_sockets);
+ remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data;
msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
+
+ sock_data = (gras_trp_sg_sock_data_t *)sock->data;
- sock_data = (gras_trp_sg_sock_data_t *)sock->data;
-
-/* ok, I'm here, you can continue the communication */
- SIMIX_cond_signal(remote_sock_data->cond);
+ /* ok, I'm here, you can continue the communication */
+ SIMIX_cond_signal(remote_sock_data->cond);
- SIMIX_mutex_lock(remote_sock_data->mutex);
-/* wait for communication end */
- SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex);
+ SIMIX_mutex_lock(remote_sock_data->mutex);
+ /* wait for communication end */
+ SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex);
if (msg_got->payl_size != size)
THROW5(mismatch_error,0,
msg_got->payl_size, size,
SIMIX_host_get_name(sock_data->to_host),
SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port);
- if (data) {
+
+ if (data)
memcpy(data,msg_got->payl,size);
- }
- if (msg_got->payl)
- xbt_free(msg_got->payl);
- xbt_free(msg_got);
- SIMIX_mutex_unlock(remote_sock_data->mutex);
- return 0;
+
+ if (msg_got->payl)
+ xbt_free(msg_got->payl);
+
+ xbt_free(msg_got);
+ SIMIX_mutex_unlock(remote_sock_data->mutex);
+ return 0;
}
extern int gras_trp_libdata_id; /* our libdata identifier */
-/* The function that select returned the last time we asked. We need this because the TCP read
- are greedy and try to get as much data in their buffer as possible (to avoid subsequent syscalls).
+/* The function that select returned the last time we asked. We need this
+ because the TCP read are greedy and try to get as much data in their
+ buffer as possible (to avoid subsequent syscalls).
(measurement sockets are not buffered and thus not concerned).
- So, we can get more than one message in one shoot. And when this happens, we have to handle
- the same socket again afterward without select()ing at all.
+ So, we can get more than one message in one shoot. And when this happens,
+ we have to handle the same socket again afterward without select()ing at
+ all.
- Then, this data is not a static of the TCP driver because we want to zero it when
- it gets closed by the user. If not, we use an already freed pointer, which is bad.
+ Then, this data is not a static of the TCP driver because we want to
+ zero it when it gets closed by the user. If not, we use an already freed
+ pointer, which is bad.
- It gets tricky since gras_socket_close is part of the common API, not only the RL one. */
+ It gets tricky since gras_socket_close is part of the common API, not
+ only the RL one. */
extern gras_socket_t _gras_lastly_selected_socket;
/**
int meas :1; /* true if this is an experiment socket instead of messaging */
int recv_ok :1; /* true if it is valid to recv() on the socket (false if it is a file) */
int valid :1; /* false if a select returned that the peer quitted, forcing us to "close" the socket */
- int moredata :1; /* TCP socket use a buffer and read operation get as much data as possible.
- It is possible that several messages are received in one shoot, and select won't catch them afterward again.
- This boolean indicates that this is the case, so that we don't call select in that case.
- Note that measurement sockets are not concerned since they use the TCP interface directly, with no buffer. */
-
- unsigned long int buf_size; /* what to say to the OS. field here to remember it when accepting */
+ int moredata :1; /* TCP socket use a buffer and read operation get as much
+ data as possible. It is possible that several messages
+ are received in one shoot, and select won't catch them
+ afterward again.
+ This boolean indicates that this is the case, so that we
+ don't call select in that case. Note that measurement
+ sockets are not concerned since they use the TCP
+ interface directly, with no buffer. */
+
+ unsigned long int buf_size; /* what to say to the OS.
+ Field here to remember it when accepting */
int sd;
int port; /* port on this side */
void gras_trp_file_setup(gras_trp_plugin_t plug);
void gras_trp_sg_setup(gras_trp_plugin_t plug);
-/*
+/* FIXME: this should be solved by SIMIX
I'm tired of that shit. the select in SG has to create a socket to expeditor
manually do deal with the weirdness of the hostdata, themselves here to deal
hd->refcount++;
}
- trp_pd = (gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
- pd->pid = PID++;
-
- if (SIMIX_process_self() != NULL ) {
- pd->ppid = gras_os_getpid();
- }
- else pd->ppid = -1;
-
- trp_pd->mutex = SIMIX_mutex_init();
- trp_pd->cond = SIMIX_cond_init();
- trp_pd->mutex_meas = SIMIX_mutex_init();
- trp_pd->cond_meas = SIMIX_cond_init();
- trp_pd->active_socket = xbt_fifo_new();
- trp_pd->active_socket_meas = xbt_fifo_new();
-
+ trp_pd = (gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
+ pd->pid = PID++;
+
+ if (SIMIX_process_self() != NULL ) {
+ pd->ppid = gras_os_getpid();
+ }
+ else pd->ppid = -1;
+
+ trp_pd->msg_selectable_sockets = xbt_fifo_new();
+ trp_pd->msg_select_mutex = SIMIX_mutex_init();
+ trp_pd->msg_select_cond = SIMIX_cond_init();
+
+ trp_pd->meas_selectable_sockets = xbt_fifo_new();
+ trp_pd->meas_select_mutex = SIMIX_mutex_init();
+ trp_pd->meas_select_cond = SIMIX_cond_init();
+
VERB2("Creating process '%s' (%d)",
- SIMIX_process_get_name(SIMIX_process_self()),
- gras_os_getpid());
+ SIMIX_process_get_name(SIMIX_process_self()),
+ gras_os_getpid());
}
void
xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_name("gras_trp"))->sockets;
gras_socket_t sock_iter;
int cursor;
- gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self());
- gras_procdata_t *pd=(gras_procdata_t*)SIMIX_process_get_data(SIMIX_process_self());
+ gras_hostdata_t *hd=
+ (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self());
+ gras_procdata_t *pd=
+ (gras_procdata_t*)SIMIX_process_get_data(SIMIX_process_self());
+
+ gras_msg_procdata_t msg_pd=
+ (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
+ gras_trp_procdata_t trp_pd=
+ (gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
- gras_msg_procdata_t msg_pd=(gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
- gras_trp_procdata_t trp_pd=(gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
+ SIMIX_mutex_destroy(trp_pd->msg_select_mutex);
+ SIMIX_cond_destroy(trp_pd->msg_select_cond);
+ xbt_fifo_free(trp_pd->msg_selectable_sockets);
- SIMIX_mutex_destroy(trp_pd->mutex);
- SIMIX_cond_destroy(trp_pd->cond);
- xbt_fifo_free(trp_pd->active_socket);
- SIMIX_mutex_destroy(trp_pd->mutex_meas);
- SIMIX_cond_destroy(trp_pd->cond_meas);
- xbt_fifo_free(trp_pd->active_socket_meas);
+ SIMIX_mutex_destroy(trp_pd->meas_select_mutex);
+ SIMIX_cond_destroy(trp_pd->meas_select_cond);
+ xbt_fifo_free(trp_pd->meas_selectable_sockets);
xbt_assert0(hd,"Run gras_process_init (ie, gras_init)!!");
if (xbt_dynar_length(msg_pd->msg_queue))
WARN1("process %d terminated, but some messages are still queued",
gras_os_getpid());
-
- /* if each process has its sockets list, we need to close them when the process finish */
- xbt_dynar_foreach(sockets,cursor,sock_iter) {
- VERB1("Closing the socket %p left open on exit. Maybe a socket leak?",
- sock_iter);
- gras_socket_close(sock_iter);
- }
+
+ /* if each process has its sockets list, we need to close them when the
+ process finish */
+ xbt_dynar_foreach(sockets,cursor,sock_iter) {
+ VERB1("Closing the socket %p left open on exit. Maybe a socket leak?",
+ sock_iter);
+ gras_socket_close(sock_iter);
+ }
if ( ! --(hd->refcount)) {
xbt_dynar_free(&hd->ports);
free(hd);
#include "gras/Transport/transport_private.h"
typedef struct {
- int port; /* list of ports used by a server socket */
- int meas; /* (boolean) the channel is for measurements or for messages */
- smx_process_t process;
- gras_socket_t socket;
+ int port; /* list of ports used by a server socket */
+ int meas; /* (boolean) the channel is for measurements or for messages */
+ smx_process_t process;
+ gras_socket_t socket;
} gras_sg_portrec_t;
/* Data for each host */
/* data for each socket (FIXME: find a better location for that)*/
typedef struct {
- //int from_PID; /* process which sent this message */
- //int to_PID; /* process to which this message is destinated */
- smx_process_t from_process;
- smx_process_t to_process;
+ smx_process_t from_process;
+ smx_process_t to_process;
- smx_host_t to_host; /* Who's on other side */
-
- smx_cond_t cond;
- smx_mutex_t mutex;
- gras_socket_t to_socket;
+ smx_host_t to_host; /* Who's on other side */
+
+ smx_cond_t cond;
+ smx_mutex_t mutex;
+ gras_socket_t to_socket;
} gras_trp_sg_sock_data_t;