From: mquinson Date: Tue, 6 Jul 2004 08:10:09 +0000 (+0000) Subject: It works. Isn't that great? X-Git-Tag: v3.3~5160 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/3e6198594720a26230708e18265ad47f8788d421?hp=e7f57f7d31cdd823f4915aaf54442cadea13cb54 It works. Isn't that great? git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@186 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/src/gras/Transport/sg_transport.c b/src/gras/Transport/sg_transport.c index 917d8af9a9..b17f5fc27c 100644 --- a/src/gras/Transport/sg_transport.c +++ b/src/gras/Transport/sg_transport.c @@ -37,12 +37,17 @@ gras_trp_select(double timeout, gras_trp_plugin_t *trp; gras_socket_t *sock_iter; /* iterating over all sockets */ - int cursor; /* iterating over all sockets */ + int cursor,cpt; - int r_pid, cpt; + gras_sg_portrec_t pr; /* iterating to find the chanel of expeditor */ + + int r_pid; gras_hostdata_t *remote_hd; - DEBUG1("select with timeout=%d",timeout); + DEBUG3("select on %s@%s with timeout=%d", + MSG_process_get_name(MSG_process_self()), + MSG_host_get_name(MSG_host_self()), + timeout); do { r_pid = MSG_task_probe_from((m_channel_t) pd->chan); if (r_pid >= 0) { @@ -78,27 +83,59 @@ gras_trp_select(double timeout, if (!(sockdata = malloc(sizeof(gras_trp_sg_sock_data_t)))) RAISE_MALLOC; - sockdata->from_PID = r_pid; - sockdata->to_PID = MSG_process_self_PID(); - sockdata->to_host = MSG_process_get_host(MSG_process_from_PID(r_pid)); + sockdata->from_PID = MSG_process_self_PID(); + sockdata->to_PID = r_pid; + sockdata->to_host = MSG_process_get_host(MSG_process_from_PID(r_pid)); (*dst)->data = sockdata; - (*dst)->peer_port = -1; (*dst)->peer_name = strdup(MSG_host_get_name(sockdata->to_host)); remote_hd=(gras_hostdata_t *)MSG_host_get_data(sockdata->to_host); gras_assert0(remote_hd,"Run gras_process_init!!"); sockdata->to_chan = -1; - for (cpt=0; cpt< GRAS_MAX_CHANNEL; cpt++) - if (r_pid == remote_hd->proc[cpt]) - sockdata->to_chan = cpt; - - gras_assert0(sockdata->to_chan>0, + (*dst)->peer_port = -10; + for (cursor=0; cursorproc[cursor] == r_pid) { + sockdata->to_chan = cursor; + DEBUG2("Chan %d on %s is for my pal", + cursor,(*dst)->peer_name); + + gras_dynar_foreach(remote_hd->ports, cpt, pr) { + if (sockdata->to_chan == pr.tochan) { + if (pr.raw) { + DEBUG0("Damn, it's raw"); + continue; + } + + (*dst)->peer_port = pr.port; + DEBUG1("Cool, it points to port %d", pr.port); + break; + } else { + DEBUG2("Wrong port (tochan=%d, looking for %d)\n", + pr.tochan,sockdata->to_chan); + } + } + if ((*dst)->peer_port == -10) { + /* was raw */ + sockdata->to_chan = -1; + } else { + /* found it, don't let it override by raw */ + break; + } + } + } + gras_assert0(sockdata->to_chan != -1, "Got a message from a process without channel"); return no_error; } else { + /* + DEBUG2("Select on %s@%s did not find anything yet", + MSG_process_get_name(MSG_process_self()), + MSG_host_get_name(MSG_host_self())); + */ + // MSG_process_sleep(1); MSG_process_sleep(0.01); } } while (gras_time()-startTime < timeout diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index 0e4a950f9c..6cd4396cb4 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -64,13 +64,13 @@ typedef struct { static gras_error_t find_port(gras_hostdata_t *hd, int port, gras_sg_portrec_t *hpd) { int cpt; - gras_sg_portrec_t pd; + gras_sg_portrec_t pr; gras_assert0(hd,"Please run gras_process_init on each process"); - gras_dynar_foreach(hd->ports, cpt, pd) { - if (pd.port == port) { - memcpy(hpd,&pd,sizeof(gras_sg_portrec_t)); + gras_dynar_foreach(hd->ports, cpt, pr) { + if (pr.port == port) { + memcpy(hpd,&pr,sizeof(gras_sg_portrec_t)); return no_error; } } @@ -152,6 +152,7 @@ gras_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self, data->to_chan = pr.tochan; sock->data = data; + sock->incoming = 1; DEBUG6("%s (PID %d) connects in %s mode to %s:%d (to_PID=%d)", MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(), @@ -215,19 +216,21 @@ gras_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self, void gras_trp_sg_socket_close(gras_socket_t *sock){ gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self()); int cpt; - gras_sg_portrec_t *pr; + + gras_sg_portrec_t pr; if (!sock) return; gras_assert0(hd,"Please run gras_process_init on each process"); - free(sock->data); + if (sock->data) + free(sock->data); if (sock->incoming) { /* server mode socket. Un register it from 'OS' tables */ gras_dynar_foreach(hd->ports, cpt, pr) { - if (pr->port == sock->port) { + DEBUG2("Check pr %d of %d", cpt, gras_dynar_length(hd->ports)); + if (pr.port == sock->port) { gras_dynar_cursor_rm(hd->ports, &cpt); - return; } } @@ -262,6 +265,9 @@ gras_error_t gras_trp_sg_chunk_send(gras_socket_t *sock, task=MSG_task_create(name,0,((double)size)/(1024.0*1024.0),task_data); + DEBUG4("send chunk %s from %s to %s on channel %d", + name, MSG_host_get_name(MSG_host_self()), + MSG_host_get_name(sock_data->to_host), sock_data->to_chan); if (MSG_task_put(task, sock_data->to_host,sock_data->to_chan) != MSG_OK) { RAISE0(system_error,"Problem during the MSG_task_put"); } @@ -276,7 +282,11 @@ gras_error_t gras_trp_sg_chunk_recv(gras_socket_t *sock, m_task_t task=NULL; sg_task_data_t *task_data; + gras_trp_sg_sock_data_t *sock_data = sock->data; + DEBUG3("recv chunk on %s from %s on channel %d", + MSG_host_get_name(MSG_host_self()), + MSG_host_get_name(sock_data->to_host), sock_data->to_chan); if (MSG_task_get(&task, (sock->raw ? pd->rawChan : pd->chan)) != MSG_OK) RAISE0(unknown_error,"Error in MSG_task_get()"); diff --git a/src/gras/Virtu/sg_process.c b/src/gras/Virtu/sg_process.c index ec571ec29a..63ba131654 100644 --- a/src/gras/Virtu/sg_process.c +++ b/src/gras/Virtu/sg_process.c @@ -17,6 +17,7 @@ gras_process_init() { gras_error_t errcode; gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self()); gras_procdata_t *pd; + gras_sg_portrec_t prraw,pr; int i; if (!(pd=(gras_procdata_t *)malloc(sizeof(gras_procdata_t)))) @@ -50,6 +51,12 @@ gras_process_init() { pd->chan = i; hd->proc[ i ] = MSG_process_self_PID(); + /* regiter it to the ports structure */ + pr.port = -1; + pr.tochan = i; + pr.raw = 0; + TRY(gras_dynar_push(hd->ports,&pr)); + /* take a free RAW channel for this process */ for (i=0; iproc[i]; i++); if (i == GRAS_MAX_CHANNEL) { @@ -61,6 +68,12 @@ gras_process_init() { hd->proc[ i ] = MSG_process_self_PID(); + /* regiter it to the ports structure */ + prraw.port = -1; + prraw.tochan = i; + prraw.raw = 1; + TRY(gras_dynar_push(hd->ports,&prraw)); + VERB2("Creating process '%s' (%d)", MSG_process_get_name(MSG_process_self()), MSG_process_self_PID());