From: donassbr Date: Thu, 3 May 2007 14:35:09 +0000 (+0000) Subject: Functions gras_trp_sg_chunk_recv and send added. X-Git-Tag: v3.3~1880 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/6be27148babf9cba58fcc85eb7ea8859783018d2 Functions gras_trp_sg_chunk_recv and send added. Some memories leaks solved (socket destroy). But, I still don't know why the simulation time is different. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3480 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/src/gras_simix/Msg/gras_simix_sg_msg.c b/src/gras_simix/Msg/gras_simix_sg_msg.c index bc263f92fa..89e544b10f 100644 --- a/src/gras_simix/Msg/gras_simix_sg_msg.c +++ b/src/gras_simix/Msg/gras_simix_sg_msg.c @@ -103,33 +103,6 @@ void gras_msg_send_ext(gras_socket_t sock, VERB0("Message sent"); -/* - if (XBT_LOG_ISENABLED(gras_msg,xbt_log_priority_verbose)) { - asprintf(&name,"type:'%s';kind:'%s';ID %lu from %s:%d to %s:%d", - msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID, - gras_os_myname(),gras_os_myport(), - gras_socket_peer_name(sock), gras_socket_peer_port(sock)); - task=MSG_task_create(name,0, - ((double)whole_payload_size),msg); - free(name); - } else { - task=MSG_task_create(msg->type->name,0, - ((double)whole_payload_size),msg); - } - - sock->bufdata = msg; - SIMIX_cond_signal(gras_libdata_by_name_from_remote("trp", sock->data->to_process)->cond); - DEBUG1("Prepare to send a message to %s", - MSG_host_get_name (sock_data->to_host)); - if (MSG_task_put_with_timeout(task, sock_data->to_host,sock_data->to_chan,60.0) != MSG_OK) - THROW0(system_error,0,"Problem during the MSG_task_put with timeout 60"); - - VERB5("Sent to %s(%d) a message type '%s' kind '%s' ID %lu", - MSG_host_get_name(sock_data->to_host),sock_data->to_PID, - msg->type->name, - e_gras_msg_kind_names[msg->kind], - msg->ID); -*/ } /* * receive the next message on the given socket. diff --git a/src/gras_simix/Transport/gras_simix_sg_transport.c b/src/gras_simix/Transport/gras_simix_sg_transport.c index b7c326b3e2..6066347fd6 100644 --- a/src/gras_simix/Transport/gras_simix_sg_transport.c +++ b/src/gras_simix/Transport/gras_simix_sg_transport.c @@ -63,19 +63,17 @@ gras_socket_t gras_trp_select(double timeout) { /* Ok, got something. Open a socket back to the expeditor */ /* Try to reuse an already openned socket to that expeditor */ + DEBUG1("Open sockets size %lu",xbt_dynar_length(pd->sockets)); xbt_dynar_foreach(pd->sockets,cursor,sock_iter) { DEBUG1("Consider %p as outgoing socket to expeditor",sock_iter); if (sock_iter->meas || !sock_iter->outgoing) continue; - //DEBUG4("sock_iter %p port %d active %p port %d",((gras_trp_sg_sock_data_t*)sock_iter->data)->to_process,sock_iter->peer_port,((gras_trp_sg_sock_data_t*)pd->active_socket->data)->from_process, pd->active_socket->port); - //DEBUG1("\nFrom process %p", ((gras_trp_sg_sock_data_t*)pd->active_socket->data)->from_process); if ((sock_iter->peer_port == active_socket->port) && (((gras_trp_sg_sock_data_t*)sock_iter->data)->to_host == SIMIX_process_get_host(((gras_trp_sg_sock_data_t*)active_socket->data)->from_process))) { SIMIX_mutex_unlock(pd->mutex); return sock_iter; } - } /* Socket to expeditor not created yet */ diff --git a/src/gras_simix/Transport/gras_simix_transport.c b/src/gras_simix/Transport/gras_simix_transport.c index beff0097b0..f2db951003 100644 --- a/src/gras_simix/Transport/gras_simix_transport.c +++ b/src/gras_simix/Transport/gras_simix_transport.c @@ -97,10 +97,10 @@ void gras_trp_init(void){ void gras_trp_exit(void){ - xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets; - gras_socket_t sock_iter; - int cursor; - + //xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets; + //gras_socket_t sock_iter; + //int cursor; + DEBUG1("gras_trp valor %d",_gras_trp_started); if (_gras_trp_started == 0) { return; } @@ -115,13 +115,15 @@ gras_trp_exit(void){ } #endif - /* Close all the sockets */ + /* Close all the sockets, moved to process_close */ + /* + DEBUG1("sockets pointer %p",sockets); xbt_dynar_foreach(sockets,cursor,sock_iter) { VERB1("Closing the socket %p left open on exit. Maybe a socket leak?", sock_iter); gras_socket_close(sock_iter); } - + */ /* Delete the plugins */ xbt_dict_free(&_gras_trp_plugins); } @@ -360,9 +362,11 @@ void gras_socket_close(gras_socket_t sock) { } /* FIXME: Issue an event when the socket is closed */ + DEBUG1("sockets pointer before %p",sockets); if (sock) { xbt_dynar_foreach(sockets,cursor,sock_iter) { if (sock == sock_iter) { + DEBUG2("remove sock cursor %d dize %lu\n",cursor,xbt_dynar_length(sockets)); xbt_dynar_cursor_rm(sockets,&cursor); if (sock->plugin->socket_close) (* sock->plugin->socket_close)(sock); @@ -611,7 +615,6 @@ void gras_trp_socketset_dump(const char *name) { */ int gras_trp_libdata_id; void gras_trp_register() { - DEBUG0("\ntrp add\n"); gras_trp_libdata_id = gras_procdata_add("gras_trp",gras_trp_procdata_new, gras_trp_procdata_free); } diff --git a/src/gras_simix/Transport/gras_simix_transport_plugin_sg.c b/src/gras_simix/Transport/gras_simix_transport_plugin_sg.c index 70660f553b..df870548f6 100644 --- a/src/gras_simix/Transport/gras_simix_transport_plugin_sg.c +++ b/src/gras_simix/Transport/gras_simix_transport_plugin_sg.c @@ -13,8 +13,7 @@ #include "xbt/ex.h" -//#include "msg/msg.h" - +#include "gras_simix/Msg/gras_simix_msg_private.h" #include "gras_simix_transport_private.h" #include "gras_simix/Virtu/gras_simix_virtu_sg.h" @@ -215,7 +214,10 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, } void gras_trp_sg_socket_close(gras_socket_t sock){ - + xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_name("gras_trp"))->sockets; + gras_socket_t sock_iter; + int cursor; + int found; gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); int cpt; gras_sg_portrec_t pr; @@ -228,21 +230,32 @@ void gras_trp_sg_socket_close(gras_socket_t sock){ if (sock->data) free(sock->data); - -/* - SIMIX_cond_destroy(hd->cond_port[sock->port]); - hd->cond_port[sock->port] = NULL; - SIMIX_mutex_destroy(hd->mutex_port[sock->port]); - hd->mutex_port[sock->port] = NULL; -*/ + + /* search for a socket in the list that is using the mutex and condition. It can happen because we create 2 sockets to communicate (incomming and outgoing) */ + found = 0; + xbt_dynar_foreach(sockets,cursor,sock_iter) { + if (sock_iter->port == sock->port) { + found = 1; + break; + } + } + /* if not found, it is the last socket opened in this port and we can free the mutex and condition */ + if (!found) { + SIMIX_cond_destroy(hd->cond_port[sock->port]); + hd->cond_port[sock->port] = NULL; + SIMIX_mutex_destroy(hd->mutex_port[sock->port]); + hd->mutex_port[sock->port] = NULL; + } + + if (sock->incoming && !sock->outgoing && sock->port >= 0) { /* server mode socket. Unregister it from 'OS' tables */ xbt_dynar_foreach(hd->ports, cpt, pr) { DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports)); if (pr.port == sock->port) { - xbt_dynar_cursor_rm(hd->ports, &cpt); - XBT_OUT; - return; + xbt_dynar_cursor_rm(hd->ports, &cpt); + XBT_OUT; + return; } } WARN2("socket_close called on the unknown incoming socket %p (port=%d)", @@ -267,9 +280,69 @@ void gras_trp_sg_chunk_send(gras_socket_t sock, void gras_trp_sg_chunk_send_raw(gras_socket_t sock, const char *data, unsigned long int size) { + char name[256]; + static unsigned int count=0; + + smx_action_t act; /* simix action */ + gras_trp_sg_sock_data_t *sock_data; + gras_hostdata_t *hd; + gras_hostdata_t *remote_hd; + gras_trp_procdata_t trp_remote_proc; + gras_msg_procdata_t msg_remote_proc; + gras_msg_t msg; /* message to send */ + + sock_data = (gras_trp_sg_sock_data_t *)sock->data; + hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); + remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host); + + xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); + + + sprintf(name,"Chunk[%d]",count++); + /*initialize gras message */ + msg = xbt_new(s_gras_msg_t,1); + msg->expe = sock; + msg->payl_size=size; + + if (data) { + msg->payl=(void*)xbt_malloc(size); + memcpy(msg->payl,data,size); + } else { + msg->payl = NULL; + } + + /* put message on msg_queue */ + msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process); + xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg); + + /* wake-up the receiver */ + trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process); + SIMIX_cond_signal(trp_remote_proc->cond); + + SIMIX_mutex_lock(remote_hd->mutex_port[sock->peer_port]); + /* wait for the receiver */ + SIMIX_cond_wait(remote_hd->cond_port[sock->peer_port], remote_hd->mutex_port[sock->peer_port]); + + /* creates simix action and waits its ends, waits in the sender host condition*/ + DEBUG5("send chunk %s from %s to %s:%d (size=%ld)", + name, SIMIX_host_get_name(SIMIX_host_self()), + SIMIX_host_get_name(sock_data->to_host), sock->peer_port,size); + + act = SIMIX_action_communicate(sock_data->to_host, SIMIX_host_self(),name, size, -1); + SIMIX_register_action_to_condition(act,remote_hd->cond_port[sock->peer_port]); + SIMIX_register_condition_to_action(act,remote_hd->cond_port[sock->peer_port]); + + SIMIX_host_get_name(sock_data->to_host),SIMIX_process_get_name(sock_data->to_process), + + SIMIX_cond_wait(remote_hd->cond_port[sock->peer_port], remote_hd->mutex_port[sock->peer_port]); + /* error treatmeant */ + + /* cleanup structures */ + SIMIX_action_destroy(act); + SIMIX_mutex_unlock(remote_hd->mutex_port[sock->peer_port]); + SIMIX_cond_signal(remote_hd->cond_port[sock->peer_port]); /* m_task_t task=NULL; - static unsigned int count=0; char name[256]; gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)sock->data; sg_task_data_t *task_data; @@ -300,6 +373,56 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, int gras_trp_sg_chunk_recv(gras_socket_t sock, char *data, unsigned long int size){ + gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); + gras_trp_sg_sock_data_t *sock_data; + gras_hostdata_t *remote_hd; + gras_hostdata_t *local_hd; + gras_msg_t msg_got; + gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); + + xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); + + SIMIX_mutex_lock(pd->mutex); + if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) { + SIMIX_cond_wait_timeout(pd->cond,pd->mutex,60); + } + if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) { + THROW0(timeout_error,0,"Timeout"); + } + SIMIX_mutex_unlock(pd->mutex); + + sock_data = (gras_trp_sg_sock_data_t *)sock->data; + DEBUG3("Remote host %s, Remote Port: %d Local port %d", SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port); + remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host); + local_hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); + + + + msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue); + + SIMIX_mutex_lock(local_hd->mutex_port[sock->port]); +/* ok, I'm here, you can continuate the communication */ + SIMIX_cond_signal(local_hd->cond_port[sock->port]); + +/* wait for communication end */ + SIMIX_cond_wait(local_hd->cond_port[sock->port],local_hd->mutex_port[sock->port]); + + + if (msg_got->payl_size != size) + THROW5(mismatch_error,0, + "Got %d bytes when %ld where expected (in %s->%s:%d)", + msg_got->payl_size, size, + SIMIX_host_get_name(sock_data->to_host), + SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port); + if (data) { + memcpy(data,msg_got->payl,size); + } + if (msg_got->payl) + xbt_free(msg_got->payl); + xbt_free(msg_got); + SIMIX_cond_wait(local_hd->cond_port[sock->port],local_hd->mutex_port[sock->port]); + SIMIX_mutex_unlock(local_hd->mutex_port[sock->port]); + /* gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); diff --git a/src/gras_simix/Virtu/gras_simix_sg_process.c b/src/gras_simix/Virtu/gras_simix_sg_process.c index acb01ff7a6..4a0189bf4d 100644 --- a/src/gras_simix/Virtu/gras_simix_sg_process.c +++ b/src/gras_simix/Virtu/gras_simix_sg_process.c @@ -23,11 +23,8 @@ gras_process_init() { gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); gras_procdata_t *pd=xbt_new0(gras_procdata_t,1); gras_trp_procdata_t trp_pd; - //gras_sg_portrec_t prmeas,pr; - //int i; - - SIMIX_process_set_data(SIMIX_process_self(),(void*)pd); + SIMIX_process_set_data(SIMIX_process_self(),(void*)pd); gras_procdata_init(); @@ -38,8 +35,6 @@ gras_process_init() { hd->refcount = 1; hd->ports = xbt_dynar_new(sizeof(gras_sg_portrec_t),NULL); - // memset(hd->proc, 0, sizeof(hd->proc[0]) * XBT_MAX_CHANNEL); - for (i=0;i<65536;i++) { hd->cond_port[i] =NULL; hd->mutex_port[i] =NULL; @@ -60,41 +55,6 @@ gras_process_init() { trp_pd->mutex = SIMIX_mutex_init(); trp_pd->cond = SIMIX_cond_init(); trp_pd->active_socket = xbt_fifo_new(); - /* take a free channel for this process */ - /* - trp_pd = (gras_trp_procdata_t)gras_libdata_by_name("gras_trp"); - for (i=0; iproc[i]; i++); - if (i == XBT_MAX_CHANNEL) - THROW2(system_error,0, - "Can't add a new process on %s, because all channels are already in use. Please increase MAX CHANNEL (which is %d for now) and recompile GRAS.", - MSG_host_get_name(MSG_host_self()),XBT_MAX_CHANNEL); - - trp_pd->chan = i; - hd->proc[ i ] = MSG_process_self_PID(); -*/ - /* regiter it to the ports structure */ - // pr.port = -1; - //pr.tochan = i; - //pr.meas = 0; - //xbt_dynar_push(hd->ports,&pr); - - /* take a free meas channel for this process */ - /* - for (i=0; iproc[i]; i++); - if (i == XBT_MAX_CHANNEL) { - THROW2(system_error,0, - "Can't add a new process on %s, because all channels are already in use. Please increase MAX CHANNEL (which is %d for now) and recompile GRAS.", - MSG_host_get_name(MSG_host_self()),XBT_MAX_CHANNEL); - } - trp_pd->measChan = i; - - hd->proc[ i ] = MSG_process_self_PID(); -*/ - /* register it to the ports structure */ - //prmeas.port = -1; - //prmeas.tochan = i; - //prmeas.meas = 1; - //xbt_dynar_push(hd->ports,&prmeas); VERB2("Creating process '%s' (%ld)", SIMIX_process_get_name(SIMIX_process_self()), @@ -104,6 +64,9 @@ gras_process_init() { void gras_process_exit() { int i; + xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_name("gras_trp"))->sockets; + gras_socket_t sock_iter; + int cursor; gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); gras_procdata_t *pd=(gras_procdata_t*)SIMIX_process_get_data(SIMIX_process_self()); @@ -126,20 +89,12 @@ gras_process_exit() { WARN1("process %ld terminated, but some messages are still queued", gras_os_getpid()); -/* - for (cpt=0; cpt< XBT_MAX_CHANNEL; cpt++) - if (myPID == hd->proc[cpt]) - hd->proc[cpt] = 0; -*/ - -/* remove ports from host, maybe i can do it on the socket destroy function */ - /* - xbt_dynar_foreach(hd->ports, cpt, pr) { - if (pr.port == trp_pd->chan || pr.port == trp_pd->measChan) { - xbt_dynar_cursor_rm(hd->ports, &cpt); - } - }*/ - + /* if each process has its sockets list, we need to close them when the process finish */ + xbt_dynar_foreach(sockets,cursor,sock_iter) { + VERB1("Closing the socket %p left open on exit. Maybe a socket leak?", + sock_iter); + gras_socket_close(sock_iter); + } if ( ! --(hd->refcount)) { xbt_dynar_free(&hd->ports); for (i=0; i<65536; i++) {