From: mquinson Date: Tue, 10 Nov 2009 12:28:14 +0000 (+0000) Subject: try to port the gras simulation side to the new smx_network infrastructure (not yet... X-Git-Tag: SVN~882 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/063c63642a29000a011c0d6176d30eb62a4e0dca try to port the gras simulation side to the new smx_network infrastructure (not yet working) git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6835 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/include/gras/messages.h b/include/gras/messages.h index e7485157ca..c522c53694 100644 --- a/include/gras/messages.h +++ b/include/gras/messages.h @@ -281,6 +281,7 @@ XBT_PUBLIC(void) gras_msg_rpcreturn(double timeOut, gras_msg_cb_ctx_t ctx, gras_msgtype_t type; unsigned long int ID; void *payl; + void *comm; /* simix_comm in SG */ int payl_size; } s_gras_msg_t, *gras_msg_t; diff --git a/src/gras/Msg/gras_msg_listener.c b/src/gras/Msg/gras_msg_listener.c index b583d58e1e..c66c18a603 100644 --- a/src/gras/Msg/gras_msg_listener.c +++ b/src/gras/Msg/gras_msg_listener.c @@ -26,6 +26,14 @@ typedef struct s_gras_msg_listener_ { xbt_thread_t listener; } s_gras_msg_listener_t; +static void do_close_socket(gras_socket_t sock) { + if (sock->plugin->socket_close) + (*sock->plugin->socket_close) (sock); + /* free the memory */ + if (sock->peer_name) + free(sock->peer_name); + free(sock); +} static void listener_function(void *p) { gras_msg_listener_t me = (gras_msg_listener_t) p; @@ -56,12 +64,9 @@ static void listener_function(void *p) /* empty the list of sockets to trash */ TRY { while (1) { - int sock; + gras_socket_t sock; xbt_queue_shift_timed(me->socks_to_close, &sock, 0); - if (tcp_close(sock) < 0) { - WARN3("error while closing tcp socket %d: %d (%s)\n", - sock, sock_errno, sock_errstr(sock_errno)); - } + do_close_socket(sock); } } CATCH(e) { @@ -134,14 +139,14 @@ void gras_msg_listener_awake() } } -void gras_msg_listener_close_socket(int sd) +void gras_msg_listener_close_socket(gras_socket_t sock) { gras_procdata_t *pd = gras_procdata_get(); if (pd->listener) { - xbt_queue_push(pd->listener->socks_to_close, &sd); + xbt_queue_push(pd->listener->socks_to_close, &sock); gras_msg_listener_awake(); } else { /* do it myself */ - tcp_close(sd); + do_close_socket(sock); } } diff --git a/src/gras/Msg/msg_interface.h b/src/gras/Msg/msg_interface.h index 0bad57a869..27c2c70f63 100644 --- a/src/gras/Msg/msg_interface.h +++ b/src/gras/Msg/msg_interface.h @@ -49,7 +49,7 @@ typedef struct { void gras_msg_send_namev(gras_socket_t sock, const char *namev, void *payload); void gras_msg_listener_awake(void); -void gras_msg_listener_close_socket(int sd); +void gras_msg_listener_close_socket(gras_socket_t sock); #define GRAS_PROTOCOL_VERSION '\1'; diff --git a/src/gras/Msg/sg_msg.c b/src/gras/Msg/sg_msg.c index 1afb8aa050..131ee9f7b1 100644 --- a/src/gras/Msg/sg_msg.c +++ b/src/gras/Msg/sg_msg.c @@ -27,20 +27,13 @@ void gras_msg_send_ext(gras_socket_t sock, unsigned long int ID, gras_msgtype_t msgtype, void *payload) { - - smx_action_t act; /* simix action */ gras_trp_sg_sock_data_t *sock_data; - gras_hostdata_t *hd; - gras_trp_procdata_t trp_remote_proc; - gras_msg_procdata_t msg_remote_proc; gras_msg_t msg; /* message to send */ int whole_payload_size = 0; /* msg->payload_size is used to memcpy the payload. This is used to report the load onto the simulator. It also counts the size of pointed stuff */ sock_data = (gras_trp_sg_sock_data_t *) sock->data; - hd = (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self()); - xbt_assert1(!gras_socket_is_meas(sock), "Asked to send a message on the measurement socket %p", sock); @@ -51,7 +44,7 @@ void gras_msg_send_ext(gras_socket_t sock, msg->type = msgtype; msg->ID = ID; if (kind == e_gras_msg_kind_rpcerror) { - /* error on remote host, carfull, payload is an exception */ + /* error on remote host, careful, payload is an exception */ msg->payl_size = gras_datadesc_size(gras_datadesc_by_name("ex_t")); msg->payl = xbt_malloc(msg->payl_size); whole_payload_size = gras_datadesc_memcpy(gras_datadesc_by_name("ex_t"), @@ -74,56 +67,23 @@ void gras_msg_send_ext(gras_socket_t sock, payload, msg->payl); } - /* put the selectable socket on the queue */ - trp_remote_proc = (gras_trp_procdata_t) - gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process); - - xbt_queue_push(trp_remote_proc->msg_selectable_sockets, &sock); - - /* put message on msg_queue */ - msg_remote_proc = (gras_msg_procdata_t) - gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process); - xbt_fifo_push(msg_remote_proc->msg_to_receive_queue, msg); - - /* wait for the receiver */ - SIMIX_cond_wait(sock_data->cond, sock_data->mutex); - /* creates simix action and waits its ends, waits in the sender host - condition */ - act = SIMIX_action_communicate(SIMIX_host_self(), - sock_data->to_host, msgtype->name, - (double) whole_payload_size, -1); - SIMIX_register_action_to_condition(act, sock_data->cond); VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu", - SIMIX_host_get_name(sock_data->to_host), - SIMIX_process_get_name(sock_data->to_process), + sock->peer_name,sock->peer_proc, msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID); - - SIMIX_cond_wait(sock_data->cond, sock_data->mutex); - SIMIX_unregister_action_to_condition(act, sock_data->cond); - /* error treatmeant (FIXME) */ - - /* cleanup structures */ - SIMIX_action_destroy(act); - SIMIX_mutex_unlock(sock_data->mutex); + SIMIX_network_send(sock_data->rdv,whole_payload_size,-1.,-1.,msg,sizeof(s_gras_msg_t),(smx_comm_t*)&(msg->comm),&msg); VERB0("Message sent"); - } /* * receive the next message on the given socket. */ -void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) -{ - +void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) { + gras_trp_procdata_t pd = + (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); gras_trp_sg_sock_data_t *sock_data; - gras_trp_sg_sock_data_t *remote_sock_data; - gras_hostdata_t *remote_hd; - gras_msg_t msg_got; - gras_msg_procdata_t msg_procdata = - (gras_msg_procdata_t) gras_libdata_by_name("gras_msg"); xbt_assert1(!gras_socket_is_meas(sock), "Asked to receive a message on the measurement socket %p", @@ -132,11 +92,28 @@ void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) xbt_assert0(msg, "msg is an out parameter of gras_msg_recv..."); sock_data = (gras_trp_sg_sock_data_t *) sock->data; + + /* The message was already received while emulating the select, so simply copy it here */ + memcpy(msg,&(sock_data->ongoing_msg),sizeof(s_gras_msg_t)); + msg->expe = sock; + VERB1("Using %p as a msg",&(sock_data->ongoing_msg)); + VERB5("Received a message type '%s' kind '%s' ID %lu from %s(%s)", + msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID, + sock->peer_name,sock->peer_proc); + + /* Recreate another comm object to replace the one which just terminated */ + int rank = xbt_dynar_search(pd->sockets,&sock); + xbt_assert0(rank>=0,"Socket not found in my array"); + sock_data->ongoing_msg_size = sizeof(s_gras_msg_t); + smx_comm_t comm = SIMIX_network_irecv(sock_data->rdv,&(sock_data->ongoing_msg),&(sock_data->ongoing_msg_size)); + xbt_dynar_set(pd->comms,rank,&comm); + +#if 0 /* KILLME */ + SIMIX_network_recv(sock_data->rdv,-1.,&msg_got,NULL,&comm); + + remote_sock_data = ((gras_trp_sg_sock_data_t *) sock->data)->to_socket->data; - DEBUG3("Remote host %s, Remote Port: %d Local port %d", - SIMIX_host_get_name(sock_data->to_host), sock->peer_port, - sock->port); remote_hd = (gras_hostdata_t *) SIMIX_host_get_data(sock_data->to_host); if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0) { @@ -151,13 +128,13 @@ void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) SIMIX_cond_signal(remote_sock_data->cond); /* wait for communication end */ + INFO2("Wait communication (from %s) termination on %p",sock->peer_name,sock_data->cond); + SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex); msg_got->expe = msg->expe; memcpy(msg, msg_got, sizeof(s_gras_msg_t)); xbt_free(msg_got); SIMIX_mutex_unlock(remote_sock_data->mutex); - - VERB3("Received a message type '%s' kind '%s' ID %lu", // from %s", - msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID); +#endif } diff --git a/src/gras/Transport/rl_transport.c b/src/gras/Transport/rl_transport.c index 395137fc63..a8c2a702ff 100644 --- a/src/gras/Transport/rl_transport.c +++ b/src/gras/Transport/rl_transport.c @@ -175,7 +175,7 @@ gras_socket_t gras_trp_select(double timeout) /* Got a socket to serve */ ready--; - if (sock_iter->accepting && sock_iter->plugin->socket_accept) { + if (sock_iter->is_master && sock_iter->plugin->socket_accept) { /* not a socket but an ear. accept on it and serve next socket */ gras_socket_t accepted = NULL; diff --git a/src/gras/Transport/sg_transport.c b/src/gras/Transport/sg_transport.c index f9bfbb72d7..96e5042110 100644 --- a/src/gras/Transport/sg_transport.c +++ b/src/gras/Transport/sg_transport.c @@ -10,6 +10,7 @@ #include "xbt/ex.h" #include "gras/Transport/transport_private.h" #include "gras/Virtu/virtu_sg.h" +#include "simix/private.h" XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_trp); @@ -27,8 +28,61 @@ gras_socket_t _gras_lastly_selected_socket = NULL; * * if timeout>0 and no message there, wait at most that amount of time before giving up. */ -gras_socket_t gras_trp_select(double timeout) -{ +gras_socket_t gras_trp_select(double timeout) { + static int warned=0; + if (timeout>=0 && !warned) { + warned=1; + WARN0("Timed select not implemented in SG. Switching to blocking select"); + } + gras_trp_procdata_t pd = + (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); + + gras_socket_t sock_iter, active_socket; + gras_trp_sg_sock_data_t *active_socket_data; + smx_comm_t comm; + unsigned int cursor; + + + /* FIXME: make sure that the ongoing comm is canceled&destroyed when the corresponding socket is closed */ + xbt_assert(xbt_dynar_length(pd->sockets)==xbt_dynar_length(pd->comms)); + + /* Wait for the first terminating comm object */ + int rank = SIMIX_network_waitany(pd->comms); + + /* Don't wait on this socket until the comm object is recreated by gras_msg_recv */ + comm = NULL; + xbt_dynar_set(pd->comms,rank,&comm); + + /* Ok, got something. Open a socket back to the expeditor */ + active_socket = xbt_dynar_get_as(pd->sockets,rank,gras_socket_t); + active_socket_data = (gras_trp_sg_sock_data_t *) active_socket->data; + + /* Try to reuse an already opened socket to that expeditor */ + DEBUG1("Open sockets size %lu", xbt_dynar_length(pd->sockets)); + xbt_dynar_foreach(pd->sockets, cursor, sock_iter) { + gras_trp_sg_sock_data_t *sock_data; + DEBUG1("Consider %p as outgoing socket to expeditor", sock_iter); + + if (sock_iter->meas || !sock_iter->outgoing) + continue; + sock_data = ((gras_trp_sg_sock_data_t *) sock_iter->data); + + if ((sock_data->to_socket == active_socket) && + (sock_data->to_host == + SIMIX_process_get_host(active_socket_data->from_process))) { + xbt_dynar_cursor_unlock(pd->sockets); + return sock_iter; + } + } + + /* Socket to expeditor not created yet */ + DEBUG0("Create a socket to the expeditor"); + + + + return sock_iter; + +#if 0 /* KILLME */ gras_socket_t res; gras_trp_procdata_t pd = (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); @@ -85,7 +139,7 @@ gras_socket_t gras_trp_select(double timeout) res->incoming = 1; res->outgoing = 1; - res->accepting = 0; + res->is_master = 0; res->sd = -1; res->port = -1; @@ -123,6 +177,7 @@ gras_socket_t gras_trp_select(double timeout) SIMIX_process_get_name(sockdata->to_process), res->port); return res; +#endif } diff --git a/src/gras/Transport/transport.c b/src/gras/Transport/transport.c index dd976c4f99..f36f83d266 100644 --- a/src/gras/Transport/transport.c +++ b/src/gras/Transport/transport.c @@ -157,7 +157,7 @@ void gras_trp_socket_new(int incoming, gras_socket_t * dst) sock->incoming = incoming ? 1 : 0; sock->outgoing = incoming ? 0 : 1; - sock->accepting = incoming ? 1 : 0; + sock->is_master = incoming ? 1 : 0; sock->meas = 0; sock->recvd = 0; sock->valid = 1; @@ -211,7 +211,7 @@ gras_socket_server_ext(unsigned short port, trp->socket_server(trp, sock); DEBUG3("in=%c out=%c accept=%c", sock->incoming ? 'y' : 'n', - sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n'); + sock->outgoing ? 'y' : 'n', sock->is_master ? 'y' : 'n'); } CATCH(e) { free(sock); @@ -297,7 +297,7 @@ gras_socket_client_ext(const char *host, (*trp->socket_client) (trp, sock); DEBUG3("in=%c out=%c accept=%c", sock->incoming ? 'y' : 'n', - sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n'); + sock->outgoing ? 'y' : 'n', sock->is_master ? 'y' : 'n'); } CATCH(e) { free(sock); RETHROW; @@ -340,7 +340,7 @@ void gras_socket_close_voidp(void *sock) { /** \brief Close socket */ void gras_socket_close(gras_socket_t sock) { - xbt_dynar_t sockets = + xbt_dynar_t my_sockets = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets; gras_socket_t sock_iter = NULL; unsigned int cursor; @@ -358,23 +358,17 @@ void gras_socket_close(gras_socket_t sock) } /* FIXME: Issue an event when the socket is closed */ - DEBUG1("sockets pointer before %p", sockets); + DEBUG1("sockets pointer before %p", my_sockets); if (sock) { /* FIXME: Cannot get the dynar mutex, because it can be already locked */ // _xbt_dynar_foreach(sockets,cursor,sock_iter) { - for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) { - _xbt_dynar_cursor_get(sockets, cursor, &sock_iter); + for (cursor = 0; cursor < xbt_dynar_length(my_sockets); cursor++) { + _xbt_dynar_cursor_get(my_sockets, cursor, &sock_iter); if (sock == sock_iter) { DEBUG2("remove sock cursor %d dize %lu\n", cursor, - xbt_dynar_length(sockets)); - xbt_dynar_cursor_rm(sockets, &cursor); - if (sock->plugin->socket_close) - (*sock->plugin->socket_close) (sock); - - /* free the memory */ - if (sock->peer_name) - free(sock->peer_name); - free(sock); + xbt_dynar_length(my_sockets)); + xbt_dynar_cursor_rm(my_sockets, &cursor); + gras_msg_listener_close_socket(sock); XBT_OUT; return; } @@ -574,7 +568,7 @@ gras_socket_t gras_socket_meas_accept(gras_socket_t peer) xbt_assert0(peer->meas, "No need to accept on non-measurement sockets (it's automatic)"); - if (!peer->accepting) { + if (!peer->is_master) { /* nothing to accept here (must be in SG) */ /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */ return peer; @@ -598,6 +592,7 @@ static void *gras_trp_procdata_new(void) res->name = xbt_strdup("gras_trp"); res->name_len = 0; res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL); + res->comms = xbt_dynar_new(sizeof(void*),NULL); /* stores some smx_comm_t in SG (not used in RL) */ res->myport = 0; return (void *) res; @@ -611,6 +606,7 @@ static void gras_trp_procdata_free(void *data) gras_trp_procdata_t res = (gras_trp_procdata_t) data; xbt_dynar_free(&(res->sockets)); + xbt_dynar_free(&(res->comms)); free(res->name); free(res); } diff --git a/src/gras/Transport/transport_interface.h b/src/gras/Transport/transport_interface.h index 6b847139f0..21c52d70bd 100644 --- a/src/gras/Transport/transport_interface.h +++ b/src/gras/Transport/transport_interface.h @@ -99,12 +99,8 @@ XBT_PUBLIC(gras_trp_plugin_t) int myport; /* Port on which I listen myself */ xbt_dynar_t sockets; /* all sockets known to this process */ - - /* SG only elements. In RL, they are part of the OS ;) */ - - /* List of sockets ready to be select()ed */ - xbt_queue_t msg_selectable_sockets; /* regular sockets */ - xbt_queue_t meas_selectable_sockets; /* measurement ones */ + xbt_dynar_t comms; /* SG cruft: the ongoing communications */ + xbt_dynar_t sockets_to_close; /* The listener is in charge of closing the sockets */ } s_gras_trp_procdata_t, *gras_trp_procdata_t; diff --git a/src/gras/Transport/transport_plugin_file.c b/src/gras/Transport/transport_plugin_file.c index c8a13d0dd2..ca8aadfd0e 100644 --- a/src/gras/Transport/transport_plugin_file.c +++ b/src/gras/Transport/transport_plugin_file.c @@ -98,7 +98,7 @@ gras_socket_t gras_socket_client_from_file(const char *path) path, res->sd, res->incoming ? 'y' : 'n', - res->outgoing ? 'y' : 'n', res->accepting ? 'y' : 'n'); + res->outgoing ? 'y' : 'n', res->is_master ? 'y' : 'n'); xbt_dynar_push(((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res); @@ -138,7 +138,7 @@ gras_socket_t gras_socket_server_from_file(const char *path) DEBUG4("sd=%d in=%c out=%c accept=%c", res->sd, res->incoming ? 'y' : 'n', - res->outgoing ? 'y' : 'n', res->accepting ? 'y' : 'n'); + res->outgoing ? 'y' : 'n', res->is_master ? 'y' : 'n'); xbt_dynar_push(((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res); diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index 9ebfd79a75..a80c33118b 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -24,10 +24,6 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg, gras_trp, *** Prototypes ***/ -/* retrieve the port record associated to a numerical port on an host */ -static void find_port(gras_hostdata_t * hd, int port, - gras_sg_portrec_t * hpd); - void gras_trp_sg_socket_client(gras_trp_plugin_t self, /* OUT */ gras_socket_t sock); @@ -48,36 +44,27 @@ int gras_trp_sg_chunk_recv(gras_socket_t sd, *** Specific plugin part ***/ typedef struct { - int placeholder; /* nothing plugin specific so far */ -} gras_trp_sg_plug_data_t; + xbt_dict_t sockets; /* all known sockets */ +} s_gras_trp_sg_plug_data_t,*gras_trp_sg_plug_data_t; /*** *** Code ***/ -static void find_port(gras_hostdata_t * hd, int port, gras_sg_portrec_t * hpd) -{ - unsigned int cpt; - gras_sg_portrec_t pr; - xbt_assert0(hd, "Please run gras_process_init on each process"); - - xbt_dynar_foreach(hd->ports, cpt, pr) { - if (pr.port == port) { - memcpy(hpd, &pr, sizeof(gras_sg_portrec_t)); - return; - } - } - THROW1(mismatch_error, 0, "Unable to find any portrec for port #%d", port); +static void gras_trp_sg_exit(gras_trp_plugin_t plug){ + gras_trp_sg_plug_data_t mydata = (gras_trp_sg_plug_data_t) plug->data; + xbt_dict_free(&(mydata->sockets)); + xbt_free(plug->data); } - - void gras_trp_sg_setup(gras_trp_plugin_t plug) { - - gras_trp_sg_plug_data_t *data = xbt_new(gras_trp_sg_plug_data_t, 1); + gras_trp_sg_plug_data_t data = xbt_new(s_gras_trp_sg_plug_data_t, 1); plug->data = data; + data->sockets = xbt_dict_new(); + + plug->exit = gras_trp_sg_exit; plug->socket_client = gras_trp_sg_socket_client; plug->socket_server = gras_trp_sg_socket_server; @@ -91,155 +78,120 @@ void gras_trp_sg_setup(gras_trp_plugin_t plug) } void gras_trp_sg_socket_client(gras_trp_plugin_t self, - /* OUT */ gras_socket_t sock) -{ - xbt_ex_t e; + /* OUT */ gras_socket_t sock) { smx_host_t peer; - gras_hostdata_t *hd; gras_trp_sg_sock_data_t *data; - gras_sg_portrec_t pr; + gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); + - /* make sure this socket will reach someone */ if (!(peer = SIMIX_host_get_by_name(sock->peer_name))) THROW1(mismatch_error, 0, - "Can't connect to %s: no such host.\n", sock->peer_name); + "Can't connect to %s: no such host", sock->peer_name); - if (!(hd = (gras_hostdata_t *) SIMIX_host_get_data(peer))) - THROW1(mismatch_error, 0, - "can't connect to %s: no process on this host", sock->peer_name); + /* make sure this socket will reach someone */ + xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets; + char *sock_name=bprintf("%s:%d",sock->peer_name,sock->peer_port); + gras_socket_t server = xbt_dict_get_or_null(all_sockets,sock_name); + free(sock_name); - TRY { - find_port(hd, sock->peer_port, &pr); - } - CATCH(e) { - if (e.category == mismatch_error) { - xbt_ex_free(e); - THROW2(mismatch_error, 0, - "can't connect to %s:%d, no process listen on this port", - sock->peer_name, sock->peer_port); - } - RETHROW; - } + if (!server) + THROW2(mismatch_error, 0, + "can't connect to %s:%d, no process listen on this port", + sock->peer_name, sock->peer_port); - if (pr.meas && !sock->meas) { + if (server->meas && !sock->meas) { THROW2(mismatch_error, 0, "can't connect to %s:%d in regular mode, the process listen " "in measurement mode on this port", sock->peer_name, sock->peer_port); } - if (!pr.meas && sock->meas) { + if (!server->meas && sock->meas) { THROW2(mismatch_error, 0, "can't connect to %s:%d in measurement mode, the process listen " "in regular mode on this port", sock->peer_name, sock->peer_port); } /* create the socket */ - data = xbt_new(gras_trp_sg_sock_data_t, 1); + data = xbt_new0(gras_trp_sg_sock_data_t, 1); + data->rdv = ((gras_trp_sg_sock_data_t *)server->data)->rdv; data->from_process = SIMIX_process_self(); - data->to_process = pr.process; - data->to_host = peer; - - /* initialize mutex and condition of the socket */ - data->mutex = SIMIX_mutex_init(); - data->cond = SIMIX_cond_init(); - data->to_socket = pr.socket; sock->data = data; sock->incoming = 1; + /* Create a smx comm object about this socket */ + data->ongoing_msg_size = sizeof(s_gras_msg_t); + smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size)); + xbt_dynar_push(pd->comms,&comm); + DEBUG5("%s (PID %d) connects in %s mode to %s:%d", SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), sock->meas ? "meas" : "regular", sock->peer_name, sock->peer_port); } -void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock) -{ - - gras_hostdata_t *hd = - (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self()); - gras_sg_portrec_t pr; - gras_trp_sg_sock_data_t *data; - volatile int found; - - const char *host = SIMIX_host_get_name(SIMIX_host_self()); - - xbt_ex_t e; - - xbt_assert0(hd, "Please run gras_process_init on each process"); - - sock->accepting = 0; /* no such nuisance in SG */ - found = 0; - TRY { - find_port(hd, sock->port, &pr); - found = 1; - } CATCH(e) { - if (e.category == mismatch_error) - xbt_ex_free(e); - else - RETHROW; - } +void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock) { + gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); + xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets; - if (found) - THROW2(mismatch_error, 0, - "can't listen on address %s:%d: port already in use.", - host, sock->port); + /* Make sure that this socket was not opened so far */ + char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port); + gras_socket_t old = xbt_dict_get_or_null(all_sockets,sock_name); + if (old) + THROW1(mismatch_error, 0, + "can't listen on address %s: port already in use.", + sock_name); - pr.port = sock->port; - pr.meas = sock->meas; - pr.socket = sock; - pr.process = SIMIX_process_self(); - xbt_dynar_push(hd->ports, &pr); - /* Create the socket */ - data = xbt_new(gras_trp_sg_sock_data_t, 1); + /* Create the data associated to the socket */ + gras_trp_sg_sock_data_t *data = xbt_new0(gras_trp_sg_sock_data_t, 1); + data->rdv = SIMIX_rdv_create(sock_name); data->from_process = SIMIX_process_self(); - data->to_process = NULL; - data->to_host = SIMIX_host_self(); + SIMIX_rdv_set_data(data->rdv,sock); - data->cond = SIMIX_cond_init(); - data->mutex = SIMIX_mutex_init(); + sock->is_master = 1; + sock->incoming = 1; sock->data = data; + /* Register the socket to the set of sockets known simulation-wide */ + xbt_dict_set(all_sockets,sock_name,sock,NULL); /* FIXME: add a function to raise a warning at simulation end for non-closed sockets */ + + /* Create a smx comm object about this socket */ + data->ongoing_msg_size = sizeof(s_gras_msg_t); + smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size)); + INFO2("irecv comm %p onto %p",comm,&(data->ongoing_msg)); + xbt_dynar_push(pd->comms,&comm); + VERB6("'%s' (%d) ears on %s:%d%s (%p)", SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), - host, sock->port, sock->meas ? " (mode meas)" : "", sock); - + gras_os_myname(), sock->port, sock->meas ? " (mode meas)" : "", sock); + free(sock_name); } -void gras_trp_sg_socket_close(gras_socket_t sock) -{ - gras_hostdata_t *hd = - (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self()); - unsigned int cpt; - gras_sg_portrec_t pr; - - XBT_IN1(" (sock=%p)", sock); +void gras_trp_sg_socket_close(gras_socket_t sock) { + gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); + if (sock->is_master) { + /* server mode socket. Unregister it from 'OS' tables */ + char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port); - if (!sock) - return; + xbt_dict_t sockets = ((gras_trp_sg_plug_data_t)sock->plugin->data)->sockets; + gras_socket_t old = xbt_dict_get_or_null(sockets,sock_name); + if (!old) + WARN2("socket_close called on the unknown server socket %p (port=%d)", + sock, sock->port); + xbt_dict_remove(sockets,sock_name); + free(sock_name); - xbt_assert0(hd, "Please run gras_process_init on each process"); + } if (sock->data) { - SIMIX_cond_destroy(((gras_trp_sg_sock_data_t *) sock->data)->cond); - SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t *) sock->data)->mutex); + SIMIX_rdv_destroy(((gras_trp_sg_sock_data_t *) sock->data)->rdv); free(sock->data); } - if (sock->incoming && !sock->outgoing && sock->port >= 0) { - /* server mode socket. Unregister it from 'OS' tables */ - xbt_dynar_foreach(hd->ports, cpt, pr) { - DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports)); - if (pr.port == sock->port) { - xbt_dynar_cursor_rm(hd->ports, &cpt); - XBT_OUT; - return; - } - } - WARN2("socket_close called on the unknown incoming socket %p (port=%d)", - sock, sock->port); - } + xbt_dynar_push(pd->sockets_to_close,&sock); + gras_msg_listener_awake(); + XBT_OUT; } @@ -261,10 +213,7 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, char name[256]; static unsigned int count = 0; - smx_action_t act; /* simix action */ gras_trp_sg_sock_data_t *sock_data; - gras_trp_procdata_t trp_remote_proc; - gras_msg_procdata_t msg_remote_proc; gras_msg_t msg; /* message to send */ sock_data = (gras_trp_sg_sock_data_t *) sock->data; @@ -272,7 +221,6 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); - SIMIX_mutex_lock(sock_data->mutex); sprintf(name, "Chunk[%d]", count++); /*initialize gras message */ msg = xbt_new(s_gras_msg_t, 1); @@ -285,73 +233,22 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, } else { msg->payl = NULL; } - - - /* put his socket on the selectable socket queue */ - trp_remote_proc = (gras_trp_procdata_t) - gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process); - xbt_queue_push(trp_remote_proc->meas_selectable_sockets, &sock); - - /* put message on msg_queue */ - msg_remote_proc = (gras_msg_procdata_t) - gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process); - - xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas, msg); - - /* wait for the receiver */ - SIMIX_cond_wait(sock_data->cond, sock_data->mutex); - - /* creates simix action and waits its ends, waits in the sender host - condition */ - DEBUG5("send chunk %s from %s to %s:%d (size=%ld)", - name, SIMIX_host_get_name(SIMIX_host_self()), - SIMIX_host_get_name(sock_data->to_host), sock->peer_port, size); - - act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host, - name, size, -1); - SIMIX_register_action_to_condition(act, sock_data->cond); - SIMIX_cond_wait(sock_data->cond, sock_data->mutex); - SIMIX_unregister_action_to_condition(act, sock_data->cond); - /* error treatmeant (FIXME) */ - - /* cleanup structures */ - SIMIX_action_destroy(act); - - SIMIX_mutex_unlock(sock_data->mutex); + SIMIX_network_send(sock_data->rdv,size,-1.,-1.,&msg,sizeof(msg),(smx_comm_t*)&(msg->comm),msg); } int gras_trp_sg_chunk_recv(gras_socket_t sock, char *data, unsigned long int size) { gras_trp_sg_sock_data_t *sock_data; - gras_trp_sg_sock_data_t *remote_sock_data; - gras_socket_t remote_socket = NULL; gras_msg_t msg_got; - gras_msg_procdata_t msg_procdata = - (gras_msg_procdata_t) gras_libdata_by_name("gras_msg"); - gras_trp_procdata_t trp_proc = - (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); + smx_comm_t comm; xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); - xbt_queue_shift_timed(trp_proc->meas_selectable_sockets, - &remote_socket, 60); - - if (remote_socket == NULL) { - THROW0(timeout_error, 0, "Timeout"); - } - - remote_sock_data = (gras_trp_sg_sock_data_t *) remote_socket->data; - msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas); sock_data = (gras_trp_sg_sock_data_t *) sock->data; - /* ok, I'm here, you can continue the communication */ - SIMIX_cond_signal(remote_sock_data->cond); - - SIMIX_mutex_lock(remote_sock_data->mutex); - /* wait for communication end */ - SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex); + SIMIX_network_recv(sock_data->rdv,-1.,&msg_got,NULL,&comm); if (msg_got->payl_size != size) THROW5(mismatch_error, 0, @@ -367,6 +264,5 @@ int gras_trp_sg_chunk_recv(gras_socket_t sock, xbt_free(msg_got->payl); xbt_free(msg_got); - SIMIX_mutex_unlock(remote_sock_data->mutex); return 0; } diff --git a/src/gras/Transport/transport_plugin_tcp.c b/src/gras/Transport/transport_plugin_tcp.c index de76134c83..ddbf0f8732 100644 --- a/src/gras/Transport/transport_plugin_tcp.c +++ b/src/gras/Transport/transport_plugin_tcp.c @@ -225,7 +225,7 @@ static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock) res->plugin = sock->plugin; res->incoming = sock->incoming; res->outgoing = sock->outgoing; - res->accepting = 0; + res->is_master = 0; res->sd = sd; res->port = -1; @@ -266,8 +266,10 @@ static void gras_trp_sock_socket_close(gras_socket_t sock) VERB1("close tcp connection %d", sock->sd); - /* ask the listener to close the socket */ - gras_msg_listener_close_socket(sock->sd); + if (tcp_close(sock->sd) < 0) { + WARN3("error while closing tcp socket %d: %d (%s)\n", + sock->sd, sock_errno, sock_errstr(sock_errno)); + } } /************************************/ diff --git a/src/gras/Transport/transport_private.h b/src/gras/Transport/transport_private.h index 2588813a7e..b242ccbb90 100644 --- a/src/gras/Transport/transport_private.h +++ b/src/gras/Transport/transport_private.h @@ -56,7 +56,7 @@ typedef struct s_gras_socket { int incoming:1; /* true if we can read from this sock */ int outgoing:1; /* true if we can write on this sock */ - int accepting:1; /* true if master incoming sock in tcp */ + int is_master:1; /* true if master incoming sock */ int meas:1; /* true if this is an experiment socket instead of messaging */ int valid:1; /* false if a select returned that the peer quitted, forcing us to "close" the socket */ int moredata:1; /* TCP socket use a buffer and read operation get as much diff --git a/src/gras/Virtu/process.c b/src/gras/Virtu/process.c index fea5400ad2..f7fb45f388 100644 --- a/src/gras/Virtu/process.c +++ b/src/gras/Virtu/process.c @@ -103,10 +103,10 @@ void *gras_libdata_by_name_from_procdata(const char *name, } return res; } - -void *gras_libdata_by_id(int id) -{ - gras_procdata_t *pd = gras_procdata_get(); +void *gras_libdata_by_id(int id) { + return gras_libdata_by_id_from_procdata(id,gras_procdata_get()); +} +void *gras_libdata_by_id_from_procdata(int id,gras_procdata_t *pd) { if (xbt_set_length(pd->libdata) < xbt_dynar_length(_gras_procdata_fabrics)) { /* Damn, some new modules were added since procdata_init(). Amok? */ /* Get 'em all */ diff --git a/src/gras/Virtu/sg_process.c b/src/gras/Virtu/sg_process.c index bc39cf2c7a..569fd3ab1a 100644 --- a/src/gras/Virtu/sg_process.c +++ b/src/gras/Virtu/sg_process.c @@ -48,7 +48,6 @@ void gras_process_init() /* First process on this host */ hd = xbt_new(gras_hostdata_t, 1); hd->refcount = 1; - hd->ports = xbt_dynar_new(sizeof(gras_sg_portrec_t), NULL); SIMIX_host_set_data(SIMIX_host_self(), (void *) hd); } else { hd->refcount++; @@ -62,9 +61,7 @@ void gras_process_init() } else pd->ppid = -1; - trp_pd->msg_selectable_sockets = xbt_queue_new(0, sizeof(gras_socket_t)); - - trp_pd->meas_selectable_sockets = xbt_queue_new(0, sizeof(gras_socket_t)); + trp_pd->sockets_to_close = xbt_dynar_new(sizeof(gras_socket_t),NULL); VERB2("Creating process '%s' (%d)", SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid()); @@ -86,9 +83,7 @@ void gras_process_exit() gras_trp_procdata_t trp_pd = (gras_trp_procdata_t) gras_libdata_by_name("gras_trp"); - xbt_queue_free(&trp_pd->msg_selectable_sockets); - - xbt_queue_free(&trp_pd->meas_selectable_sockets); + xbt_dynar_free(&trp_pd->sockets_to_close); xbt_assert0(hd, "Run gras_process_init (ie, gras_init)!!"); @@ -115,7 +110,6 @@ void gras_process_exit() gras_socket_close(sock_iter); } if (!--(hd->refcount)) { - xbt_dynar_free(&hd->ports); free(hd); } gras_procdata_exit(); diff --git a/src/gras/Virtu/virtu_private.h b/src/gras/Virtu/virtu_private.h index 916223738e..39a31d6d14 100644 --- a/src/gras/Virtu/virtu_private.h +++ b/src/gras/Virtu/virtu_private.h @@ -44,5 +44,7 @@ typedef struct { gras_procdata_t *gras_procdata_get(void); void *gras_libdata_by_name_from_procdata(const char *name, gras_procdata_t * pd); +void *gras_libdata_by_id_from_procdata(int id,gras_procdata_t *pd); + #endif /* GRAS_VIRTU_PRIVATE_H */ diff --git a/src/gras/Virtu/virtu_sg.h b/src/gras/Virtu/virtu_sg.h index 51090f7b51..dab2df5c47 100644 --- a/src/gras/Virtu/virtu_sg.h +++ b/src/gras/Virtu/virtu_sg.h @@ -26,20 +26,21 @@ typedef struct { typedef struct { int refcount; - xbt_dynar_t ports; - + /* Nothing in particular (anymore) */ } gras_hostdata_t; /* data for each socket (FIXME: find a better location for that)*/ typedef struct { - smx_process_t from_process; - smx_process_t to_process; + smx_rdv_t rdv; + smx_process_t from_process; /* the one who created the socket */ smx_host_t to_host; /* Who's on other side */ + smx_comm_t comm; /* Ongoing communication */ + + s_gras_msg_t ongoing_msg; + size_t ongoing_msg_size; - smx_cond_t cond; - smx_mutex_t mutex; - gras_socket_t to_socket; + gras_socket_t to_socket; /* If != NULL, this socket was created as accept when receiving onto to_socket */ } gras_trp_sg_sock_data_t; diff --git a/src/include/simix/simix.h b/src/include/simix/simix.h index 82785d0b17..d746e71d7b 100644 --- a/src/include/simix/simix.h +++ b/src/include/simix/simix.h @@ -207,7 +207,7 @@ XBT_PUBLIC(void*) SIMIX_rdv_get_data(smx_rdv_t rdv); /*****Communication Requests*****/ XBT_PUBLIC(void) SIMIX_communication_cancel(smx_comm_t comm); XBT_PUBLIC(double) SIMIX_communication_get_remains(smx_comm_t comm); -XBT_PUBLIC(void *) SIMIX_communication_get_data(smx_comm_t comm); +XBT_PUBLIC(void *) SIMIX_communication_get_sentdata(smx_comm_t comm); /*****Networking*****/ XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, diff --git a/src/msg/msg_mailbox.c b/src/msg/msg_mailbox.c index 3425a413dc..f6f3230e25 100644 --- a/src/msg/msg_mailbox.c +++ b/src/msg/msg_mailbox.c @@ -64,7 +64,7 @@ m_task_t MSG_mailbox_get_head(msg_mailbox_t mailbox) if(!comm) return NULL; - return (m_task_t)SIMIX_communication_get_data(comm); + return (m_task_t)SIMIX_communication_get_sentdata(comm); } int diff --git a/src/simix/smx_global.c b/src/simix/smx_global.c index 26e0c5e11d..9df8f76170 100644 --- a/src/simix/smx_global.c +++ b/src/simix/smx_global.c @@ -19,6 +19,7 @@ XBT_LOG_EXTERNAL_CATEGORY(simix_host); XBT_LOG_EXTERNAL_CATEGORY(simix_process); XBT_LOG_EXTERNAL_CATEGORY(simix_synchro); XBT_LOG_EXTERNAL_CATEGORY(simix_context); +XBT_LOG_EXTERNAL_CATEGORY(simix_network); XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_kernel, simix, "Logging specific to SIMIX (kernel)"); @@ -57,6 +58,7 @@ void SIMIX_global_init(int *argc, char **argv) XBT_LOG_CONNECT(simix_process, simix); XBT_LOG_CONNECT(simix_synchro, simix); XBT_LOG_CONNECT(simix_context, simix); + XBT_LOG_CONNECT(simix_network, simix); simix_global = xbt_new0(s_SIMIX_Global_t, 1); @@ -417,8 +419,10 @@ double SIMIX_solve(xbt_fifo_t actions_done, xbt_fifo_t actions_failed) if (smx_action) { /* Copy the transfered data of the completed communication actions */ /* FIXME: find a better way to determine if its a comm action */ - if(smx_action->data != NULL) + if(smx_action->data != NULL) { + CDEBUG1(simix_network,"Communication %p finished",smx_action->data); SIMIX_network_copy_data((smx_comm_t)smx_action->data); + } SIMIX_action_signal_all(smx_action); } } diff --git a/src/simix/smx_network.c b/src/simix/smx_network.c index 7008ec30fe..4c32439033 100644 --- a/src/simix/smx_network.c +++ b/src/simix/smx_network.c @@ -87,7 +87,7 @@ smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, smx_comm_type_t type) } /* no relevant request found. Return NULL */ - DEBUG0("Communication request not found"); + DEBUG0("Communication request not found. I assume that other side will arrive later on."); return NULL; } @@ -300,36 +300,32 @@ double SIMIX_communication_get_remains(smx_comm_t comm) */ void SIMIX_network_copy_data(smx_comm_t comm) { - /* If there is no data to be copy then return */ - if(!comm->src_buff || !comm->dst_buff) - return; - size_t src_buff_size = comm->src_buff_size; size_t dst_buff_size = *comm->dst_buff_size; - - /* Copy at most dst_buff_size bytes of the message to receiver's buffer */ - dst_buff_size = MIN(dst_buff_size, src_buff_size); - - /* Update the receiver's buffer size to the copied amount */ - if (comm->dst_buff_size) - *comm->dst_buff_size = dst_buff_size; - if(dst_buff_size == 0) + xbt_assert(src_buff_size == dst_buff_size); + + /* If there is no data to copy then return */ + if(!comm->src_buff || !comm->dst_buff || dst_buff_size == 0) return; memcpy(comm->dst_buff, comm->src_buff, dst_buff_size); - DEBUG4("Copying comm %p data from %s -> %s (%zu bytes)", + DEBUG6("Copying comm %p data from %s -> %s (%zu bytes %p->%p)", comm, comm->src_proc->smx_host->name, comm->dst_proc->smx_host->name, - dst_buff_size); + dst_buff_size,comm->src_buff,comm->dst_buff); } /** * \brief Return the user data associated to the communication + * + * In MSG and GRAS, that data is the exchanged task/msg itself, since + * (i) In MSG, the receiver still wants to read the task although the communication didn't complete. + * (ii) In GRAS, we need to retrieve that gras_msg_t during the select * \param comm The communication * \return the user data */ -void *SIMIX_communication_get_data(smx_comm_t comm) +void *SIMIX_communication_get_sentdata(smx_comm_t comm) { return comm->data; } @@ -427,6 +423,7 @@ smx_comm_t SIMIX_network_irecv(smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_s comm->dst_buff = dst_buff; comm->dst_buff_size = dst_buff_size; + DEBUG2("Receive data for %p into %p",comm,comm->dst_buff); SIMIX_communication_start(comm); return comm; }