#include "gras/Transport/transport_interface.h" /* gras_select */
typedef struct s_gras_msg_listener_ {
- xbt_queue_t incomming_messages;
+ xbt_queue_t incomming_messages; /* messages received from the wire and still to be used by master */
xbt_queue_t socks_to_close; /* let the listener close the sockets, since it may be selecting on them. Darwin don't like this trick */
gras_socket_t wakeup_sock_listener_side;
gras_socket_t wakeup_sock_master_side;
static void listener_function(void *p)
{
gras_msg_listener_t me = (gras_msg_listener_t) p;
- s_gras_msg_t msg;
+ gras_msg_t msg;
xbt_ex_t e;
gras_msgtype_t msg_wakeup_listener_t =
gras_msgtype_by_name("_wakeup_listener");
DEBUG0("I'm the listener");
while (1) {
- DEBUG0("Selecting");
- msg.expe = gras_trp_select(-1);
- DEBUG0("Select returned something");
- gras_msg_recv(msg.expe, &msg);
- if (msg.type != msg_wakeup_listener_t)
- xbt_queue_push(me->incomming_messages, &msg);
+ msg = gras_msg_recv_any();
+ if (msg->type != msg_wakeup_listener_t)
+ xbt_queue_push(me->incomming_messages, msg);
else {
- char got = *(char *) msg.payl;
+ char got = *(char *) msg->payl;
if (got == '1') {
VERB0("Asked to get awake");
- free(msg.payl);
+ free(msg->payl);
+ free(msg);
} else {
VERB0("Asked to die");
- // gras_socket_close(me->wakeup_sock_listener_side);
- free(msg.payl);
+ //gras_socket_close(me->wakeup_sock_listener_side);
+ free(msg->payl);
+ free(msg);
return;
}
}
/* functions to extract msg from socket or put it on wire (depend RL vs SG) */
+gras_msg_t gras_msg_recv_any(void); /* Get first message arriving */
void gras_msg_recv(gras_socket_t sock, gras_msg_t msg /*OUT*/);
void gras_msg_send_ext(gras_socket_t sock,
e_gras_msg_kind_t kind,
XBT_LOG_EXTERNAL_CATEGORY(gras_msg);
XBT_LOG_DEFAULT_CATEGORY(gras_msg);
+void gras_msg_recv(gras_socket_t sock, gras_msg_t msg);
+
+/**
+ * Get the first message arriving on any socket.
+ *
+ * Allocates a zero-filled message, waits in gras_trp_select() with a
+ * timeout argument of -1, records the selected socket in the message's
+ * `expe` field and lets gras_msg_recv() fill in the message from that
+ * socket. The allocated message is returned to the caller (the listener
+ * frees it, or queues it for the master).
+ */
+gras_msg_t gras_msg_recv_any(void) {
+  gras_msg_t incoming;
+
+  incoming = xbt_new0(s_gras_msg_t, 1);
+
+  /* Remember which socket the message is coming from */
+  incoming->expe = gras_trp_select(-1);
+  DEBUG0("Select returned something");
+
+  /* Read the message itself from the selected socket */
+  gras_msg_recv(incoming->expe, incoming);
+
+  return incoming;
+}
+
void gras_msg_send_ext(gras_socket_t sock,
e_gras_msg_kind_t kind,
unsigned long int ID,
#include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
#include "gras/Transport/transport_private.h" /* sock->data */
-XBT_LOG_EXTERNAL_CATEGORY(gras_msg);
-XBT_LOG_DEFAULT_CATEGORY(gras_msg);
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_msg);
typedef void *gras_trp_bufdata_;
+/**
+ * Simulation-side implementation of gras_msg_recv_any().
+ *
+ * Gathers the pending receive communication (`comm_recv`) of every socket
+ * of the current process, blocks in SIMIX_network_waitany() until one of
+ * them terminates, reinstalls a fresh receive on the rendez-vous point of
+ * the socket that was served, and hands back the message attached to the
+ * terminated communication.
+ *
+ * Returns the message given by SIMIX_communication_get_data() for the
+ * communication that finished first.
+ */
+gras_msg_t gras_msg_recv_any(void) {
+  gras_trp_procdata_t trp_proc =
+    (gras_trp_procdata_t) gras_libdata_by_name("gras_trp");
+  gras_msg_t msg;
+  /* Build a dynar of all communications I could get something from */
+  xbt_dynar_t comms = xbt_dynar_new(sizeof(smx_comm_t),NULL);
+  unsigned int cursor;
+  gras_socket_t sock;
+  gras_trp_sg_sock_data_t *sock_data;
+  gras_trp_sg_sock_data_t *served = NULL; /* data of the socket whose comm finished */
+
+  xbt_dynar_foreach(trp_proc->sockets,cursor,sock) {
+    sock_data = (gras_trp_sg_sock_data_t *) sock->data;
+    if (sock_data->comm_recv) {
+      /* sizeof yields a size_t: cast it so that it matches the %d conversion */
+      INFO2("Copy %p of size %d",sock_data->comm_recv,(int)sizeof(smx_comm_t));
+      xbt_dynar_push(comms,&(sock_data->comm_recv));
+    }
+  }
+  VERB1("Wait on %lu 'sockets'",(unsigned long)xbt_dynar_length(comms));
+
+  /* Wait for the end of any of these communications */
+  int got = SIMIX_network_waitany(comms);
+  smx_comm_t comm;
+
+  /* retrieve the message sent in that communication */
+  xbt_dynar_get_cpy(comms,got,&(comm));
+  msg=SIMIX_communication_get_data(comm);
+  VERB1("Got something. Communication %p's over",comm);
+
+  /* The dynar only held copies of the comm pointers (no free function was
+   * registered), so releasing it does not touch the communications */
+  xbt_dynar_free(&comms);
+
+  /* Reinstall a waiting communication on that rdv */
+  /* Get the sock again */
+  xbt_dynar_foreach(trp_proc->sockets,cursor,sock) {
+    sock_data = (gras_trp_sg_sock_data_t *) sock->data;
+    if (sock_data->comm_recv && sock_data->comm_recv == comm) {
+      served = sock_data;
+      break;
+    }
+  }
+  /* Do not silently fall back on whatever socket the loop ended on */
+  xbt_assert1(served != NULL,
+              "No socket is waiting on the terminated communication %p", comm);
+  served->comm_recv = SIMIX_network_irecv(
+      served->im_server?served->rdv_server:served->rdv_client,
+      NULL,0);
+  SIMIX_communication_destroy(comm);
+
+  return msg;
+}
+
+
void gras_msg_send_ext(gras_socket_t sock,
e_gras_msg_kind_t kind,
unsigned long int ID,
gras_msgtype_t msgtype, void *payload)
{
-
- smx_action_t act; /* simix action */
- gras_trp_sg_sock_data_t *sock_data;
- gras_hostdata_t *hd;
- gras_trp_procdata_t trp_remote_proc;
- gras_msg_procdata_t msg_remote_proc;
- gras_msg_t msg; /* message to send */
int whole_payload_size = 0; /* msg->payload_size is used to memcpy the payload.
This is used to report the load onto the simulator. It also counts the size of pointed stuff */
-
- sock_data = (gras_trp_sg_sock_data_t *) sock->data;
-
- hd = (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
-
- xbt_assert1(!gras_socket_is_meas(sock),
- "Asked to send a message on the measurement socket %p", sock);
+ gras_msg_t msg; /* message to send */
/*initialize gras message */
msg = xbt_new(s_gras_msg_t, 1);
msg->type = msgtype;
msg->ID = ID;
if (kind == e_gras_msg_kind_rpcerror) {
- /* error on remote host, carfull, payload is an exception */
+ /* error on remote host, careful, payload is an exception */
msg->payl_size = gras_datadesc_size(gras_datadesc_by_name("ex_t"));
msg->payl = xbt_malloc(msg->payl_size);
whole_payload_size = gras_datadesc_memcpy(gras_datadesc_by_name("ex_t"),
whole_payload_size = gras_datadesc_memcpy(msgtype->ctn_type,
payload, msg->payl);
}
+ gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *) sock->data;
+ smx_comm_t comm;
+ SIMIX_network_send(sock_data->im_server ? sock_data->rdv_client : sock_data->rdv_client,
+ whole_payload_size,-1,-1,&msg,sizeof(void*),&comm,msg);
+
+#ifdef KILLME
+ smx_action_t act; /* simix action */
+ gras_hostdata_t *hd;
+ gras_trp_procdata_t trp_remote_proc;
+ gras_msg_procdata_t msg_remote_proc;
+
+ sock_data = (gras_trp_sg_sock_data_t *) sock->data;
+
+ hd = (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
+
+ xbt_assert1(!gras_socket_is_meas(sock),
+ "Asked to send a message on the measurement socket %p", sock);
+
/* put the selectable socket on the queue */
trp_remote_proc = (gras_trp_procdata_t)
/* cleanup structures */
SIMIX_action_destroy(act);
SIMIX_mutex_unlock(sock_data->mutex);
-
+#endif
VERB0("Message sent");
}
+#ifdef KILLMETOO
/*
* receive the next message on the given socket.
*/
void gras_msg_recv(gras_socket_t sock, gras_msg_t msg)
{
- gras_trp_sg_sock_data_t *sock_data;
- gras_trp_sg_sock_data_t *remote_sock_data;
- gras_hostdata_t *remote_hd;
+ gras_trp_sg_sock_data_t *sock_data =
+ (gras_trp_sg_sock_data_t *) sock->data;
gras_msg_t msg_got;
- gras_msg_procdata_t msg_procdata =
- (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
+ size_t size_got = sizeof(void*);
xbt_assert1(!gras_socket_is_meas(sock),
"Asked to receive a message on the measurement socket %p",
sock);
+ SIMIX_network_recv(sock_data->rdv,-1,&msg_got,&size_got,NULL);
+#ifdef KILLME
+ gras_trp_sg_sock_data_t *remote_sock_data;
+ gras_hostdata_t *remote_hd;
+ gras_msg_procdata_t msg_procdata =
+ (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
+
xbt_assert0(msg, "msg is an out parameter of gras_msg_recv...");
sock_data = (gras_trp_sg_sock_data_t *) sock->data;
memcpy(msg, msg_got, sizeof(s_gras_msg_t));
xbt_free(msg_got);
SIMIX_mutex_unlock(remote_sock_data->mutex);
-
+#endif
VERB3("Received a message type '%s' kind '%s' ID %lu", // from %s",
msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID);
}
+#endif
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_trp);
-/* check transport_private.h for an explanation of this variable; this just need to be defined to NULL in SG */
+/* check transport_private.h for an explanation of this variable;
+ * this just needs to be defined to NULL in SG */
gras_socket_t _gras_lastly_selected_socket = NULL;
+#ifdef KILLME
/**
* gras_trp_select:
*
return res;
}
-
+#endif
/* dummy implementations of the functions used in RL mode */
data->to_host = peer;
/* initialize mutex and condition of the socket */
- data->mutex = SIMIX_mutex_init();
- data->cond = SIMIX_cond_init();
- data->to_socket = pr.socket;
+ data->rdv_server = pr.rdv;
+ data->rdv_client = SIMIX_rdv_create(NULL);
+ data->im_server = 0;
sock->data = data;
sock->incoming = 1;
pr.port = sock->port;
pr.meas = sock->meas;
- pr.socket = sock;
pr.process = SIMIX_process_self();
+ pr.rdv = SIMIX_rdv_create(NULL);
xbt_dynar_push(hd->ports, &pr);
/* Create the socket */
data->from_process = SIMIX_process_self();
data->to_process = NULL;
data->to_host = SIMIX_host_self();
-
- data->cond = SIMIX_cond_init();
- data->mutex = SIMIX_mutex_init();
+ data->rdv_server = pr.rdv;
+ data->rdv_client = NULL;
+ data->im_server = 0;
+ data->comm_recv = SIMIX_network_irecv(pr.rdv,NULL,0);
+ INFO1("Comm %p",data->comm_recv);
sock->data = data;
xbt_assert0(hd, "Please run gras_process_init on each process");
if (sock->data) {
- SIMIX_cond_destroy(((gras_trp_sg_sock_data_t *) sock->data)->cond);
- SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t *) sock->data)->mutex);
+ /* FIXME: kill the rdv point if receiver side */
free(sock->data);
}
void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
const char *data, unsigned long int size)
{
+#ifdef KILLME
char name[256];
static unsigned int count = 0;
smx_action_t act; /* simix action */
- gras_trp_sg_sock_data_t *sock_data;
gras_trp_procdata_t trp_remote_proc;
gras_msg_procdata_t msg_remote_proc;
gras_msg_t msg; /* message to send */
+#endif
- sock_data = (gras_trp_sg_sock_data_t *) sock->data;
-
+ gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *) sock->data;
xbt_assert0(sock->meas,
"SG chunk exchange shouldn't be used on non-measurement sockets");
- SIMIX_mutex_lock(sock_data->mutex);
- sprintf(name, "Chunk[%d]", count++);
- /*initialize gras message */
- msg = xbt_new(s_gras_msg_t, 1);
- msg->expe = sock;
- msg->payl_size = size;
-
- if (data) {
- msg->payl = (void *) xbt_malloc(size);
- memcpy(msg->payl, data, size);
- } else {
- msg->payl = NULL;
- }
-
-
- /* put his socket on the selectable socket queue */
- trp_remote_proc = (gras_trp_procdata_t)
- gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process);
- xbt_queue_push(trp_remote_proc->meas_selectable_sockets, &sock);
-
- /* put message on msg_queue */
- msg_remote_proc = (gras_msg_procdata_t)
- gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
-
- xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas, msg);
-
- /* wait for the receiver */
- SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
/* creates simix action and waits its ends, waits in the sender host
condition */
- DEBUG5("send chunk %s from %s to %s:%d (size=%ld)",
- name, SIMIX_host_get_name(SIMIX_host_self()),
+ DEBUG4("send chunk from %s to %s:%d (size=%ld)",
+ SIMIX_host_get_name(SIMIX_host_self()),
SIMIX_host_get_name(sock_data->to_host), sock->peer_port, size);
-
- act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host,
- name, size, -1);
- SIMIX_register_action_to_condition(act, sock_data->cond);
- SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
- SIMIX_unregister_action_to_condition(act, sock_data->cond);
- /* error treatmeant (FIXME) */
-
- /* cleanup structures */
- SIMIX_action_destroy(act);
-
- SIMIX_mutex_unlock(sock_data->mutex);
+ //SIMIX_network_send(sock_data->rdv,size,1,-1,NULL,0,NULL,NULL);
+ THROW_UNIMPLEMENTED;
}
int gras_trp_sg_chunk_recv(gras_socket_t sock,
char *data, unsigned long int size)
{
- gras_trp_sg_sock_data_t *sock_data;
+ //gras_trp_sg_sock_data_t *sock_data =
+ // (gras_trp_sg_sock_data_t *) sock->data;
+
+ //SIMIX_network_recv(sock_data->rdv,-1,NULL,0,NULL);
+ THROW_UNIMPLEMENTED;
+#ifdef KILLME
gras_trp_sg_sock_data_t *remote_sock_data;
gras_socket_t remote_socket = NULL;
gras_msg_t msg_got;
xbt_free(msg_got);
SIMIX_mutex_unlock(remote_sock_data->mutex);
+#endif
return 0;
}
typedef struct {
int port; /* list of ports used by a server socket */
int meas; /* (boolean) the channel is for measurements or for messages */
- smx_process_t process;
- gras_socket_t socket;
+ smx_process_t process; /* process listening */
+ smx_rdv_t rdv; /* rendez-vous point to the listener */
+// gras_socket_t socket; FIXME KILLME
} gras_sg_portrec_t;
/* Data for each host */
smx_host_t to_host; /* Who's on other side */
- smx_cond_t cond;
- smx_mutex_t mutex;
- gras_socket_t to_socket;
+ smx_rdv_t rdv_server; /* The rendez-vous point to use */
+ smx_rdv_t rdv_client; /* The rendez-vous point to use */
+ int im_server:1;
+ smx_comm_t comm_recv; /* The comm of irecv on receiving sockets */
} gras_trp_sg_sock_data_t;