From 3473014423f69b4ec6ecaddca3f3c9e503a7adbf Mon Sep 17 00:00:00 2001 From: mquinson Date: Thu, 12 Jul 2007 10:36:49 +0000 Subject: [PATCH] various little cleanups in the gras/sg code. Mainly reindentation and more informative field names for the structures git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3743 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/gras/Msg/sg_msg.c | 119 +++++++------- src/gras/Transport/sg_transport.c | 171 +++++++++---------- src/gras/Transport/transport_interface.h | 31 ++-- src/gras/Transport/transport_plugin_sg.c | 200 ++++++++++++----------- src/gras/Transport/transport_private.h | 37 +++-- src/gras/Virtu/sg_process.c | 75 +++++---- src/gras/Virtu/virtu_sg.h | 24 ++- 7 files changed, 355 insertions(+), 302 deletions(-) diff --git a/src/gras/Msg/sg_msg.c b/src/gras/Msg/sg_msg.c index afd8d342e3..2a9c84a7b6 100644 --- a/src/gras/Msg/sg_msg.c +++ b/src/gras/Msg/sg_msg.c @@ -73,36 +73,42 @@ void gras_msg_send_ext(gras_socket_t sock, whole_payload_size = gras_datadesc_copy(msgtype->ctn_type, payload, msg->payl); } - /* put message on msg_queue */ - msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process); - xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg); - - /* wake-up the receiver */ - trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process); - xbt_fifo_push(trp_remote_proc->active_socket,sock); - - SIMIX_cond_signal(trp_remote_proc->cond); - - /* wait for the receiver */ - SIMIX_cond_wait(sock_data->cond, sock_data->mutex); - - /* creates simix action and waits its ends, waits in the sender host condition*/ - act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host,msgtype->name, (double)whole_payload_size, -1); - SIMIX_register_action_to_condition(act,sock_data->cond); - SIMIX_register_condition_to_action(act,sock_data->cond); - + /* put message on msg_queue */ + msg_remote_proc = (gras_msg_procdata_t) + gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process); + xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg); + + /* wake-up the receiver */ + trp_remote_proc = (gras_trp_procdata_t) + gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process); + + xbt_fifo_push(trp_remote_proc->msg_selectable_sockets,sock); + SIMIX_cond_signal(trp_remote_proc->msg_select_cond); + + /* wait for the receiver */ + SIMIX_cond_wait(sock_data->cond, sock_data->mutex); + + /* creates simix action and waits its ends, waits in the sender host + condition*/ + act = SIMIX_action_communicate(SIMIX_host_self(), + sock_data->to_host,msgtype->name, + (double)whole_payload_size, -1); + SIMIX_register_action_to_condition(act,sock_data->cond); + SIMIX_register_condition_to_action(act,sock_data->cond); + VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu", - SIMIX_host_get_name(sock_data->to_host),SIMIX_process_get_name(sock_data->to_process), - msg->type->name,e_gras_msg_kind_names[msg->kind], msg->ID); + SIMIX_host_get_name(sock_data->to_host), + SIMIX_process_get_name(sock_data->to_process), + msg->type->name,e_gras_msg_kind_names[msg->kind], msg->ID); - SIMIX_cond_wait(sock_data->cond, sock_data->mutex); - /* error treatmeant */ - - /* cleanup structures */ - SIMIX_action_destroy(act); - SIMIX_mutex_unlock(sock_data->mutex); + SIMIX_cond_wait(sock_data->cond, sock_data->mutex); + /* error treatmeant (FIXME)*/ - VERB0("Message sent"); + /* cleanup structures */ + SIMIX_action_destroy(act); + SIMIX_mutex_unlock(sock_data->mutex); + + VERB0("Message sent"); } /* @@ -111,42 +117,45 @@ void gras_msg_send_ext(gras_socket_t sock, void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) { - - gras_trp_sg_sock_data_t *sock_data; - gras_trp_sg_sock_data_t *remote_sock_data; - gras_hostdata_t *remote_hd; + + gras_trp_sg_sock_data_t *sock_data; + gras_trp_sg_sock_data_t *remote_sock_data; + gras_hostdata_t *remote_hd; gras_msg_t msg_got; - gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); + gras_msg_procdata_t msg_procdata = + (gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); xbt_assert1(!gras_socket_is_meas(sock), "Asked to receive a message on the measurement socket %p", sock); xbt_assert0(msg,"msg is an out parameter of gras_msg_recv..."); - - sock_data = (gras_trp_sg_sock_data_t *)sock->data; - remote_sock_data = ((gras_trp_sg_sock_data_t *)sock->data)->to_socket->data; - DEBUG3("Remote host %s, Remote Port: %d Local port %d", SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port); - remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host); - - if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) { - THROW_IMPOSSIBLE; - } - DEBUG1("Size msg_to_receive buffer: %d", xbt_fifo_size(msg_procdata->msg_to_receive_queue)); + + sock_data = (gras_trp_sg_sock_data_t *)sock->data; + remote_sock_data = ((gras_trp_sg_sock_data_t *)sock->data)->to_socket->data; + DEBUG3("Remote host %s, Remote Port: %d Local port %d", + SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port); + remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host); + + if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) { + THROW_IMPOSSIBLE; + } + DEBUG1("Size msg_to_receive buffer: %d", + xbt_fifo_size(msg_procdata->msg_to_receive_queue)); msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue); - - SIMIX_mutex_lock(remote_sock_data->mutex); -/* ok, I'm here, you can continuate the communication */ - SIMIX_cond_signal(remote_sock_data->cond); - -/* wait for communication end */ - SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex); - - msg_got->expe= msg->expe; + + SIMIX_mutex_lock(remote_sock_data->mutex); + /* ok, I'm here, you can continuate the communication */ + SIMIX_cond_signal(remote_sock_data->cond); + + /* wait for communication end */ + SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex); + + msg_got->expe= msg->expe; memcpy(msg,msg_got,sizeof(s_gras_msg_t)); - xbt_free(msg_got); - SIMIX_mutex_unlock(remote_sock_data->mutex); - - VERB3("Received a message type '%s' kind '%s' ID %lu",// from %s", + xbt_free(msg_got); + SIMIX_mutex_unlock(remote_sock_data->mutex); + + VERB3("Received a message type '%s' kind '%s' ID %lu",// from %s", msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID); diff --git a/src/gras/Transport/sg_transport.c b/src/gras/Transport/sg_transport.c index a767070c6d..729920570b 100644 --- a/src/gras/Transport/sg_transport.c +++ b/src/gras/Transport/sg_transport.c @@ -28,94 +28,101 @@ gras_socket_t _gras_lastly_selected_socket = NULL; * if timeout>0 and no message there, wait at most that amount of time before giving up. */ gras_socket_t gras_trp_select(double timeout) { - gras_socket_t res; - gras_trp_procdata_t pd = (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); - gras_trp_sg_sock_data_t *sockdata; - gras_trp_plugin_t trp; - gras_socket_t active_socket; - gras_socket_t sock_iter; /* iterating over all sockets */ - int cursor; - - DEBUG0("Trying to get the lock pd, trp_select"); - SIMIX_mutex_lock(pd->mutex); - DEBUG3("select on %s@%s with timeout=%f", - SIMIX_process_get_name(SIMIX_process_self()), - SIMIX_host_get_name(SIMIX_host_self()), - timeout); - - if (xbt_fifo_size(pd->active_socket) == 0) { - /* message didn't arrive yet, wait */ - SIMIX_cond_wait_timeout(pd->cond,pd->mutex,timeout); - } - - if (xbt_fifo_size(pd->active_socket) == 0) { - DEBUG0("TIMEOUT"); - SIMIX_mutex_unlock(pd->mutex); - THROW0(timeout_error,0,"Timeout"); - } - active_socket = xbt_fifo_shift(pd->active_socket); - - /* Ok, got something. Open a socket back to the expeditor */ - - /* Try to reuse an already openned socket to that expeditor */ - DEBUG1("Open sockets size %lu",xbt_dynar_length(pd->sockets)); - xbt_dynar_foreach(pd->sockets,cursor,sock_iter) { - DEBUG1("Consider %p as outgoing socket to expeditor",sock_iter); - - if (sock_iter->meas || !sock_iter->outgoing) - continue; - /* - if ((sock_iter->peer_port == active_socket->port) && - (((gras_trp_sg_sock_data_t*)sock_iter->data)->to_host == SIMIX_process_get_host(((gras_trp_sg_sock_data_t*)active_socket->data)->from_process))) { - */ - if ( (((gras_trp_sg_sock_data_t*)sock_iter->data)->to_socket == active_socket) && (((gras_trp_sg_sock_data_t*)sock_iter->data)->to_host == SIMIX_process_get_host(((gras_trp_sg_sock_data_t*)active_socket->data)->from_process)) ) { - SIMIX_mutex_unlock(pd->mutex); - return sock_iter; - } - } - - /* Socket to expeditor not created yet */ - DEBUG0("Create a socket to the expeditor"); - - trp = gras_trp_plugin_get_by_name("sg"); - - gras_trp_socket_new(1,&res); - res->plugin = trp; - - res->incoming = 1; - res->outgoing = 1; - res->accepting = 0; - res->sd = -1; - - res->port = -1; - - /* initialize the ports */ - //res->peer_port = active_socket->port; - res->port = active_socket->peer_port; - - /* create sockdata */ - sockdata = xbt_new(gras_trp_sg_sock_data_t,1); - sockdata->from_process = SIMIX_process_self(); - sockdata->to_process = ((gras_trp_sg_sock_data_t*)(active_socket->data))->from_process; + gras_socket_t res; + gras_trp_procdata_t pd = + (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); + gras_trp_sg_sock_data_t *sockdata; + gras_trp_plugin_t trp; + gras_socket_t active_socket; + gras_trp_sg_sock_data_t *active_socket_data; + gras_socket_t sock_iter; /* iterating over all sockets */ + int cursor; + + DEBUG0("Trying to get the lock pd, trp_select"); + SIMIX_mutex_lock(pd->msg_select_mutex); + DEBUG3("select on %s@%s with timeout=%f", + SIMIX_process_get_name(SIMIX_process_self()), + SIMIX_host_get_name(SIMIX_host_self()), + timeout); + + if (xbt_fifo_size(pd->msg_selectable_sockets) == 0) { + /* message didn't arrive yet, wait */ + SIMIX_cond_wait_timeout(pd->msg_select_cond,pd->msg_select_mutex,timeout); + } + + if (xbt_fifo_size(pd->msg_selectable_sockets) == 0) { + DEBUG0("TIMEOUT"); + SIMIX_mutex_unlock(pd->msg_select_mutex); + THROW0(timeout_error,0,"Timeout"); + } + active_socket = xbt_fifo_shift(pd->msg_selectable_sockets); + active_socket_data = (gras_trp_sg_sock_data_t*)active_socket->data; + + /* Ok, got something. Open a socket back to the expeditor */ + + /* Try to reuse an already openned socket to that expeditor */ + DEBUG1("Open sockets size %lu",xbt_dynar_length(pd->sockets)); + xbt_dynar_foreach(pd->sockets,cursor,sock_iter) { + gras_trp_sg_sock_data_t *sock_data; + DEBUG1("Consider %p as outgoing socket to expeditor",sock_iter); + + if (sock_iter->meas || !sock_iter->outgoing) + continue; + sock_data = ((gras_trp_sg_sock_data_t*)sock_iter->data); + + if ( (sock_data->to_socket == active_socket) && + (sock_data->to_host == SIMIX_process_get_host(active_socket_data->from_process)) ) { + SIMIX_mutex_unlock(pd->msg_select_mutex); + return sock_iter; + } + } + + /* Socket to expeditor not created yet */ + DEBUG0("Create a socket to the expeditor"); + + trp = gras_trp_plugin_get_by_name("sg"); + + gras_trp_socket_new(1,&res); + res->plugin = trp; + + res->incoming = 1; + res->outgoing = 1; + res->accepting = 0; + res->sd = -1; + + res->port = -1; + + /* initialize the ports */ + //res->peer_port = active_socket->port; + res->port = active_socket->peer_port; + + /* create sockdata */ + sockdata = xbt_new(gras_trp_sg_sock_data_t,1); + sockdata->from_process = SIMIX_process_self(); + sockdata->to_process = active_socket_data->from_process; - res->peer_port = ((gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sockdata->to_process))->myport; - sockdata->to_socket = active_socket; - /*update the peer to_socket variable */ - ((gras_trp_sg_sock_data_t*)active_socket->data)->to_socket = res; - sockdata->cond = SIMIX_cond_init(); - sockdata->mutex = SIMIX_mutex_init(); + res->peer_port = + ((gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sockdata->to_process))->myport; + sockdata->to_socket = active_socket; + /*update the peer to_socket variable */ + active_socket_data->to_socket = res; + sockdata->cond = SIMIX_cond_init(); + sockdata->mutex = SIMIX_mutex_init(); - sockdata->to_host = SIMIX_process_get_host(((gras_trp_sg_sock_data_t*)(active_socket->data))->from_process); + sockdata->to_host = SIMIX_process_get_host(active_socket_data->from_process); - res->data = sockdata; - res->peer_name = strdup(SIMIX_host_get_name(sockdata->to_host)); + res->data = sockdata; + res->peer_name = strdup(SIMIX_host_get_name(sockdata->to_host)); - gras_trp_buf_init_sock(res); + gras_trp_buf_init_sock(res); - DEBUG4("Create socket to process:%s(Port %d) from process: %s(Port %d)",SIMIX_process_get_name(sockdata->from_process),res->peer_port, SIMIX_process_get_name(sockdata->to_process), res->port); + DEBUG4("Create socket to process:%s(Port %d) from process: %s(Port %d)", + SIMIX_process_get_name(sockdata->from_process), + res->peer_port, + SIMIX_process_get_name(sockdata->to_process), res->port); - SIMIX_mutex_unlock(pd->mutex); - return res; + SIMIX_mutex_unlock(pd->msg_select_mutex); + return res; } diff --git a/src/gras/Transport/transport_interface.h b/src/gras/Transport/transport_interface.h index 379e5bca0a..63547d229a 100644 --- a/src/gras/Transport/transport_interface.h +++ b/src/gras/Transport/transport_interface.h @@ -24,7 +24,8 @@ extern int gras_opt_trp_nomoredata_on_close; *** Main user functions ***/ /* stable if we know the storage will keep as is until the next trp_flush */ -XBT_PUBLIC(void) gras_trp_send(gras_socket_t sd, char *data, long int size, int stable); +XBT_PUBLIC(void) gras_trp_send(gras_socket_t sd, char *data, long int size, + int stable); XBT_PUBLIC(void) gras_trp_recv(gras_socket_t sd, char *data, long int size); XBT_PUBLIC(void) gras_trp_flush(gras_socket_t sd); @@ -81,8 +82,10 @@ struct gras_trp_plugin_ { void *data; /* plugin-specific data */ - /* exit is responsible for freeing data and telling the OS this plugin goes */ - /* exit=NULL, data gets freed. (ie exit function needed only when data contains pointers) */ + /* exit is responsible for freeing data and telling to the OS that + this plugin is gone */ + /* exit=NULL, data gets brutally free()d by the generic interface. + (ie exit function needed only when data contains pointers) */ void (*exit)(gras_trp_plugin_t); }; @@ -98,17 +101,23 @@ typedef struct { char *name; unsigned int name_len; - xbt_dynar_t sockets; /* all sockets known to this process */ int myport; /* Port on which I listen myself */ - fd_set *fdset; + + xbt_dynar_t sockets; /* all sockets known to this process */ + fd_set *fdset; /* idem, in another formalism */ /* SG only elements. In RL, they are part of the OS ;) */ - smx_cond_t cond; - smx_mutex_t mutex; - smx_cond_t cond_meas; - smx_mutex_t mutex_meas; - xbt_fifo_t active_socket; - xbt_fifo_t active_socket_meas; + + /* List of sockets ready to be select()ed */ + xbt_fifo_t msg_selectable_sockets; /* regular sockets */ + xbt_fifo_t meas_selectable_sockets;/* measurement ones */ + + /* Synchronisation on msg_selectable_sockets */ + smx_cond_t msg_select_cond; + smx_mutex_t msg_select_mutex; + /* Synchronisation on meas_selectable_sockets */ + smx_cond_t meas_select_cond; + smx_mutex_t meas_select_mutex; } s_gras_trp_procdata_t,*gras_trp_procdata_t; diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index ee6de67289..b46e05e00d 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -102,7 +102,8 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self, /* make sure this socket will reach someone */ if (!(peer=SIMIX_host_get_by_name(sock->peer_name))) - THROW1(mismatch_error,0,"Can't connect to %s: no such host.\n",sock->peer_name); + THROW1(mismatch_error,0, + "Can't connect to %s: no such host.\n",sock->peer_name); if (!(hd=(gras_hostdata_t *)SIMIX_host_get_data(peer))) THROW1(mismatch_error,0, @@ -124,30 +125,30 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self, if (pr.meas && !sock->meas) { THROW2(mismatch_error,0, "can't connect to %s:%d in regular mode, the process listen " - "in meas mode on this port",sock->peer_name,sock->peer_port); + "in measurement mode on this port",sock->peer_name,sock->peer_port); } if (!pr.meas && sock->meas) { THROW2(mismatch_error,0, - "can't connect to %s:%d in meas mode, the process listen " + "can't connect to %s:%d in measurement mode, the process listen " "in regular mode on this port",sock->peer_name,sock->peer_port); } /* create the socket */ data = xbt_new(gras_trp_sg_sock_data_t,1); data->from_process = SIMIX_process_self(); - data->to_process = pr.process; + data->to_process = pr.process; data->to_host = peer; - /* initialize mutex and condition of the socket */ - data->mutex = SIMIX_mutex_init(); - data->cond = SIMIX_cond_init(); - data->to_socket = pr.socket; + /* initialize mutex and condition of the socket */ + data->mutex = SIMIX_mutex_init(); + data->cond = SIMIX_cond_init(); + data->to_socket = pr.socket; sock->data = data; sock->incoming = 1; DEBUG5("%s (PID %d) connects in %s mode to %s:%d", - SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), - sock->meas?"meas":"regular", + SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), + sock->meas?"meas":"regular", sock->peer_name,sock->peer_port); } @@ -182,26 +183,26 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, "can't listen on address %s:%d: port already in use.", host,sock->port); - pr.port = sock->port; - pr.meas = sock->meas; - pr.socket = sock; - pr.process = SIMIX_process_self(); + pr.port = sock->port; + pr.meas = sock->meas; + pr.socket = sock; + pr.process = SIMIX_process_self(); xbt_dynar_push(hd->ports,&pr); /* Create the socket */ data = xbt_new(gras_trp_sg_sock_data_t,1); data->from_process = SIMIX_process_self(); data->to_process = NULL; - data->to_host = SIMIX_host_self(); + data->to_host = SIMIX_host_self(); - data->cond = SIMIX_cond_init(); - data->mutex = SIMIX_mutex_init(); + data->cond = SIMIX_cond_init(); + data->mutex = SIMIX_mutex_init(); sock->data = data; VERB6("'%s' (%d) ears on %s:%d%s (%p)", - SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), - host,sock->port,sock->meas? " (mode meas)":"",sock); + SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), + host,sock->port,sock->meas? " (mode meas)":"",sock); } @@ -217,26 +218,25 @@ void gras_trp_sg_socket_close(gras_socket_t sock){ xbt_assert0(hd,"Please run gras_process_init on each process"); if (sock->data) { - SIMIX_cond_destroy(((gras_trp_sg_sock_data_t*)sock->data)->cond); - SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t*)sock->data)->mutex); - free(sock->data); - } + SIMIX_cond_destroy(((gras_trp_sg_sock_data_t*)sock->data)->cond); + SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t*)sock->data)->mutex); + free(sock->data); + } if (sock->incoming && !sock->outgoing && sock->port >= 0) { /* server mode socket. Unregister it from 'OS' tables */ xbt_dynar_foreach(hd->ports, cpt, pr) { DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports)); if (pr.port == sock->port) { - xbt_dynar_cursor_rm(hd->ports, &cpt); - XBT_OUT; - return; + xbt_dynar_cursor_rm(hd->ports, &cpt); + XBT_OUT; + return; } } WARN2("socket_close called on the unknown incoming socket %p (port=%d)", sock,sock->port); } - XBT_OUT; - + XBT_OUT; } typedef struct { @@ -257,22 +257,23 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, char name[256]; static unsigned int count=0; - smx_action_t act; /* simix action */ - gras_trp_sg_sock_data_t *sock_data; - gras_trp_procdata_t trp_remote_proc; - gras_msg_procdata_t msg_remote_proc; - gras_msg_t msg; /* message to send */ + smx_action_t act; /* simix action */ + gras_trp_sg_sock_data_t *sock_data; + gras_trp_procdata_t trp_remote_proc; + gras_msg_procdata_t msg_remote_proc; + gras_msg_t msg; /* message to send */ - sock_data = (gras_trp_sg_sock_data_t *)sock->data; + sock_data = (gras_trp_sg_sock_data_t *)sock->data; - xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); + xbt_assert0(sock->meas, + "SG chunk exchange shouldn't be used on non-measurement sockets"); - SIMIX_mutex_lock(sock_data->mutex); + SIMIX_mutex_lock(sock_data->mutex); sprintf(name,"Chunk[%d]",count++); - /*initialize gras message */ - msg = xbt_new(s_gras_msg_t,1); - msg->expe = sock; - msg->payl_size=size; + /*initialize gras message */ + msg = xbt_new(s_gras_msg_t,1); + msg->expe = sock; + msg->payl_size=size; if (data) { msg->payl=(void*)xbt_malloc(size); @@ -280,72 +281,83 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, } else { msg->payl = NULL; } + + /* put message on msg_queue */ + msg_remote_proc = (gras_msg_procdata_t) + gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process); - /* put message on msg_queue */ - msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process); - xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg); - /* put his socket on the active_socket list */ - trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process); - xbt_fifo_push(trp_remote_proc->active_socket_meas,sock); - /* wake-up the receiver */ + xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg); - SIMIX_cond_signal(trp_remote_proc->cond_meas); + /* put his socket on the selectable socket list */ + trp_remote_proc = (gras_trp_procdata_t) + gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process); - /* wait for the receiver */ - SIMIX_cond_wait(sock_data->cond,sock_data->mutex); + xbt_fifo_push(trp_remote_proc->meas_selectable_sockets,sock); - /* creates simix action and waits its ends, waits in the sender host condition*/ + /* wake-up the receiver */ + SIMIX_cond_signal(trp_remote_proc->meas_select_cond); + + /* wait for the receiver */ + SIMIX_cond_wait(sock_data->cond,sock_data->mutex); + + /* creates simix action and waits its ends, waits in the sender host + condition*/ DEBUG5("send chunk %s from %s to %s:%d (size=%ld)", name, SIMIX_host_get_name(SIMIX_host_self()), SIMIX_host_get_name(sock_data->to_host), sock->peer_port,size); - act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host, name, size, -1); - SIMIX_register_action_to_condition(act,sock_data->cond); - SIMIX_register_condition_to_action(act,sock_data->cond); + act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host, + name, size, -1); + SIMIX_register_action_to_condition(act,sock_data->cond); + SIMIX_register_condition_to_action(act,sock_data->cond); - SIMIX_cond_wait(sock_data->cond,sock_data->mutex); - /* error treatmeant */ + SIMIX_cond_wait(sock_data->cond,sock_data->mutex); + /* error treatmeant (FIXME)*/ - /* cleanup structures */ - SIMIX_action_destroy(act); + /* cleanup structures */ + SIMIX_action_destroy(act); - SIMIX_mutex_unlock(sock_data->mutex); + SIMIX_mutex_unlock(sock_data->mutex); } int gras_trp_sg_chunk_recv(gras_socket_t sock, - char *data, - unsigned long int size){ - gras_trp_sg_sock_data_t *sock_data; - gras_trp_sg_sock_data_t *remote_sock_data; - gras_socket_t remote_socket; + char *data, + unsigned long int size){ + gras_trp_sg_sock_data_t *sock_data; + gras_trp_sg_sock_data_t *remote_sock_data; + gras_socket_t remote_socket; gras_msg_t msg_got; - gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); - gras_trp_procdata_t trp_proc=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); + gras_msg_procdata_t msg_procdata = + (gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); + gras_trp_procdata_t trp_proc = + (gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); - xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); + xbt_assert0(sock->meas, + "SG chunk exchange shouldn't be used on non-measurement sockets"); - SIMIX_mutex_lock(trp_proc->mutex_meas); - if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) { - SIMIX_cond_wait_timeout(trp_proc->cond_meas,trp_proc->mutex_meas,60); - } - if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) { - SIMIX_mutex_unlock(trp_proc->mutex_meas); - THROW0(timeout_error,0,"Timeout"); - } - SIMIX_mutex_unlock(trp_proc->mutex_meas); - - remote_socket = xbt_fifo_shift(trp_proc->active_socket_meas); - remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data; + SIMIX_mutex_lock(trp_proc->meas_select_mutex); + if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) { + SIMIX_cond_wait_timeout(trp_proc->meas_select_cond, + trp_proc->meas_select_mutex,60); + } + if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) { + SIMIX_mutex_unlock(trp_proc->meas_select_mutex); + THROW0(timeout_error,0,"Timeout"); + } + SIMIX_mutex_unlock(trp_proc->meas_select_mutex); + + remote_socket = xbt_fifo_shift(trp_proc->meas_selectable_sockets); + remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data; msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas); + + sock_data = (gras_trp_sg_sock_data_t *)sock->data; - sock_data = (gras_trp_sg_sock_data_t *)sock->data; - -/* ok, I'm here, you can continue the communication */ - SIMIX_cond_signal(remote_sock_data->cond); + /* ok, I'm here, you can continue the communication */ + SIMIX_cond_signal(remote_sock_data->cond); - SIMIX_mutex_lock(remote_sock_data->mutex); -/* wait for communication end */ - SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex); + SIMIX_mutex_lock(remote_sock_data->mutex); + /* wait for communication end */ + SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex); if (msg_got->payl_size != size) THROW5(mismatch_error,0, @@ -353,13 +365,15 @@ int gras_trp_sg_chunk_recv(gras_socket_t sock, msg_got->payl_size, size, SIMIX_host_get_name(sock_data->to_host), SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port); - if (data) { + + if (data) memcpy(data,msg_got->payl,size); - } - if (msg_got->payl) - xbt_free(msg_got->payl); - xbt_free(msg_got); - SIMIX_mutex_unlock(remote_sock_data->mutex); - return 0; + + if (msg_got->payl) + xbt_free(msg_got->payl); + + xbt_free(msg_got); + SIMIX_mutex_unlock(remote_sock_data->mutex); + return 0; } diff --git a/src/gras/Transport/transport_private.h b/src/gras/Transport/transport_private.h index becb2627aa..65284420e9 100644 --- a/src/gras/Transport/transport_private.h +++ b/src/gras/Transport/transport_private.h @@ -27,17 +27,21 @@ extern int gras_trp_libdata_id; /* our libdata identifier */ -/* The function that select returned the last time we asked. We need this because the TCP read - are greedy and try to get as much data in their buffer as possible (to avoid subsequent syscalls). +/* The function that select returned the last time we asked. We need this + because the TCP read are greedy and try to get as much data in their + buffer as possible (to avoid subsequent syscalls). (measurement sockets are not buffered and thus not concerned). - So, we can get more than one message in one shoot. And when this happens, we have to handle - the same socket again afterward without select()ing at all. + So, we can get more than one message in one shoot. And when this happens, + we have to handle the same socket again afterward without select()ing at + all. - Then, this data is not a static of the TCP driver because we want to zero it when - it gets closed by the user. If not, we use an already freed pointer, which is bad. + Then, this data is not a static of the TCP driver because we want to + zero it when it gets closed by the user. If not, we use an already freed + pointer, which is bad. - It gets tricky since gras_socket_close is part of the common API, not only the RL one. */ + It gets tricky since gras_socket_close is part of the common API, not + only the RL one. */ extern gras_socket_t _gras_lastly_selected_socket; /** @@ -56,12 +60,17 @@ typedef struct s_gras_socket { int meas :1; /* true if this is an experiment socket instead of messaging */ int recv_ok :1; /* true if it is valid to recv() on the socket (false if it is a file) */ int valid :1; /* false if a select returned that the peer quitted, forcing us to "close" the socket */ - int moredata :1; /* TCP socket use a buffer and read operation get as much data as possible. - It is possible that several messages are received in one shoot, and select won't catch them afterward again. - This boolean indicates that this is the case, so that we don't call select in that case. - Note that measurement sockets are not concerned since they use the TCP interface directly, with no buffer. */ - - unsigned long int buf_size; /* what to say to the OS. field here to remember it when accepting */ + int moredata :1; /* TCP socket use a buffer and read operation get as much + data as possible. It is possible that several messages + are received in one shoot, and select won't catch them + afterward again. + This boolean indicates that this is the case, so that we + don't call select in that case. Note that measurement + sockets are not concerned since they use the TCP + interface directly, with no buffer. */ + + unsigned long int buf_size; /* what to say to the OS. + Field here to remember it when accepting */ int sd; int port; /* port on this side */ @@ -86,7 +95,7 @@ void gras_trp_iov_setup(gras_trp_plugin_t plug); void gras_trp_file_setup(gras_trp_plugin_t plug); void gras_trp_sg_setup(gras_trp_plugin_t plug); -/* +/* FIXME: this should be solved by SIMIX I'm tired of that shit. the select in SG has to create a socket to expeditor manually do deal with the weirdness of the hostdata, themselves here to deal diff --git a/src/gras/Virtu/sg_process.c b/src/gras/Virtu/sg_process.c index f985c86f31..48cd2732ec 100644 --- a/src/gras/Virtu/sg_process.c +++ b/src/gras/Virtu/sg_process.c @@ -52,24 +52,25 @@ gras_process_init() { hd->refcount++; } - trp_pd = (gras_trp_procdata_t)gras_libdata_by_name("gras_trp"); - pd->pid = PID++; - - if (SIMIX_process_self() != NULL ) { - pd->ppid = gras_os_getpid(); - } - else pd->ppid = -1; - - trp_pd->mutex = SIMIX_mutex_init(); - trp_pd->cond = SIMIX_cond_init(); - trp_pd->mutex_meas = SIMIX_mutex_init(); - trp_pd->cond_meas = SIMIX_cond_init(); - trp_pd->active_socket = xbt_fifo_new(); - trp_pd->active_socket_meas = xbt_fifo_new(); - + trp_pd = (gras_trp_procdata_t)gras_libdata_by_name("gras_trp"); + pd->pid = PID++; + + if (SIMIX_process_self() != NULL ) { + pd->ppid = gras_os_getpid(); + } + else pd->ppid = -1; + + trp_pd->msg_selectable_sockets = xbt_fifo_new(); + trp_pd->msg_select_mutex = SIMIX_mutex_init(); + trp_pd->msg_select_cond = SIMIX_cond_init(); + + trp_pd->meas_selectable_sockets = xbt_fifo_new(); + trp_pd->meas_select_mutex = SIMIX_mutex_init(); + trp_pd->meas_select_cond = SIMIX_cond_init(); + VERB2("Creating process '%s' (%d)", - SIMIX_process_get_name(SIMIX_process_self()), - gras_os_getpid()); + SIMIX_process_get_name(SIMIX_process_self()), + gras_os_getpid()); } void @@ -77,18 +78,23 @@ gras_process_exit() { xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_name("gras_trp"))->sockets; gras_socket_t sock_iter; int cursor; - gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); - gras_procdata_t *pd=(gras_procdata_t*)SIMIX_process_get_data(SIMIX_process_self()); + gras_hostdata_t *hd= + (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); + gras_procdata_t *pd= + (gras_procdata_t*)SIMIX_process_get_data(SIMIX_process_self()); + + gras_msg_procdata_t msg_pd= + (gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); + gras_trp_procdata_t trp_pd= + (gras_trp_procdata_t)gras_libdata_by_name("gras_trp"); - gras_msg_procdata_t msg_pd=(gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); - gras_trp_procdata_t trp_pd=(gras_trp_procdata_t)gras_libdata_by_name("gras_trp"); + SIMIX_mutex_destroy(trp_pd->msg_select_mutex); + SIMIX_cond_destroy(trp_pd->msg_select_cond); + xbt_fifo_free(trp_pd->msg_selectable_sockets); - SIMIX_mutex_destroy(trp_pd->mutex); - SIMIX_cond_destroy(trp_pd->cond); - xbt_fifo_free(trp_pd->active_socket); - SIMIX_mutex_destroy(trp_pd->mutex_meas); - SIMIX_cond_destroy(trp_pd->cond_meas); - xbt_fifo_free(trp_pd->active_socket_meas); + SIMIX_mutex_destroy(trp_pd->meas_select_mutex); + SIMIX_cond_destroy(trp_pd->meas_select_cond); + xbt_fifo_free(trp_pd->meas_selectable_sockets); xbt_assert0(hd,"Run gras_process_init (ie, gras_init)!!"); @@ -99,13 +105,14 @@ gras_process_exit() { if (xbt_dynar_length(msg_pd->msg_queue)) WARN1("process %d terminated, but some messages are still queued", gras_os_getpid()); - - /* if each process has its sockets list, we need to close them when the process finish */ - xbt_dynar_foreach(sockets,cursor,sock_iter) { - VERB1("Closing the socket %p left open on exit. Maybe a socket leak?", - sock_iter); - gras_socket_close(sock_iter); - } + + /* if each process has its sockets list, we need to close them when the + process finish */ + xbt_dynar_foreach(sockets,cursor,sock_iter) { + VERB1("Closing the socket %p left open on exit. Maybe a socket leak?", + sock_iter); + gras_socket_close(sock_iter); + } if ( ! --(hd->refcount)) { xbt_dynar_free(&hd->ports); free(hd); diff --git a/src/gras/Virtu/virtu_sg.h b/src/gras/Virtu/virtu_sg.h index 1b72191d20..9e2f4f50d5 100644 --- a/src/gras/Virtu/virtu_sg.h +++ b/src/gras/Virtu/virtu_sg.h @@ -16,10 +16,10 @@ #include "gras/Transport/transport_private.h" typedef struct { - int port; /* list of ports used by a server socket */ - int meas; /* (boolean) the channel is for measurements or for messages */ - smx_process_t process; - gras_socket_t socket; + int port; /* list of ports used by a server socket */ + int meas; /* (boolean) the channel is for measurements or for messages */ + smx_process_t process; + gras_socket_t socket; } gras_sg_portrec_t; /* Data for each host */ @@ -32,16 +32,14 @@ typedef struct { /* data for each socket (FIXME: find a better location for that)*/ typedef struct { - //int from_PID; /* process which sent this message */ - //int to_PID; /* process to which this message is destinated */ - smx_process_t from_process; - smx_process_t to_process; + smx_process_t from_process; + smx_process_t to_process; - smx_host_t to_host; /* Who's on other side */ - - smx_cond_t cond; - smx_mutex_t mutex; - gras_socket_t to_socket; + smx_host_t to_host; /* Who's on other side */ + + smx_cond_t cond; + smx_mutex_t mutex; + gras_socket_t to_socket; } gras_trp_sg_sock_data_t; -- 2.20.1