From 1a2f031cc377418f3716415ba669434bfd3a5df4 Mon Sep 17 00:00:00 2001 From: mquinson Date: Tue, 10 Nov 2009 12:32:54 +0000 Subject: [PATCH] Revert "try to port the gras simulation side to the new smx_network infrastructure (not yet working)" This git branch is not ready for public consumption yet (sorry) This reverts commit 063c63642a29000a011c0d6176d30eb62a4e0dca. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6836 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- include/gras/messages.h | 1 - src/gras/Msg/gras_msg_listener.c | 21 +- src/gras/Msg/msg_interface.h | 2 +- src/gras/Msg/sg_msg.c | 81 ++++--- src/gras/Transport/rl_transport.c | 2 +- src/gras/Transport/sg_transport.c | 61 +---- src/gras/Transport/transport.c | 30 ++- src/gras/Transport/transport_interface.h | 8 +- src/gras/Transport/transport_plugin_file.c | 4 +- src/gras/Transport/transport_plugin_sg.c | 262 ++++++++++++++------- src/gras/Transport/transport_plugin_tcp.c | 8 +- src/gras/Transport/transport_private.h | 2 +- src/gras/Virtu/process.c | 8 +- src/gras/Virtu/sg_process.c | 10 +- src/gras/Virtu/virtu_private.h | 2 - src/gras/Virtu/virtu_sg.h | 15 +- src/include/simix/simix.h | 2 +- src/msg/msg_mailbox.c | 2 +- src/simix/smx_global.c | 6 +- src/simix/smx_network.c | 29 ++- 20 files changed, 315 insertions(+), 241 deletions(-) diff --git a/include/gras/messages.h b/include/gras/messages.h index c522c53694..e7485157ca 100644 --- a/include/gras/messages.h +++ b/include/gras/messages.h @@ -281,7 +281,6 @@ XBT_PUBLIC(void) gras_msg_rpcreturn(double timeOut, gras_msg_cb_ctx_t ctx, gras_msgtype_t type; unsigned long int ID; void *payl; - void *comm; /* simix_comm in SG */ int payl_size; } s_gras_msg_t, *gras_msg_t; diff --git a/src/gras/Msg/gras_msg_listener.c b/src/gras/Msg/gras_msg_listener.c index c66c18a603..b583d58e1e 100644 --- a/src/gras/Msg/gras_msg_listener.c +++ b/src/gras/Msg/gras_msg_listener.c @@ -26,14 +26,6 @@ typedef struct s_gras_msg_listener_ { xbt_thread_t listener; } s_gras_msg_listener_t; -static void do_close_socket(gras_socket_t sock) { - if (sock->plugin->socket_close) - (*sock->plugin->socket_close) (sock); - /* free the memory */ - if (sock->peer_name) - free(sock->peer_name); - free(sock); -} static void listener_function(void *p) { gras_msg_listener_t me = (gras_msg_listener_t) p; @@ -64,9 +56,12 @@ static void listener_function(void *p) /* empty the list of sockets to trash */ TRY { while (1) { - gras_socket_t sock; + int sock; xbt_queue_shift_timed(me->socks_to_close, &sock, 0); - do_close_socket(sock); + if (tcp_close(sock) < 0) { + WARN3("error while closing tcp socket %d: %d (%s)\n", + sock, sock_errno, sock_errstr(sock_errno)); + } } } CATCH(e) { @@ -139,14 +134,14 @@ void gras_msg_listener_awake() } } -void gras_msg_listener_close_socket(gras_socket_t sock) +void gras_msg_listener_close_socket(int sd) { gras_procdata_t *pd = gras_procdata_get(); if (pd->listener) { - xbt_queue_push(pd->listener->socks_to_close, &sock); + xbt_queue_push(pd->listener->socks_to_close, &sd); gras_msg_listener_awake(); } else { /* do it myself */ - do_close_socket(sock); + tcp_close(sd); } } diff --git a/src/gras/Msg/msg_interface.h b/src/gras/Msg/msg_interface.h index 27c2c70f63..0bad57a869 100644 --- a/src/gras/Msg/msg_interface.h +++ b/src/gras/Msg/msg_interface.h @@ -49,7 +49,7 @@ typedef struct { void gras_msg_send_namev(gras_socket_t sock, const char *namev, void *payload); void gras_msg_listener_awake(void); -void gras_msg_listener_close_socket(gras_socket_t sock); +void gras_msg_listener_close_socket(int sd); #define GRAS_PROTOCOL_VERSION '\1'; diff --git a/src/gras/Msg/sg_msg.c b/src/gras/Msg/sg_msg.c index 131ee9f7b1..1afb8aa050 100644 --- a/src/gras/Msg/sg_msg.c +++ b/src/gras/Msg/sg_msg.c @@ -27,13 +27,20 @@ void gras_msg_send_ext(gras_socket_t sock, unsigned long int ID, gras_msgtype_t msgtype, void *payload) { + + smx_action_t act; /* simix action */ gras_trp_sg_sock_data_t *sock_data; + gras_hostdata_t *hd; + gras_trp_procdata_t trp_remote_proc; + gras_msg_procdata_t msg_remote_proc; gras_msg_t msg; /* message to send */ int whole_payload_size = 0; /* msg->payload_size is used to memcpy the payload. This is used to report the load onto the simulator. It also counts the size of pointed stuff */ sock_data = (gras_trp_sg_sock_data_t *) sock->data; + hd = (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self()); + xbt_assert1(!gras_socket_is_meas(sock), "Asked to send a message on the measurement socket %p", sock); @@ -44,7 +51,7 @@ void gras_msg_send_ext(gras_socket_t sock, msg->type = msgtype; msg->ID = ID; if (kind == e_gras_msg_kind_rpcerror) { - /* error on remote host, careful, payload is an exception */ + /* error on remote host, carfull, payload is an exception */ msg->payl_size = gras_datadesc_size(gras_datadesc_by_name("ex_t")); msg->payl = xbt_malloc(msg->payl_size); whole_payload_size = gras_datadesc_memcpy(gras_datadesc_by_name("ex_t"), @@ -67,23 +74,56 @@ void gras_msg_send_ext(gras_socket_t sock, payload, msg->payl); } + /* put the selectable socket on the queue */ + trp_remote_proc = (gras_trp_procdata_t) + gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process); + + xbt_queue_push(trp_remote_proc->msg_selectable_sockets, &sock); + + /* put message on msg_queue */ + msg_remote_proc = (gras_msg_procdata_t) + gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process); + xbt_fifo_push(msg_remote_proc->msg_to_receive_queue, msg); + + /* wait for the receiver */ + SIMIX_cond_wait(sock_data->cond, sock_data->mutex); + /* creates simix action and waits its ends, waits in the sender host + condition */ + act = SIMIX_action_communicate(SIMIX_host_self(), + sock_data->to_host, msgtype->name, + (double) whole_payload_size, -1); + SIMIX_register_action_to_condition(act, sock_data->cond); VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu", - sock->peer_name,sock->peer_proc, + SIMIX_host_get_name(sock_data->to_host), + SIMIX_process_get_name(sock_data->to_process), msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID); - SIMIX_network_send(sock_data->rdv,whole_payload_size,-1.,-1.,msg,sizeof(s_gras_msg_t),(smx_comm_t*)&(msg->comm),&msg); + + SIMIX_cond_wait(sock_data->cond, sock_data->mutex); + SIMIX_unregister_action_to_condition(act, sock_data->cond); + /* error treatmeant (FIXME) */ + + /* cleanup structures */ + SIMIX_action_destroy(act); + SIMIX_mutex_unlock(sock_data->mutex); VERB0("Message sent"); + } /* * receive the next message on the given socket. */ -void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) { - gras_trp_procdata_t pd = - (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); +void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) +{ + gras_trp_sg_sock_data_t *sock_data; + gras_trp_sg_sock_data_t *remote_sock_data; + gras_hostdata_t *remote_hd; + gras_msg_t msg_got; + gras_msg_procdata_t msg_procdata = + (gras_msg_procdata_t) gras_libdata_by_name("gras_msg"); xbt_assert1(!gras_socket_is_meas(sock), "Asked to receive a message on the measurement socket %p", @@ -92,28 +132,11 @@ void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) { xbt_assert0(msg, "msg is an out parameter of gras_msg_recv..."); sock_data = (gras_trp_sg_sock_data_t *) sock->data; - - /* The message was already received while emulating the select, so simply copy it here */ - memcpy(msg,&(sock_data->ongoing_msg),sizeof(s_gras_msg_t)); - msg->expe = sock; - VERB1("Using %p as a msg",&(sock_data->ongoing_msg)); - VERB5("Received a message type '%s' kind '%s' ID %lu from %s(%s)", - msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID, - sock->peer_name,sock->peer_proc); - - /* Recreate another comm object to replace the one which just terminated */ - int rank = xbt_dynar_search(pd->sockets,&sock); - xbt_assert0(rank>=0,"Socket not found in my array"); - sock_data->ongoing_msg_size = sizeof(s_gras_msg_t); - smx_comm_t comm = SIMIX_network_irecv(sock_data->rdv,&(sock_data->ongoing_msg),&(sock_data->ongoing_msg_size)); - xbt_dynar_set(pd->comms,rank,&comm); - -#if 0 /* KILLME */ - SIMIX_network_recv(sock_data->rdv,-1.,&msg_got,NULL,&comm); - - remote_sock_data = ((gras_trp_sg_sock_data_t *) sock->data)->to_socket->data; + DEBUG3("Remote host %s, Remote Port: %d Local port %d", + SIMIX_host_get_name(sock_data->to_host), sock->peer_port, + sock->port); remote_hd = (gras_hostdata_t *) SIMIX_host_get_data(sock_data->to_host); if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0) { @@ -128,13 +151,13 @@ void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) { SIMIX_cond_signal(remote_sock_data->cond); /* wait for communication end */ - INFO2("Wait communication (from %s) termination on %p",sock->peer_name,sock_data->cond); - SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex); msg_got->expe = msg->expe; memcpy(msg, msg_got, sizeof(s_gras_msg_t)); xbt_free(msg_got); SIMIX_mutex_unlock(remote_sock_data->mutex); -#endif + + VERB3("Received a message type '%s' kind '%s' ID %lu", // from %s", + msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID); } diff --git a/src/gras/Transport/rl_transport.c b/src/gras/Transport/rl_transport.c index a8c2a702ff..395137fc63 100644 --- a/src/gras/Transport/rl_transport.c +++ b/src/gras/Transport/rl_transport.c @@ -175,7 +175,7 @@ gras_socket_t gras_trp_select(double timeout) /* Got a socket to serve */ ready--; - if (sock_iter->is_master && sock_iter->plugin->socket_accept) { + if (sock_iter->accepting && sock_iter->plugin->socket_accept) { /* not a socket but an ear. accept on it and serve next socket */ gras_socket_t accepted = NULL; diff --git a/src/gras/Transport/sg_transport.c b/src/gras/Transport/sg_transport.c index 96e5042110..f9bfbb72d7 100644 --- a/src/gras/Transport/sg_transport.c +++ b/src/gras/Transport/sg_transport.c @@ -10,7 +10,6 @@ #include "xbt/ex.h" #include "gras/Transport/transport_private.h" #include "gras/Virtu/virtu_sg.h" -#include "simix/private.h" XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_trp); @@ -28,61 +27,8 @@ gras_socket_t _gras_lastly_selected_socket = NULL; * * if timeout>0 and no message there, wait at most that amount of time before giving up. */ -gras_socket_t gras_trp_select(double timeout) { - static int warned=0; - if (timeout>=0 && !warned) { - warned=1; - WARN0("Timed select not implemented in SG. Switching to blocking select"); - } - gras_trp_procdata_t pd = - (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); - - gras_socket_t sock_iter, active_socket; - gras_trp_sg_sock_data_t *active_socket_data; - smx_comm_t comm; - unsigned int cursor; - - - /* FIXME: make sure that the ongoing comm is canceled&destroyed when the corresponding socket is closed */ - xbt_assert(xbt_dynar_length(pd->sockets)==xbt_dynar_length(pd->comms)); - - /* Wait for the first terminating comm object */ - int rank = SIMIX_network_waitany(pd->comms); - - /* Don't wait on this socket until the comm object is recreated by gras_msg_recv */ - comm = NULL; - xbt_dynar_set(pd->comms,rank,&comm); - - /* Ok, got something. Open a socket back to the expeditor */ - active_socket = xbt_dynar_get_as(pd->sockets,rank,gras_socket_t); - active_socket_data = (gras_trp_sg_sock_data_t *) active_socket->data; - - /* Try to reuse an already opened socket to that expeditor */ - DEBUG1("Open sockets size %lu", xbt_dynar_length(pd->sockets)); - xbt_dynar_foreach(pd->sockets, cursor, sock_iter) { - gras_trp_sg_sock_data_t *sock_data; - DEBUG1("Consider %p as outgoing socket to expeditor", sock_iter); - - if (sock_iter->meas || !sock_iter->outgoing) - continue; - sock_data = ((gras_trp_sg_sock_data_t *) sock_iter->data); - - if ((sock_data->to_socket == active_socket) && - (sock_data->to_host == - SIMIX_process_get_host(active_socket_data->from_process))) { - xbt_dynar_cursor_unlock(pd->sockets); - return sock_iter; - } - } - - /* Socket to expeditor not created yet */ - DEBUG0("Create a socket to the expeditor"); - - - - return sock_iter; - -#if 0 /* KILLME */ +gras_socket_t gras_trp_select(double timeout) +{ gras_socket_t res; gras_trp_procdata_t pd = (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); @@ -139,7 +85,7 @@ gras_socket_t gras_trp_select(double timeout) { res->incoming = 1; res->outgoing = 1; - res->is_master = 0; + res->accepting = 0; res->sd = -1; res->port = -1; @@ -177,7 +123,6 @@ gras_socket_t gras_trp_select(double timeout) { SIMIX_process_get_name(sockdata->to_process), res->port); return res; -#endif } diff --git a/src/gras/Transport/transport.c b/src/gras/Transport/transport.c index f36f83d266..dd976c4f99 100644 --- a/src/gras/Transport/transport.c +++ b/src/gras/Transport/transport.c @@ -157,7 +157,7 @@ void gras_trp_socket_new(int incoming, gras_socket_t * dst) sock->incoming = incoming ? 1 : 0; sock->outgoing = incoming ? 0 : 1; - sock->is_master = incoming ? 1 : 0; + sock->accepting = incoming ? 1 : 0; sock->meas = 0; sock->recvd = 0; sock->valid = 1; @@ -211,7 +211,7 @@ gras_socket_server_ext(unsigned short port, trp->socket_server(trp, sock); DEBUG3("in=%c out=%c accept=%c", sock->incoming ? 'y' : 'n', - sock->outgoing ? 'y' : 'n', sock->is_master ? 'y' : 'n'); + sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n'); } CATCH(e) { free(sock); @@ -297,7 +297,7 @@ gras_socket_client_ext(const char *host, (*trp->socket_client) (trp, sock); DEBUG3("in=%c out=%c accept=%c", sock->incoming ? 'y' : 'n', - sock->outgoing ? 'y' : 'n', sock->is_master ? 'y' : 'n'); + sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n'); } CATCH(e) { free(sock); RETHROW; @@ -340,7 +340,7 @@ void gras_socket_close_voidp(void *sock) { /** \brief Close socket */ void gras_socket_close(gras_socket_t sock) { - xbt_dynar_t my_sockets = + xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets; gras_socket_t sock_iter = NULL; unsigned int cursor; @@ -358,17 +358,23 @@ void gras_socket_close(gras_socket_t sock) } /* FIXME: Issue an event when the socket is closed */ - DEBUG1("sockets pointer before %p", my_sockets); + DEBUG1("sockets pointer before %p", sockets); if (sock) { /* FIXME: Cannot get the dynar mutex, because it can be already locked */ // _xbt_dynar_foreach(sockets,cursor,sock_iter) { - for (cursor = 0; cursor < xbt_dynar_length(my_sockets); cursor++) { - _xbt_dynar_cursor_get(my_sockets, cursor, &sock_iter); + for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) { + _xbt_dynar_cursor_get(sockets, cursor, &sock_iter); if (sock == sock_iter) { DEBUG2("remove sock cursor %d dize %lu\n", cursor, - xbt_dynar_length(my_sockets)); - xbt_dynar_cursor_rm(my_sockets, &cursor); - gras_msg_listener_close_socket(sock); + xbt_dynar_length(sockets)); + xbt_dynar_cursor_rm(sockets, &cursor); + if (sock->plugin->socket_close) + (*sock->plugin->socket_close) (sock); + + /* free the memory */ + if (sock->peer_name) + free(sock->peer_name); + free(sock); XBT_OUT; return; } @@ -568,7 +574,7 @@ gras_socket_t gras_socket_meas_accept(gras_socket_t peer) xbt_assert0(peer->meas, "No need to accept on non-measurement sockets (it's automatic)"); - if (!peer->is_master) { + if (!peer->accepting) { /* nothing to accept here (must be in SG) */ /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */ return peer; @@ -592,7 +598,6 @@ static void *gras_trp_procdata_new(void) res->name = xbt_strdup("gras_trp"); res->name_len = 0; res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL); - res->comms = xbt_dynar_new(sizeof(void*),NULL); /* stores some smx_comm_t in SG (not used in RL) */ res->myport = 0; return (void *) res; @@ -606,7 +611,6 @@ static void gras_trp_procdata_free(void *data) gras_trp_procdata_t res = (gras_trp_procdata_t) data; xbt_dynar_free(&(res->sockets)); - xbt_dynar_free(&(res->comms)); free(res->name); free(res); } diff --git a/src/gras/Transport/transport_interface.h b/src/gras/Transport/transport_interface.h index 21c52d70bd..6b847139f0 100644 --- a/src/gras/Transport/transport_interface.h +++ b/src/gras/Transport/transport_interface.h @@ -99,8 +99,12 @@ XBT_PUBLIC(gras_trp_plugin_t) int myport; /* Port on which I listen myself */ xbt_dynar_t sockets; /* all sockets known to this process */ - xbt_dynar_t comms; /* SG cruft: the ongoing communications */ - xbt_dynar_t sockets_to_close; /* The listener is in charge of closing the sockets */ + + /* SG only elements. In RL, they are part of the OS ;) */ + + /* List of sockets ready to be select()ed */ + xbt_queue_t msg_selectable_sockets; /* regular sockets */ + xbt_queue_t meas_selectable_sockets; /* measurement ones */ } s_gras_trp_procdata_t, *gras_trp_procdata_t; diff --git a/src/gras/Transport/transport_plugin_file.c b/src/gras/Transport/transport_plugin_file.c index ca8aadfd0e..c8a13d0dd2 100644 --- a/src/gras/Transport/transport_plugin_file.c +++ b/src/gras/Transport/transport_plugin_file.c @@ -98,7 +98,7 @@ gras_socket_t gras_socket_client_from_file(const char *path) path, res->sd, res->incoming ? 'y' : 'n', - res->outgoing ? 'y' : 'n', res->is_master ? 'y' : 'n'); + res->outgoing ? 'y' : 'n', res->accepting ? 'y' : 'n'); xbt_dynar_push(((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res); @@ -138,7 +138,7 @@ gras_socket_t gras_socket_server_from_file(const char *path) DEBUG4("sd=%d in=%c out=%c accept=%c", res->sd, res->incoming ? 'y' : 'n', - res->outgoing ? 'y' : 'n', res->is_master ? 'y' : 'n'); + res->outgoing ? 'y' : 'n', res->accepting ? 'y' : 'n'); xbt_dynar_push(((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res); diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index a80c33118b..9ebfd79a75 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -24,6 +24,10 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg, gras_trp, *** Prototypes ***/ +/* retrieve the port record associated to a numerical port on an host */ +static void find_port(gras_hostdata_t * hd, int port, + gras_sg_portrec_t * hpd); + void gras_trp_sg_socket_client(gras_trp_plugin_t self, /* OUT */ gras_socket_t sock); @@ -44,27 +48,36 @@ int gras_trp_sg_chunk_recv(gras_socket_t sd, *** Specific plugin part ***/ typedef struct { - xbt_dict_t sockets; /* all known sockets */ -} s_gras_trp_sg_plug_data_t,*gras_trp_sg_plug_data_t; + int placeholder; /* nothing plugin specific so far */ +} gras_trp_sg_plug_data_t; /*** *** Code ***/ +static void find_port(gras_hostdata_t * hd, int port, gras_sg_portrec_t * hpd) +{ + unsigned int cpt; + gras_sg_portrec_t pr; -static void gras_trp_sg_exit(gras_trp_plugin_t plug){ - gras_trp_sg_plug_data_t mydata = (gras_trp_sg_plug_data_t) plug->data; - xbt_dict_free(&(mydata->sockets)); - xbt_free(plug->data); + xbt_assert0(hd, "Please run gras_process_init on each process"); + + xbt_dynar_foreach(hd->ports, cpt, pr) { + if (pr.port == port) { + memcpy(hpd, &pr, sizeof(gras_sg_portrec_t)); + return; + } + } + THROW1(mismatch_error, 0, "Unable to find any portrec for port #%d", port); } + + void gras_trp_sg_setup(gras_trp_plugin_t plug) { - gras_trp_sg_plug_data_t data = xbt_new(s_gras_trp_sg_plug_data_t, 1); - plug->data = data; - data->sockets = xbt_dict_new(); + gras_trp_sg_plug_data_t *data = xbt_new(gras_trp_sg_plug_data_t, 1); - plug->exit = gras_trp_sg_exit; + plug->data = data; plug->socket_client = gras_trp_sg_socket_client; plug->socket_server = gras_trp_sg_socket_server; @@ -78,120 +91,155 @@ void gras_trp_sg_setup(gras_trp_plugin_t plug) } void gras_trp_sg_socket_client(gras_trp_plugin_t self, - /* OUT */ gras_socket_t sock) { + /* OUT */ gras_socket_t sock) +{ + xbt_ex_t e; smx_host_t peer; + gras_hostdata_t *hd; gras_trp_sg_sock_data_t *data; - gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); - + gras_sg_portrec_t pr; + /* make sure this socket will reach someone */ if (!(peer = SIMIX_host_get_by_name(sock->peer_name))) THROW1(mismatch_error, 0, - "Can't connect to %s: no such host", sock->peer_name); + "Can't connect to %s: no such host.\n", sock->peer_name); - /* make sure this socket will reach someone */ - xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets; - char *sock_name=bprintf("%s:%d",sock->peer_name,sock->peer_port); - gras_socket_t server = xbt_dict_get_or_null(all_sockets,sock_name); - free(sock_name); + if (!(hd = (gras_hostdata_t *) SIMIX_host_get_data(peer))) + THROW1(mismatch_error, 0, + "can't connect to %s: no process on this host", sock->peer_name); - if (!server) - THROW2(mismatch_error, 0, - "can't connect to %s:%d, no process listen on this port", - sock->peer_name, sock->peer_port); + TRY { + find_port(hd, sock->peer_port, &pr); + } + CATCH(e) { + if (e.category == mismatch_error) { + xbt_ex_free(e); + THROW2(mismatch_error, 0, + "can't connect to %s:%d, no process listen on this port", + sock->peer_name, sock->peer_port); + } + RETHROW; + } - if (server->meas && !sock->meas) { + if (pr.meas && !sock->meas) { THROW2(mismatch_error, 0, "can't connect to %s:%d in regular mode, the process listen " "in measurement mode on this port", sock->peer_name, sock->peer_port); } - if (!server->meas && sock->meas) { + if (!pr.meas && sock->meas) { THROW2(mismatch_error, 0, "can't connect to %s:%d in measurement mode, the process listen " "in regular mode on this port", sock->peer_name, sock->peer_port); } /* create the socket */ - data = xbt_new0(gras_trp_sg_sock_data_t, 1); - data->rdv = ((gras_trp_sg_sock_data_t *)server->data)->rdv; + data = xbt_new(gras_trp_sg_sock_data_t, 1); data->from_process = SIMIX_process_self(); + data->to_process = pr.process; + data->to_host = peer; + + /* initialize mutex and condition of the socket */ + data->mutex = SIMIX_mutex_init(); + data->cond = SIMIX_cond_init(); + data->to_socket = pr.socket; sock->data = data; sock->incoming = 1; - /* Create a smx comm object about this socket */ - data->ongoing_msg_size = sizeof(s_gras_msg_t); - smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size)); - xbt_dynar_push(pd->comms,&comm); - DEBUG5("%s (PID %d) connects in %s mode to %s:%d", SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), sock->meas ? "meas" : "regular", sock->peer_name, sock->peer_port); } -void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock) { - gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); - xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets; +void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock) +{ - /* Make sure that this socket was not opened so far */ - char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port); - gras_socket_t old = xbt_dict_get_or_null(all_sockets,sock_name); - if (old) - THROW1(mismatch_error, 0, - "can't listen on address %s: port already in use.", - sock_name); + gras_hostdata_t *hd = + (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self()); + gras_sg_portrec_t pr; + gras_trp_sg_sock_data_t *data; + volatile int found; + const char *host = SIMIX_host_get_name(SIMIX_host_self()); - /* Create the data associated to the socket */ - gras_trp_sg_sock_data_t *data = xbt_new0(gras_trp_sg_sock_data_t, 1); - data->rdv = SIMIX_rdv_create(sock_name); - data->from_process = SIMIX_process_self(); - SIMIX_rdv_set_data(data->rdv,sock); + xbt_ex_t e; - sock->is_master = 1; - sock->incoming = 1; + xbt_assert0(hd, "Please run gras_process_init on each process"); - sock->data = data; + sock->accepting = 0; /* no such nuisance in SG */ + found = 0; + TRY { + find_port(hd, sock->port, &pr); + found = 1; + } CATCH(e) { + if (e.category == mismatch_error) + xbt_ex_free(e); + else + RETHROW; + } + + if (found) + THROW2(mismatch_error, 0, + "can't listen on address %s:%d: port already in use.", + host, sock->port); + + pr.port = sock->port; + pr.meas = sock->meas; + pr.socket = sock; + pr.process = SIMIX_process_self(); + xbt_dynar_push(hd->ports, &pr); - /* Register the socket to the set of sockets known simulation-wide */ - xbt_dict_set(all_sockets,sock_name,sock,NULL); /* FIXME: add a function to raise a warning at simulation end for non-closed sockets */ + /* Create the socket */ + data = xbt_new(gras_trp_sg_sock_data_t, 1); + data->from_process = SIMIX_process_self(); + data->to_process = NULL; + data->to_host = SIMIX_host_self(); - /* Create a smx comm object about this socket */ - data->ongoing_msg_size = sizeof(s_gras_msg_t); - smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size)); - INFO2("irecv comm %p onto %p",comm,&(data->ongoing_msg)); - xbt_dynar_push(pd->comms,&comm); + data->cond = SIMIX_cond_init(); + data->mutex = SIMIX_mutex_init(); + + sock->data = data; VERB6("'%s' (%d) ears on %s:%d%s (%p)", SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), - gras_os_myname(), sock->port, sock->meas ? " (mode meas)" : "", sock); - free(sock_name); + host, sock->port, sock->meas ? " (mode meas)" : "", sock); + } -void gras_trp_sg_socket_close(gras_socket_t sock) { - gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); - if (sock->is_master) { - /* server mode socket. Unregister it from 'OS' tables */ - char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port); +void gras_trp_sg_socket_close(gras_socket_t sock) +{ + gras_hostdata_t *hd = + (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self()); + unsigned int cpt; + gras_sg_portrec_t pr; - xbt_dict_t sockets = ((gras_trp_sg_plug_data_t)sock->plugin->data)->sockets; - gras_socket_t old = xbt_dict_get_or_null(sockets,sock_name); - if (!old) - WARN2("socket_close called on the unknown server socket %p (port=%d)", - sock, sock->port); - xbt_dict_remove(sockets,sock_name); - free(sock_name); + XBT_IN1(" (sock=%p)", sock); - } + if (!sock) + return; + + xbt_assert0(hd, "Please run gras_process_init on each process"); if (sock->data) { - SIMIX_rdv_destroy(((gras_trp_sg_sock_data_t *) sock->data)->rdv); + SIMIX_cond_destroy(((gras_trp_sg_sock_data_t *) sock->data)->cond); + SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t *) sock->data)->mutex); free(sock->data); } - xbt_dynar_push(pd->sockets_to_close,&sock); - gras_msg_listener_awake(); - + if (sock->incoming && !sock->outgoing && sock->port >= 0) { + /* server mode socket. Unregister it from 'OS' tables */ + xbt_dynar_foreach(hd->ports, cpt, pr) { + DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports)); + if (pr.port == sock->port) { + xbt_dynar_cursor_rm(hd->ports, &cpt); + XBT_OUT; + return; + } + } + WARN2("socket_close called on the unknown incoming socket %p (port=%d)", + sock, sock->port); + } XBT_OUT; } @@ -213,7 +261,10 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, char name[256]; static unsigned int count = 0; + smx_action_t act; /* simix action */ gras_trp_sg_sock_data_t *sock_data; + gras_trp_procdata_t trp_remote_proc; + gras_msg_procdata_t msg_remote_proc; gras_msg_t msg; /* message to send */ sock_data = (gras_trp_sg_sock_data_t *) sock->data; @@ -221,6 +272,7 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); + SIMIX_mutex_lock(sock_data->mutex); sprintf(name, "Chunk[%d]", count++); /*initialize gras message */ msg = xbt_new(s_gras_msg_t, 1); @@ -233,22 +285,73 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, } else { msg->payl = NULL; } - SIMIX_network_send(sock_data->rdv,size,-1.,-1.,&msg,sizeof(msg),(smx_comm_t*)&(msg->comm),msg); + + + /* put his socket on the selectable socket queue */ + trp_remote_proc = (gras_trp_procdata_t) + gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process); + xbt_queue_push(trp_remote_proc->meas_selectable_sockets, &sock); + + /* put message on msg_queue */ + msg_remote_proc = (gras_msg_procdata_t) + gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process); + + xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas, msg); + + /* wait for the receiver */ + SIMIX_cond_wait(sock_data->cond, sock_data->mutex); + + /* creates simix action and waits its ends, waits in the sender host + condition */ + DEBUG5("send chunk %s from %s to %s:%d (size=%ld)", + name, SIMIX_host_get_name(SIMIX_host_self()), + SIMIX_host_get_name(sock_data->to_host), sock->peer_port, size); + + act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host, + name, size, -1); + SIMIX_register_action_to_condition(act, sock_data->cond); + SIMIX_cond_wait(sock_data->cond, sock_data->mutex); + SIMIX_unregister_action_to_condition(act, sock_data->cond); + /* error treatmeant (FIXME) */ + + /* cleanup structures */ + SIMIX_action_destroy(act); + + SIMIX_mutex_unlock(sock_data->mutex); } int gras_trp_sg_chunk_recv(gras_socket_t sock, char *data, unsigned long int size) { gras_trp_sg_sock_data_t *sock_data; + gras_trp_sg_sock_data_t *remote_sock_data; + gras_socket_t remote_socket = NULL; gras_msg_t msg_got; - smx_comm_t comm; + gras_msg_procdata_t msg_procdata = + (gras_msg_procdata_t) gras_libdata_by_name("gras_msg"); + gras_trp_procdata_t trp_proc = + (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id); xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); + xbt_queue_shift_timed(trp_proc->meas_selectable_sockets, + &remote_socket, 60); + + if (remote_socket == NULL) { + THROW0(timeout_error, 0, "Timeout"); + } + + remote_sock_data = (gras_trp_sg_sock_data_t *) remote_socket->data; + msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas); sock_data = (gras_trp_sg_sock_data_t *) sock->data; - SIMIX_network_recv(sock_data->rdv,-1.,&msg_got,NULL,&comm); + /* ok, I'm here, you can continue the communication */ + SIMIX_cond_signal(remote_sock_data->cond); + + SIMIX_mutex_lock(remote_sock_data->mutex); + /* wait for communication end */ + SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex); if (msg_got->payl_size != size) THROW5(mismatch_error, 0, @@ -264,5 +367,6 @@ int gras_trp_sg_chunk_recv(gras_socket_t sock, xbt_free(msg_got->payl); xbt_free(msg_got); + SIMIX_mutex_unlock(remote_sock_data->mutex); return 0; } diff --git a/src/gras/Transport/transport_plugin_tcp.c b/src/gras/Transport/transport_plugin_tcp.c index ddbf0f8732..de76134c83 100644 --- a/src/gras/Transport/transport_plugin_tcp.c +++ b/src/gras/Transport/transport_plugin_tcp.c @@ -225,7 +225,7 @@ static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock) res->plugin = sock->plugin; res->incoming = sock->incoming; res->outgoing = sock->outgoing; - res->is_master = 0; + res->accepting = 0; res->sd = sd; res->port = -1; @@ -266,10 +266,8 @@ static void gras_trp_sock_socket_close(gras_socket_t sock) VERB1("close tcp connection %d", sock->sd); - if (tcp_close(sock->sd) < 0) { - WARN3("error while closing tcp socket %d: %d (%s)\n", - sock->sd, sock_errno, sock_errstr(sock_errno)); - } + /* ask the listener to close the socket */ + gras_msg_listener_close_socket(sock->sd); } /************************************/ diff --git a/src/gras/Transport/transport_private.h b/src/gras/Transport/transport_private.h index b242ccbb90..2588813a7e 100644 --- a/src/gras/Transport/transport_private.h +++ b/src/gras/Transport/transport_private.h @@ -56,7 +56,7 @@ typedef struct s_gras_socket { int incoming:1; /* true if we can read from this sock */ int outgoing:1; /* true if we can write on this sock */ - int is_master:1; /* true if master incoming sock */ + int accepting:1; /* true if master incoming sock in tcp */ int meas:1; /* true if this is an experiment socket instead of messaging */ int valid:1; /* false if a select returned that the peer quitted, forcing us to "close" the socket */ int moredata:1; /* TCP socket use a buffer and read operation get as much diff --git a/src/gras/Virtu/process.c b/src/gras/Virtu/process.c index f7fb45f388..fea5400ad2 100644 --- a/src/gras/Virtu/process.c +++ b/src/gras/Virtu/process.c @@ -103,10 +103,10 @@ void *gras_libdata_by_name_from_procdata(const char *name, } return res; } -void *gras_libdata_by_id(int id) { - return gras_libdata_by_id_from_procdata(id,gras_procdata_get()); -} -void *gras_libdata_by_id_from_procdata(int id,gras_procdata_t *pd) { + +void *gras_libdata_by_id(int id) +{ + gras_procdata_t *pd = gras_procdata_get(); if (xbt_set_length(pd->libdata) < xbt_dynar_length(_gras_procdata_fabrics)) { /* Damn, some new modules were added since procdata_init(). Amok? */ /* Get 'em all */ diff --git a/src/gras/Virtu/sg_process.c b/src/gras/Virtu/sg_process.c index 569fd3ab1a..bc39cf2c7a 100644 --- a/src/gras/Virtu/sg_process.c +++ b/src/gras/Virtu/sg_process.c @@ -48,6 +48,7 @@ void gras_process_init() /* First process on this host */ hd = xbt_new(gras_hostdata_t, 1); hd->refcount = 1; + hd->ports = xbt_dynar_new(sizeof(gras_sg_portrec_t), NULL); SIMIX_host_set_data(SIMIX_host_self(), (void *) hd); } else { hd->refcount++; @@ -61,7 +62,9 @@ void gras_process_init() } else pd->ppid = -1; - trp_pd->sockets_to_close = xbt_dynar_new(sizeof(gras_socket_t),NULL); + trp_pd->msg_selectable_sockets = xbt_queue_new(0, sizeof(gras_socket_t)); + + trp_pd->meas_selectable_sockets = xbt_queue_new(0, sizeof(gras_socket_t)); VERB2("Creating process '%s' (%d)", SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid()); @@ -83,7 +86,9 @@ void gras_process_exit() gras_trp_procdata_t trp_pd = (gras_trp_procdata_t) gras_libdata_by_name("gras_trp"); - xbt_dynar_free(&trp_pd->sockets_to_close); + xbt_queue_free(&trp_pd->msg_selectable_sockets); + + xbt_queue_free(&trp_pd->meas_selectable_sockets); xbt_assert0(hd, "Run gras_process_init (ie, gras_init)!!"); @@ -110,6 +115,7 @@ void gras_process_exit() gras_socket_close(sock_iter); } if (!--(hd->refcount)) { + xbt_dynar_free(&hd->ports); free(hd); } gras_procdata_exit(); diff --git a/src/gras/Virtu/virtu_private.h b/src/gras/Virtu/virtu_private.h index 39a31d6d14..916223738e 100644 --- a/src/gras/Virtu/virtu_private.h +++ b/src/gras/Virtu/virtu_private.h @@ -44,7 +44,5 @@ typedef struct { gras_procdata_t *gras_procdata_get(void); void *gras_libdata_by_name_from_procdata(const char *name, gras_procdata_t * pd); -void *gras_libdata_by_id_from_procdata(int id,gras_procdata_t *pd); - #endif /* GRAS_VIRTU_PRIVATE_H */ diff --git a/src/gras/Virtu/virtu_sg.h b/src/gras/Virtu/virtu_sg.h index dab2df5c47..51090f7b51 100644 --- a/src/gras/Virtu/virtu_sg.h +++ b/src/gras/Virtu/virtu_sg.h @@ -26,21 +26,20 @@ typedef struct { typedef struct { int refcount; - /* Nothing in particular (anymore) */ + xbt_dynar_t ports; + } gras_hostdata_t; /* data for each socket (FIXME: find a better location for that)*/ typedef struct { - smx_rdv_t rdv; + smx_process_t from_process; + smx_process_t to_process; - smx_process_t from_process; /* the one who created the socket */ smx_host_t to_host; /* Who's on other side */ - smx_comm_t comm; /* Ongoing communication */ - - s_gras_msg_t ongoing_msg; - size_t ongoing_msg_size; - gras_socket_t to_socket; /* If != NULL, this socket was created as accept when receiving onto to_socket */ + smx_cond_t cond; + smx_mutex_t mutex; + gras_socket_t to_socket; } gras_trp_sg_sock_data_t; diff --git a/src/include/simix/simix.h b/src/include/simix/simix.h index d746e71d7b..82785d0b17 100644 --- a/src/include/simix/simix.h +++ b/src/include/simix/simix.h @@ -207,7 +207,7 @@ XBT_PUBLIC(void*) SIMIX_rdv_get_data(smx_rdv_t rdv); /*****Communication Requests*****/ XBT_PUBLIC(void) SIMIX_communication_cancel(smx_comm_t comm); XBT_PUBLIC(double) SIMIX_communication_get_remains(smx_comm_t comm); -XBT_PUBLIC(void *) SIMIX_communication_get_sentdata(smx_comm_t comm); +XBT_PUBLIC(void *) SIMIX_communication_get_data(smx_comm_t comm); /*****Networking*****/ XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, diff --git a/src/msg/msg_mailbox.c b/src/msg/msg_mailbox.c index f6f3230e25..3425a413dc 100644 --- a/src/msg/msg_mailbox.c +++ b/src/msg/msg_mailbox.c @@ -64,7 +64,7 @@ m_task_t MSG_mailbox_get_head(msg_mailbox_t mailbox) if(!comm) return NULL; - return (m_task_t)SIMIX_communication_get_sentdata(comm); + return (m_task_t)SIMIX_communication_get_data(comm); } int diff --git a/src/simix/smx_global.c b/src/simix/smx_global.c index 9df8f76170..26e0c5e11d 100644 --- a/src/simix/smx_global.c +++ b/src/simix/smx_global.c @@ -19,7 +19,6 @@ XBT_LOG_EXTERNAL_CATEGORY(simix_host); XBT_LOG_EXTERNAL_CATEGORY(simix_process); XBT_LOG_EXTERNAL_CATEGORY(simix_synchro); XBT_LOG_EXTERNAL_CATEGORY(simix_context); -XBT_LOG_EXTERNAL_CATEGORY(simix_network); XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_kernel, simix, "Logging specific to SIMIX (kernel)"); @@ -58,7 +57,6 @@ void SIMIX_global_init(int *argc, char **argv) XBT_LOG_CONNECT(simix_process, simix); XBT_LOG_CONNECT(simix_synchro, simix); XBT_LOG_CONNECT(simix_context, simix); - XBT_LOG_CONNECT(simix_network, simix); simix_global = xbt_new0(s_SIMIX_Global_t, 1); @@ -419,10 +417,8 @@ double SIMIX_solve(xbt_fifo_t actions_done, xbt_fifo_t actions_failed) if (smx_action) { /* Copy the transfered data of the completed communication actions */ /* FIXME: find a better way to determine if its a comm action */ - if(smx_action->data != NULL) { - CDEBUG1(simix_network,"Communication %p finished",smx_action->data); + if(smx_action->data != NULL) SIMIX_network_copy_data((smx_comm_t)smx_action->data); - } SIMIX_action_signal_all(smx_action); } } diff --git a/src/simix/smx_network.c b/src/simix/smx_network.c index 4c32439033..7008ec30fe 100644 --- a/src/simix/smx_network.c +++ b/src/simix/smx_network.c @@ -87,7 +87,7 @@ smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, smx_comm_type_t type) } /* no relevant request found. Return NULL */ - DEBUG0("Communication request not found. I assume that other side will arrive later on."); + DEBUG0("Communication request not found"); return NULL; } @@ -300,32 +300,36 @@ double SIMIX_communication_get_remains(smx_comm_t comm) */ void SIMIX_network_copy_data(smx_comm_t comm) { + /* If there is no data to be copy then return */ + if(!comm->src_buff || !comm->dst_buff) + return; + size_t src_buff_size = comm->src_buff_size; size_t dst_buff_size = *comm->dst_buff_size; + + /* Copy at most dst_buff_size bytes of the message to receiver's buffer */ + dst_buff_size = MIN(dst_buff_size, src_buff_size); + + /* Update the receiver's buffer size to the copied amount */ + if (comm->dst_buff_size) + *comm->dst_buff_size = dst_buff_size; - xbt_assert(src_buff_size == dst_buff_size); - - /* If there is no data to copy then return */ - if(!comm->src_buff || !comm->dst_buff || dst_buff_size == 0) + if(dst_buff_size == 0) return; memcpy(comm->dst_buff, comm->src_buff, dst_buff_size); - DEBUG6("Copying comm %p data from %s -> %s (%zu bytes %p->%p)", + DEBUG4("Copying comm %p data from %s -> %s (%zu bytes)", comm, comm->src_proc->smx_host->name, comm->dst_proc->smx_host->name, - dst_buff_size,comm->src_buff,comm->dst_buff); + dst_buff_size); } /** * \brief Return the user data associated to the communication - * - * In MSG and GRAS, that data is the exchanged task/msg itself, since - * (i) In MSG, the receiver still wants to read the task although the communication didn't complete. - * (ii) In GRAS, we need to retrieve that gras_msg_t during the select * \param comm The communication * \return the user data */ -void *SIMIX_communication_get_sentdata(smx_comm_t comm) +void *SIMIX_communication_get_data(smx_comm_t comm) { return comm->data; } @@ -423,7 +427,6 @@ smx_comm_t SIMIX_network_irecv(smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_s comm->dst_buff = dst_buff; comm->dst_buff_size = dst_buff_size; - DEBUG2("Receive data for %p into %p",comm,comm->dst_buff); SIMIX_communication_start(comm); return comm; } -- 2.20.1