From 7b643b5afd98b28e54b82339284af43b1ab7f5ca Mon Sep 17 00:00:00 2001 From: mquinson Date: Thu, 24 Jun 2010 09:22:30 +0000 Subject: [PATCH] Port GRAS to smx_network infrastructure. Kinda working, but not tested enough yet git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@7930 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/gras/Msg/gras_msg_listener.c | 39 ++--- src/gras/Msg/sg_msg.c | 187 ++++++++--------------- src/gras/Transport/README | 80 +++++++++- src/gras/Transport/transport_plugin_sg.c | 135 ++++++++-------- src/gras/Virtu/virtu_sg.h | 20 +-- 5 files changed, 233 insertions(+), 228 deletions(-) diff --git a/src/gras/Msg/gras_msg_listener.c b/src/gras/Msg/gras_msg_listener.c index 9805950ab0..9649e53a6d 100644 --- a/src/gras/Msg/gras_msg_listener.c +++ b/src/gras/Msg/gras_msg_listener.c @@ -33,11 +33,19 @@ static void listener_function(void *p) gras_msgtype_t msg_wakeup_listener_t = gras_msgtype_by_name("_wakeup_listener"); DEBUG0("I'm the listener"); + + /* get a free socket for the receiving part of the listener */ + me->wakeup_sock_listener_side = gras_socket_server_range(5000, 6000, -1, 0); + + + /* Main loop */ while (1) { msg = gras_msg_recv_any(); - if (msg->type != msg_wakeup_listener_t) + if (msg->type != msg_wakeup_listener_t) { + VERB1("Got a '%s' message. Queue it for handling by main thread", + gras_msgtype_get_name(msg->type)); xbt_queue_push(me->incomming_messages, msg); - else { + } else { char got = *(char *) msg->payl; if (got == '1') { VERB0("Asked to get awake"); @@ -58,7 +66,7 @@ static void listener_function(void *p) xbt_queue_shift_timed(me->socks_to_close, &sock, 0); if (tcp_close(sock) < 0) { #ifdef _XBT_WIN32 - WARN2("error while closing tcp socket %d: %d (%s)\n", sock, sock_errno); + WARN2("error while closing tcp socket %d: %d\n", sock, sock_errno); #else WARN3("error while closing tcp socket %d: %d (%s)\n", sock, sock_errno, sock_errstr(sock_errno)); @@ -74,33 +82,26 @@ static void listener_function(void *p) } } -gras_msg_listener_t gras_msg_listener_launch(xbt_queue_t msg_exchange) +gras_msg_listener_t gras_msg_listener_launch(xbt_queue_t msg_received) { gras_msg_listener_t arg = xbt_new0(s_gras_msg_listener_t, 1); - int my_port; - DEBUG0("Launch listener"); - arg->incomming_messages = msg_exchange; + VERB0("Launch listener"); + arg->incomming_messages = msg_received; arg->socks_to_close = xbt_queue_new(0, sizeof(int)); - /* get a free socket for the receiving part of the listener, taking care that it does not get saved as "myport" number */ - my_port = - ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport; - arg->wakeup_sock_listener_side = - gras_socket_server_range(5000, 6000, -1, 0); - ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport = - my_port; - /* Connect the other part of the socket */ - arg->wakeup_sock_master_side = - gras_socket_client(gras_os_myname(), - gras_socket_my_port(arg->wakeup_sock_listener_side)); /* declare the message used to awake the listener from the master */ gras_msgtype_declare("_wakeup_listener", gras_datadesc_by_name("char")); /* actually start the thread */ arg->listener = xbt_thread_create("listener", listener_function, arg,1/*joinable*/); - gras_os_sleep(0); /* TODO: useless? give the listener a chance to initialize even if the main is empty and we cancel it right afterward */ + gras_os_sleep(0); /* give the listener a chance to initialize before we connect to its socket */ + + /* Connect the other part of the socket */ + arg->wakeup_sock_master_side = + gras_socket_client(gras_os_myname(), + gras_socket_my_port(arg->wakeup_sock_listener_side)); return arg; } diff --git a/src/gras/Msg/sg_msg.c b/src/gras/Msg/sg_msg.c index c0115aefd5..69dd85f5cb 100644 --- a/src/gras/Msg/sg_msg.c +++ b/src/gras/Msg/sg_msg.c @@ -30,13 +30,54 @@ gras_msg_t gras_msg_recv_any(void) { int got = 0; smx_comm_t comm = NULL; gras_socket_t sock = NULL; - gras_trp_sg_sock_data_t *sock_data; + gras_trp_sg_sock_data_t sock_data; xbt_dynar_foreach(trp_proc->sockets,cursor,sock) { - sock_data = (gras_trp_sg_sock_data_t *) sock->data; - if (sock_data->comm_recv) { - INFO2("Copy %p of size %lu",sock_data->comm_recv,(unsigned long int)sizeof(smx_comm_t)); - xbt_dynar_push(comms,&(sock_data->comm_recv)); - } + sock_data = (gras_trp_sg_sock_data_t) sock->data; + + + DEBUG5("Consider socket %p (data:%p; Here rdv: %p; Remote rdv: %p; Comm %p) to get a message", + sock,sock_data, + (sock_data->server==SIMIX_process_self())?sock_data->rdv_server:sock_data->rdv_client, + (sock_data->server==SIMIX_process_self())?sock_data->rdv_client:sock_data->rdv_server, + sock_data->comm_recv); + + + /* The following assert fails in some valid conditions, we need to + * change the code downward looking for the socket again. + * + * For now it relies on the facts (A) that sockets and comms are aligned + * (B) every sockets has a posted irecv in comms + * + * This is not trivial because we need that alignment to hold after the waitany(), so + * after other processes get scheduled. + * + * I cannot think of conditions where they get desynchronized (A violated) as long as + * 1) only the listener calls that function + * 2) Nobody but the listener removes sockets from that set (in main listener loop) + * 3) New sockets are added at the end, and signified ASAP to the listener (by awaking him) + * The throw bellow ensures that B is never violated without failing out loudly. + * + * We cannot search by comparing the comm object pointer that object got + * freed by the waiting process (down in smx_network, in + * comm_wait_for_completion or comm_cleanup). So, actually, we could + * use that pointer since that's a dangling pointer, but no one changes it. + * I still feel unconfortable with using dangling pointers, even if that would + * let the code work even if A and/or B are violated, provided that + * (C) the new irecv is never posted before we return from waitany to that function. + * + * Another approach, robust to B violation would be to retraverse the socks dynar with + * an iterator, incremented only when the socket has a comm. And we've the right socket + * when that iterator is equal to "got", the result of waitany. Not needed if B holds. + */ + xbt_assert1(sock_data->comm_recv, + "Comm_recv of socket %p is empty; please report that nasty bug",sock); + /* End of paranoia */ + + VERB3("Copy comm_recv %p rdv:%p (other rdv:%p)", + sock_data->comm_recv, + (sock_data->server==SIMIX_process_self())?sock_data->rdv_server:sock_data->rdv_client, + (sock_data->server==SIMIX_process_self())?sock_data->rdv_client:sock_data->rdv_server); + xbt_dynar_push(comms,&(sock_data->comm_recv)); } VERB1("Wait on %ld 'sockets'",xbt_dynar_length(comms)); /* Wait for the end of any of these communications */ @@ -48,17 +89,22 @@ gras_msg_t gras_msg_recv_any(void) { VERB1("Got something. Communication %p's over",comm); /* Reinstall a waiting communication on that rdv */ - /* Get the sock again */ - xbt_dynar_foreach(trp_proc->sockets,cursor,sock) { - sock_data = (gras_trp_sg_sock_data_t *) sock->data; + /* Get the sock again + * For that, we use the fact that */ + sock=xbt_dynar_get_as(trp_proc->sockets,got,gras_socket_t); +/* xbt_dynar_foreach(trp_proc->sockets,cursor,sock) { + sock_data = (gras_trp_sg_sock_data_t) sock->data; if (sock_data->comm_recv && sock_data->comm_recv == comm) break; } - sock_data = (gras_trp_sg_sock_data_t *) sock->data; + */ + sock_data = (gras_trp_sg_sock_data_t) sock->data; sock_data->comm_recv = SIMIX_network_irecv( - sock_data->im_server?sock_data->rdv_server:sock_data->rdv_client, + sock_data->rdv_server!=NULL? + //(sock_data->server==SIMIX_process_self())? + sock_data->rdv_server + :sock_data->rdv_client, NULL,0); - SIMIX_communication_destroy(comm); return msg; } @@ -73,13 +119,20 @@ void gras_msg_send_ext(gras_socket_t sock, This is used to report the load onto the simulator. It also counts the size of pointed stuff */ gras_msg_t msg; /* message to send */ smx_comm_t comm; - gras_trp_sg_sock_data_t *sock_data = NULL; + gras_trp_sg_sock_data_t sock_data = (gras_trp_sg_sock_data_t) sock->data; + + smx_rdv_t target_rdv = (sock_data->server==SIMIX_process_self())?sock_data->rdv_client + :sock_data->rdv_server; + /*initialize gras message */ msg = xbt_new(s_gras_msg_t, 1); msg->expe = sock; msg->kind = kind; msg->type = msgtype; msg->ID = ID; + + VERB2("Send msg %s to rdv %p", msgtype->name,target_rdv); + if (kind == e_gras_msg_kind_rpcerror) { /* error on remote host, careful, payload is an exception */ msg->payl_size = gras_datadesc_size(gras_datadesc_by_name("ex_t")); @@ -103,116 +156,10 @@ void gras_msg_send_ext(gras_socket_t sock, whole_payload_size = gras_datadesc_memcpy(msgtype->ctn_type, payload, msg->payl); } - sock_data = (gras_trp_sg_sock_data_t *) sock->data; - - SIMIX_network_send(sock_data->im_server ? sock_data->rdv_client : sock_data->rdv_client, - whole_payload_size,-1,-1,&msg,sizeof(void*),&comm,msg); - -#ifdef KILLME - smx_action_t act; /* simix action */ - gras_hostdata_t *hd; - gras_trp_procdata_t trp_remote_proc; - gras_msg_procdata_t msg_remote_proc; - - sock_data = (gras_trp_sg_sock_data_t *) sock->data; - - hd = (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self()); - - xbt_assert1(!gras_socket_is_meas(sock), - "Asked to send a message on the measurement socket %p", sock); - - /* put the selectable socket on the queue */ - trp_remote_proc = (gras_trp_procdata_t) - gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process); + SIMIX_network_send(target_rdv, whole_payload_size,-1,-1,&msg,sizeof(void*),&comm,msg); - xbt_queue_push(trp_remote_proc->msg_selectable_sockets, &sock); - - /* put message on msg_queue */ - msg_remote_proc = (gras_msg_procdata_t) - gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process); - xbt_fifo_push(msg_remote_proc->msg_to_receive_queue, msg); - - /* wait for the receiver */ - SIMIX_cond_wait(sock_data->cond, sock_data->mutex); - - /* creates simix action and waits its ends, waits in the sender host - condition */ - act = SIMIX_action_communicate(SIMIX_host_self(), - sock_data->to_host, msgtype->name, - (double) whole_payload_size, -1); - SIMIX_register_action_to_condition(act, sock_data->cond); - - VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu", - SIMIX_host_get_name(sock_data->to_host), - SIMIX_process_get_name(sock_data->to_process), - msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID); - - SIMIX_cond_wait(sock_data->cond, sock_data->mutex); - SIMIX_unregister_action_to_condition(act, sock_data->cond); - /* error treatmeant (FIXME) */ - - /* cleanup structures */ - SIMIX_action_destroy(act); - SIMIX_mutex_unlock(sock_data->mutex); -#endif VERB0("Message sent"); } -#ifdef KILLMETOO -/* - * receive the next message on the given socket. - */ -void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) -{ - - gras_trp_sg_sock_data_t *sock_data = - (gras_trp_sg_sock_data_t *) sock->data; - gras_msg_t msg_got; - size_t size_got = sizeof(void*); - - xbt_assert1(!gras_socket_is_meas(sock), - "Asked to receive a message on the measurement socket %p", - sock); - - SIMIX_network_recv(sock_data->rdv,-1,&msg_got,&size_got,NULL); -#ifdef KILLME - gras_trp_sg_sock_data_t *remote_sock_data; - gras_hostdata_t *remote_hd; - gras_msg_procdata_t msg_procdata = - (gras_msg_procdata_t) gras_libdata_by_name("gras_msg"); - - xbt_assert0(msg, "msg is an out parameter of gras_msg_recv..."); - - sock_data = (gras_trp_sg_sock_data_t *) sock->data; - remote_sock_data = - ((gras_trp_sg_sock_data_t *) sock->data)->to_socket->data; - DEBUG3("Remote host %s, Remote Port: %d Local port %d", - SIMIX_host_get_name(sock_data->to_host), sock->peer_port, - sock->port); - remote_hd = (gras_hostdata_t *) SIMIX_host_get_data(sock_data->to_host); - - if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0) { - THROW_IMPOSSIBLE; - } - DEBUG1("Size msg_to_receive buffer: %d", - xbt_fifo_size(msg_procdata->msg_to_receive_queue)); - msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue); - - SIMIX_mutex_lock(remote_sock_data->mutex); - /* ok, I'm here, you can continuate the communication */ - SIMIX_cond_signal(remote_sock_data->cond); - - /* wait for communication end */ - SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex); - - msg_got->expe = msg->expe; - memcpy(msg, msg_got, sizeof(s_gras_msg_t)); - xbt_free(msg_got); - SIMIX_mutex_unlock(remote_sock_data->mutex); -#endif - VERB3("Received a message type '%s' kind '%s' ID %lu", // from %s", - msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID); -} -#endif diff --git a/src/gras/Transport/README b/src/gras/Transport/README index 93322d119b..e66f9b315b 100644 --- a/src/gras/Transport/README +++ b/src/gras/Transport/README @@ -3,6 +3,80 @@ There is a plugin mechanism in here, and the selection is automatic If you open a socket in SG realm, you'll get a SG socket. If you open a socket in RL realm, you'll get a TCP socket. -To get a file socket, you'll have to build it manually. -To get a pipe socket, use gras_fork, or whatever name I'll give it when I - implement this. +To get a file socket, you'll have to build it manually using +gras_socket_client_from_file() to read from it, or +gras_socket_server_from_file() to write to it. + +There is no way (yet?) to build a pipe socket, so threads have to +discuss using network sockets, or xbt_queue_t structures. + +IMPLEMENTATION NOTES ABOUT THE SG SIDE + +This area is quite messy. The thing is that I strived too much to keep +the existing interface, which is lousely inspirated from BSD sockets. + +Vocabulary: + Server is the one who created the master socket. + Client is the one connecting to that from a remote host. + Their roles keep the same for the whole connexion duration. + Sender and Receiver denote the roles during one message exchange. + If the server answers to the client, it becomes the sender while the + client becomes the receiver. + All this seems trivial, but confusion is easy and dangerous. + + +The connexion story goes that way. When we create a master socket, we +look whether the given port is free on that host or not. For that, we +traverse the gras_hostdata_t->ports dynar, which contains +gras_sg_portrec_t records. If it's free, we create a socket with a +gras_trp_sg_sock_data_t structure. Here is that struct: + +typedef struct { + smx_process_t server; + smx_process_t client; + + smx_rdv_t rdv_server; /* The rendez-vous point to use */ + smx_rdv_t rdv_client; /* The rendez-vous point to use */ + smx_comm_t comm_recv; /* The comm of irecv on receiver side */ +} s_gras_trp_sg_sock_data_t,*gras_trp_sg_sock_data_t; + +In GRAS, there is a listener process, in charge of pumping everything +comming from the network and post that to the main user thread. That +to overlap (incomming) communications and computations. +In SG, this is done by ensuring that a receive is posted on every +opened socket, and having the listener doing a smx_sem_waitany() to +find the first ending one (in RL, a select+ddt_recv does the same). + +Another extra complexity is due to the fact that when the user +receives a message, it gets a socket being a mean to contact the +sender of that message. In RL, that's easy since sockets are +full-duplex. In SG, I have to either create a new socket for each +message (slow and leak-prone), or maintain a set of opened sockets on +receiver side and check if the one I need is there or create it. The +approach used currently is to give to the receiver a pointer to the +structure created on the sender side directly. + +At the end of the day, everything is as if there were master socket +and working sockets, just like in BSD. There is no explicit accept. +Master sockets get created by gras_socket_server() and friends. You +can recognize them by the fact that the rdv_client field is always +NULL. Such sockets are not really used to exchange data, but more to +establish connexions. For actual exchanges, you need a working socket +created by gras_socket_client() and friends. So, they are created on +client side, but the master side will see it as message expeditor when +getting a message. + +When sending, you can see if the current process is the server by +checking if data_sock->server == SMX_process_self(). If wrong, that +means that we are the client process today. + +When receiving this won't work because SMX_process_self() is the +listener associated to the user thread. So, when receiving, you can +see if you are on the server side or by checking if rdv_client==NULL. +If not that means that we are on the client side today. + +That's messy, and should probably be reworked a bit, but I feel like +the main issue is the interface used. It's too close and too differnt +from BSD at the same time. One day, I hope to find the time to redo +everything with an interface similar to the one of the 0MQ project for +example. \ No newline at end of file diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index 7b429ac896..55bc0d240b 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -24,9 +24,7 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg, gras_trp, ***/ /* retrieve the port record associated to a numerical port on an host */ -static void find_port(gras_hostdata_t * hd, int port, - gras_sg_portrec_t * hpd); - +static gras_sg_portrec_t find_port(gras_hostdata_t * hd, int port); void gras_trp_sg_socket_client(gras_trp_plugin_t self, /* OUT */ gras_socket_t sock); @@ -54,7 +52,7 @@ typedef struct { /*** *** Code ***/ -static void find_port(gras_hostdata_t * hd, int port, gras_sg_portrec_t * hpd) +static gras_sg_portrec_t find_port(gras_hostdata_t * hd, int port) { unsigned int cpt; gras_sg_portrec_t pr; @@ -62,12 +60,10 @@ static void find_port(gras_hostdata_t * hd, int port, gras_sg_portrec_t * hpd) xbt_assert0(hd, "Please run gras_process_init on each process"); xbt_dynar_foreach(hd->ports, cpt, pr) { - if (pr.port == port) { - memcpy(hpd, &pr, sizeof(gras_sg_portrec_t)); - return; - } + if (pr->port == port) + return pr; } - THROW1(mismatch_error, 0, "Unable to find any portrec for port #%d", port); + return NULL; } @@ -90,13 +86,11 @@ void gras_trp_sg_setup(gras_trp_plugin_t plug) } void gras_trp_sg_socket_client(gras_trp_plugin_t self, - /* OUT */ gras_socket_t sock) -{ - xbt_ex_t e; + /* OUT */ gras_socket_t sock) { smx_host_t peer; gras_hostdata_t *hd; - gras_trp_sg_sock_data_t *data; + gras_trp_sg_sock_data_t data; gras_sg_portrec_t pr; /* make sure this socket will reach someone */ @@ -108,41 +102,38 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self, THROW1(mismatch_error, 0, "can't connect to %s: no process on this host", sock->peer_name); - TRY { - find_port(hd, sock->peer_port, &pr); - } - CATCH(e) { - if (e.category == mismatch_error) { - xbt_ex_free(e); - THROW2(mismatch_error, 0, - "can't connect to %s:%d, no process listen on this port", - sock->peer_name, sock->peer_port); - } - RETHROW; + pr = find_port(hd, sock->peer_port); + + if (pr == NULL) { + THROW2(mismatch_error, 0, + "can't connect to %s:%d, no process listen on this port", + sock->peer_name, sock->peer_port); } - if (pr.meas && !sock->meas) { + /* Ensure that the listener is expecting the kind of stuff we want to send */ + if (pr->meas && !sock->meas) { THROW2(mismatch_error, 0, "can't connect to %s:%d in regular mode, the process listen " "in measurement mode on this port", sock->peer_name, sock->peer_port); } - if (!pr.meas && sock->meas) { + if (!pr->meas && sock->meas) { THROW2(mismatch_error, 0, "can't connect to %s:%d in measurement mode, the process listen " "in regular mode on this port", sock->peer_name, sock->peer_port); } - /* create the socket */ - data = xbt_new(gras_trp_sg_sock_data_t, 1); - data->from_process = SIMIX_process_self(); - data->to_process = pr.process; - data->to_host = peer; - - /* initialize mutex and condition of the socket */ - data->rdv_server = pr.rdv; + + /* create simulation data of the socket */ + data = xbt_new(s_gras_trp_sg_sock_data_t, 1); + data->client = SIMIX_process_self(); + data->server = pr->server; + + /* initialize synchronization stuff on the socket */ + data->rdv_server = pr->rdv; data->rdv_client = SIMIX_rdv_create(NULL); - data->im_server = 0; + data->comm_recv = SIMIX_network_irecv(data->rdv_client,NULL,0); + /* connect that simulation data to the socket */ sock->data = data; sock->incoming = 1; @@ -157,54 +148,45 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock) gras_hostdata_t *hd = (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self()); gras_sg_portrec_t pr; - gras_trp_sg_sock_data_t *data; - volatile int found; - - const char *host = SIMIX_host_get_name(SIMIX_host_self()); - - xbt_ex_t e; + gras_trp_sg_sock_data_t data; xbt_assert0(hd, "Please run gras_process_init on each process"); - sock->accepting = 0; /* no such nuisance in SG */ - found = 0; - TRY { - find_port(hd, sock->port, &pr); - found = 1; - } CATCH(e) { - if (e.category == mismatch_error) - xbt_ex_free(e); - else - RETHROW; - } + sock->accepting = 1; - if (found) + /* Check whether a server is already listening on that port or not */ + pr = find_port(hd, sock->port); + + if (pr) THROW2(mismatch_error, 0, "can't listen on address %s:%d: port already in use.", - host, sock->port); - - pr.port = sock->port; - pr.meas = sock->meas; - pr.process = SIMIX_process_self(); - pr.rdv = SIMIX_rdv_create(NULL); + SIMIX_host_get_name(SIMIX_host_self()), sock->port); + + /* This port is free, let's take it */ + pr = xbt_new(s_gras_sg_portrec_t,1); + pr->port = sock->port; + pr->meas = sock->meas; + pr->server = SIMIX_process_self(); + pr->rdv = SIMIX_rdv_create(NULL); xbt_dynar_push(hd->ports, &pr); /* Create the socket */ - data = xbt_new(gras_trp_sg_sock_data_t, 1); - data->from_process = SIMIX_process_self(); - data->to_process = NULL; - data->to_host = SIMIX_host_self(); - data->rdv_server = pr.rdv; + data = xbt_new(s_gras_trp_sg_sock_data_t, 1); + data->server = SIMIX_process_self(); + data->client = NULL; + data->rdv_server = pr->rdv; data->rdv_client = NULL; - data->im_server = 0; - data->comm_recv = SIMIX_network_irecv(pr.rdv,NULL,0); - INFO1("Comm %p",data->comm_recv); + data->comm_recv = SIMIX_network_irecv(pr->rdv,NULL,0); sock->data = data; - VERB6("'%s' (%d) ears on %s:%d%s (%p)", + VERB10("'%s' (%d) ears on %s:%d%s (%p; data:%p); Here rdv: %p; Remote rdv: %p; Comm %p", SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), - host, sock->port, sock->meas ? " (mode meas)" : "", sock); + SIMIX_host_get_name(SIMIX_host_self()), sock->port, + sock->meas ? " (mode meas)" : "", sock,data, + (data->server==SIMIX_process_self())?data->rdv_server:data->rdv_client, + (data->server==SIMIX_process_self())?data->rdv_client:data->rdv_server, + data->comm_recv); } @@ -231,7 +213,7 @@ void gras_trp_sg_socket_close(gras_socket_t sock) /* server mode socket. Unregister it from 'OS' tables */ xbt_dynar_foreach(hd->ports, cpt, pr) { DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports)); - if (pr.port == sock->port) { + if (pr->port == sock->port) { xbt_dynar_cursor_rm(hd->ports, &cpt); XBT_OUT; return; @@ -268,16 +250,21 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, gras_msg_t msg; /* message to send */ #endif - gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *) sock->data; + gras_trp_sg_sock_data_t sock_data = (gras_trp_sg_sock_data_t) sock->data; xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); /* creates simix action and waits its ends, waits in the sender host condition */ - DEBUG4("send chunk from %s to %s:%d (size=%ld)", - SIMIX_host_get_name(SIMIX_host_self()), - SIMIX_host_get_name(sock_data->to_host), sock->peer_port, size); + if (XBT_LOG_ISENABLED(gras_trp_sg,xbt_log_priority_debug)) { + smx_process_t remote_dude = (sock_data->server==SIMIX_process_self())?(sock_data->client):(sock_data->server); + smx_host_t remote_host = SIMIX_process_get_host(remote_dude); + DEBUG4("send chunk from %s to %s:%d (size=%ld)", + SIMIX_host_get_name(SIMIX_host_self()), + SIMIX_host_get_name(remote_host), + sock->peer_port, size); + } //SIMIX_network_send(sock_data->rdv,size,1,-1,NULL,0,NULL,NULL); THROW_UNIMPLEMENTED; } diff --git a/src/gras/Virtu/virtu_sg.h b/src/gras/Virtu/virtu_sg.h index f62517ccac..94be34591b 100644 --- a/src/gras/Virtu/virtu_sg.h +++ b/src/gras/Virtu/virtu_sg.h @@ -16,11 +16,10 @@ typedef struct { int port; /* list of ports used by a server socket */ - int meas; /* (boolean) the channel is for measurements or for messages */ - smx_process_t process; /* process listening */ - smx_rdv_t rdv; /* rendez-vous point to the listener */ -// gras_socket_t socket; FIXME KILLME -} gras_sg_portrec_t; + int meas:1; /* (boolean) the channel is for measurements or for messages */ + smx_process_t server; + smx_rdv_t rdv; +} s_gras_sg_portrec_t, *gras_sg_portrec_t; /* Data for each host */ typedef struct { @@ -32,16 +31,13 @@ typedef struct { /* data for each socket (FIXME: find a better location for that)*/ typedef struct { - smx_process_t from_process; - smx_process_t to_process; - - smx_host_t to_host; /* Who's on other side */ + smx_process_t server; + smx_process_t client; smx_rdv_t rdv_server; /* The rendez-vous point to use */ smx_rdv_t rdv_client; /* The rendez-vous point to use */ - int im_server:1; - smx_comm_t comm_recv; /* The comm of irecv on receiving sockets */ -} gras_trp_sg_sock_data_t; + smx_comm_t comm_recv; /* The comm of irecv on receiver side */ +} s_gras_trp_sg_sock_data_t,*gras_trp_sg_sock_data_t; void *gras_libdata_by_name_from_remote(const char *name, smx_process_t p); -- 2.20.1