gras_msgtype_t type;
unsigned long int ID;
void *payl;
- void *comm; /* simix_comm in SG */
int payl_size;
} s_gras_msg_t, *gras_msg_t;
xbt_thread_t listener;
} s_gras_msg_listener_t;
-static void do_close_socket(gras_socket_t sock) {
- if (sock->plugin->socket_close)
- (*sock->plugin->socket_close) (sock);
- /* free the memory */
- if (sock->peer_name)
- free(sock->peer_name);
- free(sock);
-}
static void listener_function(void *p)
{
gras_msg_listener_t me = (gras_msg_listener_t) p;
/* empty the list of sockets to trash */
TRY {
while (1) {
- gras_socket_t sock;
+ int sock;
xbt_queue_shift_timed(me->socks_to_close, &sock, 0);
- do_close_socket(sock);
+ if (tcp_close(sock) < 0) {
+ WARN3("error while closing tcp socket %d: %d (%s)\n",
+ sock, sock_errno, sock_errstr(sock_errno));
+ }
}
}
CATCH(e) {
}
}
-void gras_msg_listener_close_socket(gras_socket_t sock)
+void gras_msg_listener_close_socket(int sd)
{
gras_procdata_t *pd = gras_procdata_get();
if (pd->listener) {
- xbt_queue_push(pd->listener->socks_to_close, &sock);
+ xbt_queue_push(pd->listener->socks_to_close, &sd);
gras_msg_listener_awake();
} else {
/* do it myself */
- do_close_socket(sock);
+ tcp_close(sd);
}
}
void gras_msg_send_namev(gras_socket_t sock,
const char *namev, void *payload);
void gras_msg_listener_awake(void);
-void gras_msg_listener_close_socket(gras_socket_t sock);
+void gras_msg_listener_close_socket(int sd);
#define GRAS_PROTOCOL_VERSION '\1';
unsigned long int ID,
gras_msgtype_t msgtype, void *payload)
{
+
+ smx_action_t act; /* simix action */
gras_trp_sg_sock_data_t *sock_data;
+ gras_hostdata_t *hd;
+ gras_trp_procdata_t trp_remote_proc;
+ gras_msg_procdata_t msg_remote_proc;
gras_msg_t msg; /* message to send */
int whole_payload_size = 0; /* msg->payload_size is used to memcpy the payload.
This is used to report the load onto the simulator. It also counts the size of pointed stuff */
sock_data = (gras_trp_sg_sock_data_t *) sock->data;
+ hd = (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
+
xbt_assert1(!gras_socket_is_meas(sock),
"Asked to send a message on the measurement socket %p", sock);
msg->type = msgtype;
msg->ID = ID;
if (kind == e_gras_msg_kind_rpcerror) {
- /* error on remote host, careful, payload is an exception */
+ /* error on remote host, carfull, payload is an exception */
msg->payl_size = gras_datadesc_size(gras_datadesc_by_name("ex_t"));
msg->payl = xbt_malloc(msg->payl_size);
whole_payload_size = gras_datadesc_memcpy(gras_datadesc_by_name("ex_t"),
payload, msg->payl);
}
+ /* put the selectable socket on the queue */
+ trp_remote_proc = (gras_trp_procdata_t)
+ gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process);
+
+ xbt_queue_push(trp_remote_proc->msg_selectable_sockets, &sock);
+
+ /* put message on msg_queue */
+ msg_remote_proc = (gras_msg_procdata_t)
+ gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
+ xbt_fifo_push(msg_remote_proc->msg_to_receive_queue, msg);
+
+ /* wait for the receiver */
+ SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
+ /* creates simix action and waits its ends, waits in the sender host
+ condition */
+ act = SIMIX_action_communicate(SIMIX_host_self(),
+ sock_data->to_host, msgtype->name,
+ (double) whole_payload_size, -1);
+ SIMIX_register_action_to_condition(act, sock_data->cond);
VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu",
- sock->peer_name,sock->peer_proc,
+ SIMIX_host_get_name(sock_data->to_host),
+ SIMIX_process_get_name(sock_data->to_process),
msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID);
- SIMIX_network_send(sock_data->rdv,whole_payload_size,-1.,-1.,msg,sizeof(s_gras_msg_t),(smx_comm_t*)&(msg->comm),&msg);
+
+ SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
+ SIMIX_unregister_action_to_condition(act, sock_data->cond);
+ /* error treatmeant (FIXME) */
+
+ /* cleanup structures */
+ SIMIX_action_destroy(act);
+ SIMIX_mutex_unlock(sock_data->mutex);
VERB0("Message sent");
+
}
/*
* receive the next message on the given socket.
*/
-void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) {
- gras_trp_procdata_t pd =
- (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
+void gras_msg_recv(gras_socket_t sock, gras_msg_t msg)
+{
+
gras_trp_sg_sock_data_t *sock_data;
+ gras_trp_sg_sock_data_t *remote_sock_data;
+ gras_hostdata_t *remote_hd;
+ gras_msg_t msg_got;
+ gras_msg_procdata_t msg_procdata =
+ (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
xbt_assert1(!gras_socket_is_meas(sock),
"Asked to receive a message on the measurement socket %p",
xbt_assert0(msg, "msg is an out parameter of gras_msg_recv...");
sock_data = (gras_trp_sg_sock_data_t *) sock->data;
-
- /* The message was already received while emulating the select, so simply copy it here */
- memcpy(msg,&(sock_data->ongoing_msg),sizeof(s_gras_msg_t));
- msg->expe = sock;
- VERB1("Using %p as a msg",&(sock_data->ongoing_msg));
- VERB5("Received a message type '%s' kind '%s' ID %lu from %s(%s)",
- msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID,
- sock->peer_name,sock->peer_proc);
-
- /* Recreate another comm object to replace the one which just terminated */
- int rank = xbt_dynar_search(pd->sockets,&sock);
- xbt_assert0(rank>=0,"Socket not found in my array");
- sock_data->ongoing_msg_size = sizeof(s_gras_msg_t);
- smx_comm_t comm = SIMIX_network_irecv(sock_data->rdv,&(sock_data->ongoing_msg),&(sock_data->ongoing_msg_size));
- xbt_dynar_set(pd->comms,rank,&comm);
-
-#if 0 /* KILLME */
- SIMIX_network_recv(sock_data->rdv,-1.,&msg_got,NULL,&comm);
-
-
remote_sock_data =
((gras_trp_sg_sock_data_t *) sock->data)->to_socket->data;
+ DEBUG3("Remote host %s, Remote Port: %d Local port %d",
+ SIMIX_host_get_name(sock_data->to_host), sock->peer_port,
+ sock->port);
remote_hd = (gras_hostdata_t *) SIMIX_host_get_data(sock_data->to_host);
if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0) {
SIMIX_cond_signal(remote_sock_data->cond);
/* wait for communication end */
- INFO2("Wait communication (from %s) termination on %p",sock->peer_name,sock_data->cond);
-
SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
msg_got->expe = msg->expe;
memcpy(msg, msg_got, sizeof(s_gras_msg_t));
xbt_free(msg_got);
SIMIX_mutex_unlock(remote_sock_data->mutex);
-#endif
+
+ VERB3("Received a message type '%s' kind '%s' ID %lu", // from %s",
+ msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID);
}
/* Got a socket to serve */
ready--;
- if (sock_iter->is_master && sock_iter->plugin->socket_accept) {
+ if (sock_iter->accepting && sock_iter->plugin->socket_accept) {
/* not a socket but an ear. accept on it and serve next socket */
gras_socket_t accepted = NULL;
#include "xbt/ex.h"
#include "gras/Transport/transport_private.h"
#include "gras/Virtu/virtu_sg.h"
-#include "simix/private.h"
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_trp);
*
* if timeout>0 and no message there, wait at most that amount of time before giving up.
*/
-gras_socket_t gras_trp_select(double timeout) {
- static int warned=0;
- if (timeout>=0 && !warned) {
- warned=1;
- WARN0("Timed select not implemented in SG. Switching to blocking select");
- }
- gras_trp_procdata_t pd =
- (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
-
- gras_socket_t sock_iter, active_socket;
- gras_trp_sg_sock_data_t *active_socket_data;
- smx_comm_t comm;
- unsigned int cursor;
-
-
- /* FIXME: make sure that the ongoing comm is canceled&destroyed when the corresponding socket is closed */
- xbt_assert(xbt_dynar_length(pd->sockets)==xbt_dynar_length(pd->comms));
-
- /* Wait for the first terminating comm object */
- int rank = SIMIX_network_waitany(pd->comms);
-
- /* Don't wait on this socket until the comm object is recreated by gras_msg_recv */
- comm = NULL;
- xbt_dynar_set(pd->comms,rank,&comm);
-
- /* Ok, got something. Open a socket back to the expeditor */
- active_socket = xbt_dynar_get_as(pd->sockets,rank,gras_socket_t);
- active_socket_data = (gras_trp_sg_sock_data_t *) active_socket->data;
-
- /* Try to reuse an already opened socket to that expeditor */
- DEBUG1("Open sockets size %lu", xbt_dynar_length(pd->sockets));
- xbt_dynar_foreach(pd->sockets, cursor, sock_iter) {
- gras_trp_sg_sock_data_t *sock_data;
- DEBUG1("Consider %p as outgoing socket to expeditor", sock_iter);
-
- if (sock_iter->meas || !sock_iter->outgoing)
- continue;
- sock_data = ((gras_trp_sg_sock_data_t *) sock_iter->data);
-
- if ((sock_data->to_socket == active_socket) &&
- (sock_data->to_host ==
- SIMIX_process_get_host(active_socket_data->from_process))) {
- xbt_dynar_cursor_unlock(pd->sockets);
- return sock_iter;
- }
- }
-
- /* Socket to expeditor not created yet */
- DEBUG0("Create a socket to the expeditor");
-
-
-
- return sock_iter;
-
-#if 0 /* KILLME */
+gras_socket_t gras_trp_select(double timeout)
+{
gras_socket_t res;
gras_trp_procdata_t pd =
(gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
res->incoming = 1;
res->outgoing = 1;
- res->is_master = 0;
+ res->accepting = 0;
res->sd = -1;
res->port = -1;
SIMIX_process_get_name(sockdata->to_process), res->port);
return res;
-#endif
}
sock->incoming = incoming ? 1 : 0;
sock->outgoing = incoming ? 0 : 1;
- sock->is_master = incoming ? 1 : 0;
+ sock->accepting = incoming ? 1 : 0;
sock->meas = 0;
sock->recvd = 0;
sock->valid = 1;
trp->socket_server(trp, sock);
DEBUG3("in=%c out=%c accept=%c",
sock->incoming ? 'y' : 'n',
- sock->outgoing ? 'y' : 'n', sock->is_master ? 'y' : 'n');
+ sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
} CATCH(e) {
free(sock);
(*trp->socket_client) (trp, sock);
DEBUG3("in=%c out=%c accept=%c",
sock->incoming ? 'y' : 'n',
- sock->outgoing ? 'y' : 'n', sock->is_master ? 'y' : 'n');
+ sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
} CATCH(e) {
free(sock);
RETHROW;
/** \brief Close socket */
void gras_socket_close(gras_socket_t sock)
{
- xbt_dynar_t my_sockets =
+ xbt_dynar_t sockets =
((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
gras_socket_t sock_iter = NULL;
unsigned int cursor;
}
/* FIXME: Issue an event when the socket is closed */
- DEBUG1("sockets pointer before %p", my_sockets);
+ DEBUG1("sockets pointer before %p", sockets);
if (sock) {
/* FIXME: Cannot get the dynar mutex, because it can be already locked */
// _xbt_dynar_foreach(sockets,cursor,sock_iter) {
- for (cursor = 0; cursor < xbt_dynar_length(my_sockets); cursor++) {
- _xbt_dynar_cursor_get(my_sockets, cursor, &sock_iter);
+ for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) {
+ _xbt_dynar_cursor_get(sockets, cursor, &sock_iter);
if (sock == sock_iter) {
DEBUG2("remove sock cursor %d dize %lu\n", cursor,
- xbt_dynar_length(my_sockets));
- xbt_dynar_cursor_rm(my_sockets, &cursor);
- gras_msg_listener_close_socket(sock);
+ xbt_dynar_length(sockets));
+ xbt_dynar_cursor_rm(sockets, &cursor);
+ if (sock->plugin->socket_close)
+ (*sock->plugin->socket_close) (sock);
+
+ /* free the memory */
+ if (sock->peer_name)
+ free(sock->peer_name);
+ free(sock);
XBT_OUT;
return;
}
xbt_assert0(peer->meas,
"No need to accept on non-measurement sockets (it's automatic)");
- if (!peer->is_master) {
+ if (!peer->accepting) {
/* nothing to accept here (must be in SG) */
/* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */
return peer;
res->name = xbt_strdup("gras_trp");
res->name_len = 0;
res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL);
- res->comms = xbt_dynar_new(sizeof(void*),NULL); /* stores some smx_comm_t in SG (not used in RL) */
res->myport = 0;
return (void *) res;
gras_trp_procdata_t res = (gras_trp_procdata_t) data;
xbt_dynar_free(&(res->sockets));
- xbt_dynar_free(&(res->comms));
free(res->name);
free(res);
}
int myport; /* Port on which I listen myself */
xbt_dynar_t sockets; /* all sockets known to this process */
- xbt_dynar_t comms; /* SG cruft: the ongoing communications */
- xbt_dynar_t sockets_to_close; /* The listener is in charge of closing the sockets */
+
+ /* SG only elements. In RL, they are part of the OS ;) */
+
+ /* List of sockets ready to be select()ed */
+ xbt_queue_t msg_selectable_sockets; /* regular sockets */
+ xbt_queue_t meas_selectable_sockets; /* measurement ones */
} s_gras_trp_procdata_t, *gras_trp_procdata_t;
path,
res->sd,
res->incoming ? 'y' : 'n',
- res->outgoing ? 'y' : 'n', res->is_master ? 'y' : 'n');
+ res->outgoing ? 'y' : 'n', res->accepting ? 'y' : 'n');
xbt_dynar_push(((gras_trp_procdata_t)
gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res);
DEBUG4("sd=%d in=%c out=%c accept=%c",
res->sd,
res->incoming ? 'y' : 'n',
- res->outgoing ? 'y' : 'n', res->is_master ? 'y' : 'n');
+ res->outgoing ? 'y' : 'n', res->accepting ? 'y' : 'n');
xbt_dynar_push(((gras_trp_procdata_t)
gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res);
*** Prototypes
***/
+/* retrieve the port record associated to a numerical port on an host */
+static void find_port(gras_hostdata_t * hd, int port,
+ gras_sg_portrec_t * hpd);
+
void gras_trp_sg_socket_client(gras_trp_plugin_t self,
/* OUT */ gras_socket_t sock);
*** Specific plugin part
***/
typedef struct {
- xbt_dict_t sockets; /* all known sockets */
-} s_gras_trp_sg_plug_data_t,*gras_trp_sg_plug_data_t;
+ int placeholder; /* nothing plugin specific so far */
+} gras_trp_sg_plug_data_t;
/***
*** Code
***/
+static void find_port(gras_hostdata_t * hd, int port, gras_sg_portrec_t * hpd)
+{
+ unsigned int cpt;
+ gras_sg_portrec_t pr;
-static void gras_trp_sg_exit(gras_trp_plugin_t plug){
- gras_trp_sg_plug_data_t mydata = (gras_trp_sg_plug_data_t) plug->data;
- xbt_dict_free(&(mydata->sockets));
- xbt_free(plug->data);
+ xbt_assert0(hd, "Please run gras_process_init on each process");
+
+ xbt_dynar_foreach(hd->ports, cpt, pr) {
+ if (pr.port == port) {
+ memcpy(hpd, &pr, sizeof(gras_sg_portrec_t));
+ return;
+ }
+ }
+ THROW1(mismatch_error, 0, "Unable to find any portrec for port #%d", port);
}
+
+
void gras_trp_sg_setup(gras_trp_plugin_t plug)
{
- gras_trp_sg_plug_data_t data = xbt_new(s_gras_trp_sg_plug_data_t, 1);
- plug->data = data;
- data->sockets = xbt_dict_new();
+ gras_trp_sg_plug_data_t *data = xbt_new(gras_trp_sg_plug_data_t, 1);
- plug->exit = gras_trp_sg_exit;
+ plug->data = data;
plug->socket_client = gras_trp_sg_socket_client;
plug->socket_server = gras_trp_sg_socket_server;
}
void gras_trp_sg_socket_client(gras_trp_plugin_t self,
- /* OUT */ gras_socket_t sock) {
+ /* OUT */ gras_socket_t sock)
+{
+ xbt_ex_t e;
smx_host_t peer;
+ gras_hostdata_t *hd;
gras_trp_sg_sock_data_t *data;
- gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
-
+ gras_sg_portrec_t pr;
+ /* make sure this socket will reach someone */
if (!(peer = SIMIX_host_get_by_name(sock->peer_name)))
THROW1(mismatch_error, 0,
- "Can't connect to %s: no such host", sock->peer_name);
+ "Can't connect to %s: no such host.\n", sock->peer_name);
- /* make sure this socket will reach someone */
- xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets;
- char *sock_name=bprintf("%s:%d",sock->peer_name,sock->peer_port);
- gras_socket_t server = xbt_dict_get_or_null(all_sockets,sock_name);
- free(sock_name);
+ if (!(hd = (gras_hostdata_t *) SIMIX_host_get_data(peer)))
+ THROW1(mismatch_error, 0,
+ "can't connect to %s: no process on this host", sock->peer_name);
- if (!server)
- THROW2(mismatch_error, 0,
- "can't connect to %s:%d, no process listen on this port",
- sock->peer_name, sock->peer_port);
+ TRY {
+ find_port(hd, sock->peer_port, &pr);
+ }
+ CATCH(e) {
+ if (e.category == mismatch_error) {
+ xbt_ex_free(e);
+ THROW2(mismatch_error, 0,
+ "can't connect to %s:%d, no process listen on this port",
+ sock->peer_name, sock->peer_port);
+ }
+ RETHROW;
+ }
- if (server->meas && !sock->meas) {
+ if (pr.meas && !sock->meas) {
THROW2(mismatch_error, 0,
"can't connect to %s:%d in regular mode, the process listen "
"in measurement mode on this port", sock->peer_name,
sock->peer_port);
}
- if (!server->meas && sock->meas) {
+ if (!pr.meas && sock->meas) {
THROW2(mismatch_error, 0,
"can't connect to %s:%d in measurement mode, the process listen "
"in regular mode on this port", sock->peer_name, sock->peer_port);
}
/* create the socket */
- data = xbt_new0(gras_trp_sg_sock_data_t, 1);
- data->rdv = ((gras_trp_sg_sock_data_t *)server->data)->rdv;
+ data = xbt_new(gras_trp_sg_sock_data_t, 1);
data->from_process = SIMIX_process_self();
+ data->to_process = pr.process;
+ data->to_host = peer;
+
+ /* initialize mutex and condition of the socket */
+ data->mutex = SIMIX_mutex_init();
+ data->cond = SIMIX_cond_init();
+ data->to_socket = pr.socket;
sock->data = data;
sock->incoming = 1;
- /* Create a smx comm object about this socket */
- data->ongoing_msg_size = sizeof(s_gras_msg_t);
- smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size));
- xbt_dynar_push(pd->comms,&comm);
-
DEBUG5("%s (PID %d) connects in %s mode to %s:%d",
SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
sock->meas ? "meas" : "regular", sock->peer_name, sock->peer_port);
}
-void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock) {
- gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
- xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets;
+void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock)
+{
- /* Make sure that this socket was not opened so far */
- char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port);
- gras_socket_t old = xbt_dict_get_or_null(all_sockets,sock_name);
- if (old)
- THROW1(mismatch_error, 0,
- "can't listen on address %s: port already in use.",
- sock_name);
+ gras_hostdata_t *hd =
+ (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
+ gras_sg_portrec_t pr;
+ gras_trp_sg_sock_data_t *data;
+ volatile int found;
+ const char *host = SIMIX_host_get_name(SIMIX_host_self());
- /* Create the data associated to the socket */
- gras_trp_sg_sock_data_t *data = xbt_new0(gras_trp_sg_sock_data_t, 1);
- data->rdv = SIMIX_rdv_create(sock_name);
- data->from_process = SIMIX_process_self();
- SIMIX_rdv_set_data(data->rdv,sock);
+ xbt_ex_t e;
- sock->is_master = 1;
- sock->incoming = 1;
+ xbt_assert0(hd, "Please run gras_process_init on each process");
- sock->data = data;
+ sock->accepting = 0; /* no such nuisance in SG */
+ found = 0;
+ TRY {
+ find_port(hd, sock->port, &pr);
+ found = 1;
+ } CATCH(e) {
+ if (e.category == mismatch_error)
+ xbt_ex_free(e);
+ else
+ RETHROW;
+ }
+
+ if (found)
+ THROW2(mismatch_error, 0,
+ "can't listen on address %s:%d: port already in use.",
+ host, sock->port);
+
+ pr.port = sock->port;
+ pr.meas = sock->meas;
+ pr.socket = sock;
+ pr.process = SIMIX_process_self();
+ xbt_dynar_push(hd->ports, &pr);
- /* Register the socket to the set of sockets known simulation-wide */
- xbt_dict_set(all_sockets,sock_name,sock,NULL); /* FIXME: add a function to raise a warning at simulation end for non-closed sockets */
+ /* Create the socket */
+ data = xbt_new(gras_trp_sg_sock_data_t, 1);
+ data->from_process = SIMIX_process_self();
+ data->to_process = NULL;
+ data->to_host = SIMIX_host_self();
- /* Create a smx comm object about this socket */
- data->ongoing_msg_size = sizeof(s_gras_msg_t);
- smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size));
- INFO2("irecv comm %p onto %p",comm,&(data->ongoing_msg));
- xbt_dynar_push(pd->comms,&comm);
+ data->cond = SIMIX_cond_init();
+ data->mutex = SIMIX_mutex_init();
+
+ sock->data = data;
VERB6("'%s' (%d) ears on %s:%d%s (%p)",
SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
- gras_os_myname(), sock->port, sock->meas ? " (mode meas)" : "", sock);
- free(sock_name);
+ host, sock->port, sock->meas ? " (mode meas)" : "", sock);
+
}
-void gras_trp_sg_socket_close(gras_socket_t sock) {
- gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
- if (sock->is_master) {
- /* server mode socket. Unregister it from 'OS' tables */
- char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port);
+void gras_trp_sg_socket_close(gras_socket_t sock)
+{
+ gras_hostdata_t *hd =
+ (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
+ unsigned int cpt;
+ gras_sg_portrec_t pr;
- xbt_dict_t sockets = ((gras_trp_sg_plug_data_t)sock->plugin->data)->sockets;
- gras_socket_t old = xbt_dict_get_or_null(sockets,sock_name);
- if (!old)
- WARN2("socket_close called on the unknown server socket %p (port=%d)",
- sock, sock->port);
- xbt_dict_remove(sockets,sock_name);
- free(sock_name);
+ XBT_IN1(" (sock=%p)", sock);
- }
+ if (!sock)
+ return;
+
+ xbt_assert0(hd, "Please run gras_process_init on each process");
if (sock->data) {
- SIMIX_rdv_destroy(((gras_trp_sg_sock_data_t *) sock->data)->rdv);
+ SIMIX_cond_destroy(((gras_trp_sg_sock_data_t *) sock->data)->cond);
+ SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t *) sock->data)->mutex);
free(sock->data);
}
- xbt_dynar_push(pd->sockets_to_close,&sock);
- gras_msg_listener_awake();
-
+ if (sock->incoming && !sock->outgoing && sock->port >= 0) {
+ /* server mode socket. Unregister it from 'OS' tables */
+ xbt_dynar_foreach(hd->ports, cpt, pr) {
+ DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports));
+ if (pr.port == sock->port) {
+ xbt_dynar_cursor_rm(hd->ports, &cpt);
+ XBT_OUT;
+ return;
+ }
+ }
+ WARN2("socket_close called on the unknown incoming socket %p (port=%d)",
+ sock, sock->port);
+ }
XBT_OUT;
}
char name[256];
static unsigned int count = 0;
+ smx_action_t act; /* simix action */
gras_trp_sg_sock_data_t *sock_data;
+ gras_trp_procdata_t trp_remote_proc;
+ gras_msg_procdata_t msg_remote_proc;
gras_msg_t msg; /* message to send */
sock_data = (gras_trp_sg_sock_data_t *) sock->data;
xbt_assert0(sock->meas,
"SG chunk exchange shouldn't be used on non-measurement sockets");
+ SIMIX_mutex_lock(sock_data->mutex);
sprintf(name, "Chunk[%d]", count++);
/*initialize gras message */
msg = xbt_new(s_gras_msg_t, 1);
} else {
msg->payl = NULL;
}
- SIMIX_network_send(sock_data->rdv,size,-1.,-1.,&msg,sizeof(msg),(smx_comm_t*)&(msg->comm),msg);
+
+
+ /* put his socket on the selectable socket queue */
+ trp_remote_proc = (gras_trp_procdata_t)
+ gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process);
+ xbt_queue_push(trp_remote_proc->meas_selectable_sockets, &sock);
+
+ /* put message on msg_queue */
+ msg_remote_proc = (gras_msg_procdata_t)
+ gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
+
+ xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas, msg);
+
+ /* wait for the receiver */
+ SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
+
+ /* creates simix action and waits its ends, waits in the sender host
+ condition */
+ DEBUG5("send chunk %s from %s to %s:%d (size=%ld)",
+ name, SIMIX_host_get_name(SIMIX_host_self()),
+ SIMIX_host_get_name(sock_data->to_host), sock->peer_port, size);
+
+ act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host,
+ name, size, -1);
+ SIMIX_register_action_to_condition(act, sock_data->cond);
+ SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
+ SIMIX_unregister_action_to_condition(act, sock_data->cond);
+ /* error treatmeant (FIXME) */
+
+ /* cleanup structures */
+ SIMIX_action_destroy(act);
+
+ SIMIX_mutex_unlock(sock_data->mutex);
}
int gras_trp_sg_chunk_recv(gras_socket_t sock,
char *data, unsigned long int size)
{
gras_trp_sg_sock_data_t *sock_data;
+ gras_trp_sg_sock_data_t *remote_sock_data;
+ gras_socket_t remote_socket = NULL;
gras_msg_t msg_got;
- smx_comm_t comm;
+ gras_msg_procdata_t msg_procdata =
+ (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
+ gras_trp_procdata_t trp_proc =
+ (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
xbt_assert0(sock->meas,
"SG chunk exchange shouldn't be used on non-measurement sockets");
+ xbt_queue_shift_timed(trp_proc->meas_selectable_sockets,
+ &remote_socket, 60);
+
+ if (remote_socket == NULL) {
+ THROW0(timeout_error, 0, "Timeout");
+ }
+
+ remote_sock_data = (gras_trp_sg_sock_data_t *) remote_socket->data;
+ msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
sock_data = (gras_trp_sg_sock_data_t *) sock->data;
- SIMIX_network_recv(sock_data->rdv,-1.,&msg_got,NULL,&comm);
+ /* ok, I'm here, you can continue the communication */
+ SIMIX_cond_signal(remote_sock_data->cond);
+
+ SIMIX_mutex_lock(remote_sock_data->mutex);
+ /* wait for communication end */
+ SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
if (msg_got->payl_size != size)
THROW5(mismatch_error, 0,
xbt_free(msg_got->payl);
xbt_free(msg_got);
+ SIMIX_mutex_unlock(remote_sock_data->mutex);
return 0;
}
res->plugin = sock->plugin;
res->incoming = sock->incoming;
res->outgoing = sock->outgoing;
- res->is_master = 0;
+ res->accepting = 0;
res->sd = sd;
res->port = -1;
VERB1("close tcp connection %d", sock->sd);
- if (tcp_close(sock->sd) < 0) {
- WARN3("error while closing tcp socket %d: %d (%s)\n",
- sock->sd, sock_errno, sock_errstr(sock_errno));
- }
+ /* ask the listener to close the socket */
+ gras_msg_listener_close_socket(sock->sd);
}
/************************************/
int incoming:1; /* true if we can read from this sock */
int outgoing:1; /* true if we can write on this sock */
- int is_master:1; /* true if master incoming sock */
+ int accepting:1; /* true if master incoming sock in tcp */
int meas:1; /* true if this is an experiment socket instead of messaging */
int valid:1; /* false if a select returned that the peer quitted, forcing us to "close" the socket */
int moredata:1; /* TCP socket use a buffer and read operation get as much
}
return res;
}
-void *gras_libdata_by_id(int id) {
- return gras_libdata_by_id_from_procdata(id,gras_procdata_get());
-}
-void *gras_libdata_by_id_from_procdata(int id,gras_procdata_t *pd) {
+
+void *gras_libdata_by_id(int id)
+{
+ gras_procdata_t *pd = gras_procdata_get();
if (xbt_set_length(pd->libdata) < xbt_dynar_length(_gras_procdata_fabrics)) {
/* Damn, some new modules were added since procdata_init(). Amok? */
/* Get 'em all */
/* First process on this host */
hd = xbt_new(gras_hostdata_t, 1);
hd->refcount = 1;
+ hd->ports = xbt_dynar_new(sizeof(gras_sg_portrec_t), NULL);
SIMIX_host_set_data(SIMIX_host_self(), (void *) hd);
} else {
hd->refcount++;
} else
pd->ppid = -1;
- trp_pd->sockets_to_close = xbt_dynar_new(sizeof(gras_socket_t),NULL);
+ trp_pd->msg_selectable_sockets = xbt_queue_new(0, sizeof(gras_socket_t));
+
+ trp_pd->meas_selectable_sockets = xbt_queue_new(0, sizeof(gras_socket_t));
VERB2("Creating process '%s' (%d)",
SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid());
gras_trp_procdata_t trp_pd =
(gras_trp_procdata_t) gras_libdata_by_name("gras_trp");
- xbt_dynar_free(&trp_pd->sockets_to_close);
+ xbt_queue_free(&trp_pd->msg_selectable_sockets);
+
+ xbt_queue_free(&trp_pd->meas_selectable_sockets);
xbt_assert0(hd, "Run gras_process_init (ie, gras_init)!!");
gras_socket_close(sock_iter);
}
if (!--(hd->refcount)) {
+ xbt_dynar_free(&hd->ports);
free(hd);
}
gras_procdata_exit();
gras_procdata_t *gras_procdata_get(void);
void *gras_libdata_by_name_from_procdata(const char *name,
gras_procdata_t * pd);
-void *gras_libdata_by_id_from_procdata(int id,gras_procdata_t *pd);
-
#endif /* GRAS_VIRTU_PRIVATE_H */
typedef struct {
int refcount;
- /* Nothing in particular (anymore) */
+ xbt_dynar_t ports;
+
} gras_hostdata_t;
/* data for each socket (FIXME: find a better location for that)*/
typedef struct {
- smx_rdv_t rdv;
+ smx_process_t from_process;
+ smx_process_t to_process;
- smx_process_t from_process; /* the one who created the socket */
smx_host_t to_host; /* Who's on other side */
- smx_comm_t comm; /* Ongoing communication */
-
- s_gras_msg_t ongoing_msg;
- size_t ongoing_msg_size;
- gras_socket_t to_socket; /* If != NULL, this socket was created as accept when receiving onto to_socket */
+ smx_cond_t cond;
+ smx_mutex_t mutex;
+ gras_socket_t to_socket;
} gras_trp_sg_sock_data_t;
/*****Communication Requests*****/
XBT_PUBLIC(void) SIMIX_communication_cancel(smx_comm_t comm);
XBT_PUBLIC(double) SIMIX_communication_get_remains(smx_comm_t comm);
-XBT_PUBLIC(void *) SIMIX_communication_get_sentdata(smx_comm_t comm);
+XBT_PUBLIC(void *) SIMIX_communication_get_data(smx_comm_t comm);
/*****Networking*****/
XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
if(!comm)
return NULL;
- return (m_task_t)SIMIX_communication_get_sentdata(comm);
+ return (m_task_t)SIMIX_communication_get_data(comm);
}
int
XBT_LOG_EXTERNAL_CATEGORY(simix_process);
XBT_LOG_EXTERNAL_CATEGORY(simix_synchro);
XBT_LOG_EXTERNAL_CATEGORY(simix_context);
-XBT_LOG_EXTERNAL_CATEGORY(simix_network);
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_kernel, simix,
"Logging specific to SIMIX (kernel)");
XBT_LOG_CONNECT(simix_process, simix);
XBT_LOG_CONNECT(simix_synchro, simix);
XBT_LOG_CONNECT(simix_context, simix);
- XBT_LOG_CONNECT(simix_network, simix);
simix_global = xbt_new0(s_SIMIX_Global_t, 1);
if (smx_action) {
/* Copy the transfered data of the completed communication actions */
/* FIXME: find a better way to determine if its a comm action */
- if(smx_action->data != NULL) {
- CDEBUG1(simix_network,"Communication %p finished",smx_action->data);
+ if(smx_action->data != NULL)
SIMIX_network_copy_data((smx_comm_t)smx_action->data);
- }
SIMIX_action_signal_all(smx_action);
}
}
}
/* no relevant request found. Return NULL */
- DEBUG0("Communication request not found. I assume that other side will arrive later on.");
+ DEBUG0("Communication request not found");
return NULL;
}
*/
void SIMIX_network_copy_data(smx_comm_t comm)
{
+ /* If there is no data to be copy then return */
+ if(!comm->src_buff || !comm->dst_buff)
+ return;
+
size_t src_buff_size = comm->src_buff_size;
size_t dst_buff_size = *comm->dst_buff_size;
+
+ /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
+ dst_buff_size = MIN(dst_buff_size, src_buff_size);
+
+ /* Update the receiver's buffer size to the copied amount */
+ if (comm->dst_buff_size)
+ *comm->dst_buff_size = dst_buff_size;
- xbt_assert(src_buff_size == dst_buff_size);
-
- /* If there is no data to copy then return */
- if(!comm->src_buff || !comm->dst_buff || dst_buff_size == 0)
+ if(dst_buff_size == 0)
return;
memcpy(comm->dst_buff, comm->src_buff, dst_buff_size);
- DEBUG6("Copying comm %p data from %s -> %s (%zu bytes %p->%p)",
+ DEBUG4("Copying comm %p data from %s -> %s (%zu bytes)",
comm, comm->src_proc->smx_host->name, comm->dst_proc->smx_host->name,
- dst_buff_size,comm->src_buff,comm->dst_buff);
+ dst_buff_size);
}
/**
* \brief Return the user data associated to the communication
- *
- * In MSG and GRAS, that data is the exchanged task/msg itself, since
- * (i) In MSG, the receiver still wants to read the task although the communication didn't complete.
- * (ii) In GRAS, we need to retrieve that gras_msg_t during the select
* \param comm The communication
* \return the user data
*/
-void *SIMIX_communication_get_sentdata(smx_comm_t comm)
+void *SIMIX_communication_get_data(smx_comm_t comm)
{
return comm->data;
}
comm->dst_buff = dst_buff;
comm->dst_buff_size = dst_buff_size;
- DEBUG2("Receive data for %p into %p",comm,comm->dst_buff);
SIMIX_communication_start(comm);
return comm;
}