X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/325e11b2f4756b1912369af33486b0b6513cf315..cbab0c1ae2e08f519824609764b951631c163766:/src/gras_simix/Transport/gras_simix_transport_plugin_sg.c diff --git a/src/gras_simix/Transport/gras_simix_transport_plugin_sg.c b/src/gras_simix/Transport/gras_simix_transport_plugin_sg.c index 14e5674647..0debd25726 100644 --- a/src/gras_simix/Transport/gras_simix_transport_plugin_sg.c +++ b/src/gras_simix/Transport/gras_simix_transport_plugin_sg.c @@ -13,8 +13,7 @@ #include "xbt/ex.h" -//#include "msg/msg.h" - +#include "gras_simix/Msg/gras_simix_msg_private.h" #include "gras_simix_transport_private.h" #include "gras_simix/Virtu/gras_simix_virtu_sg.h" @@ -215,11 +214,13 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, } void gras_trp_sg_socket_close(gras_socket_t sock){ - + xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_name("gras_trp"))->sockets; + gras_socket_t sock_iter; + int cursor; + int found; gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); int cpt; - - gras_sg_portrec_t pr; + gras_sg_portrec_t pr; XBT_IN1(" (sock=%p)",sock); @@ -230,19 +231,31 @@ void gras_trp_sg_socket_close(gras_socket_t sock){ if (sock->data) free(sock->data); - SIMIX_cond_destroy(hd->cond_port[sock->port]); - hd->cond_port[sock->port] = NULL; - SIMIX_mutex_destroy(hd->mutex_port[sock->port]); - hd->mutex_port[sock->port] = NULL; + /* search for a socket in the list that is using the mutex and condition. It can happen because we create 2 sockets to communicate (incomming and outgoing) */ + found = 0; + xbt_dynar_foreach(sockets,cursor,sock_iter) { + if (sock_iter->port == sock->port) { + found = 1; + break; + } + } + /* if not found, it is the last socket opened in this port and we can free the mutex and condition */ + if (!found) { + SIMIX_cond_destroy(hd->cond_port[sock->port]); + hd->cond_port[sock->port] = NULL; + SIMIX_mutex_destroy(hd->mutex_port[sock->port]); + hd->mutex_port[sock->port] = NULL; + } + - if (sock->incoming && sock->port >= 0) { + if (sock->incoming && !sock->outgoing && sock->port >= 0) { /* server mode socket. Unregister it from 'OS' tables */ xbt_dynar_foreach(hd->ports, cpt, pr) { DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports)); if (pr.port == sock->port) { - xbt_dynar_cursor_rm(hd->ports, &cpt); - XBT_OUT; - return; + xbt_dynar_cursor_rm(hd->ports, &cpt); + XBT_OUT; + return; } } WARN2("socket_close called on the unknown incoming socket %p (port=%d)", @@ -267,9 +280,69 @@ void gras_trp_sg_chunk_send(gras_socket_t sock, void gras_trp_sg_chunk_send_raw(gras_socket_t sock, const char *data, unsigned long int size) { + char name[256]; + static unsigned int count=0; + + smx_action_t act; /* simix action */ + gras_trp_sg_sock_data_t *sock_data; + gras_hostdata_t *hd; + gras_hostdata_t *remote_hd; + gras_trp_procdata_t trp_remote_proc; + gras_msg_procdata_t msg_remote_proc; + gras_msg_t msg; /* message to send */ + + sock_data = (gras_trp_sg_sock_data_t *)sock->data; + hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); + remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host); + + xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); + + + sprintf(name,"Chunk[%d]",count++); + /*initialize gras message */ + msg = xbt_new(s_gras_msg_t,1); + msg->expe = sock; + msg->payl_size=size; + + if (data) { + msg->payl=(void*)xbt_malloc(size); + memcpy(msg->payl,data,size); + } else { + msg->payl = NULL; + } + + /* put message on msg_queue */ + msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process); + xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg); + + /* wake-up the receiver */ + trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process); + SIMIX_cond_signal(trp_remote_proc->cond); + + SIMIX_mutex_lock(remote_hd->mutex_port[sock->peer_port]); + /* wait for the receiver */ + SIMIX_cond_wait(remote_hd->cond_port[sock->peer_port], remote_hd->mutex_port[sock->peer_port]); + + /* creates simix action and waits its ends, waits in the sender host condition*/ + DEBUG5("send chunk %s from %s to %s:%d (size=%ld)", + name, SIMIX_host_get_name(SIMIX_host_self()), + SIMIX_host_get_name(sock_data->to_host), sock->peer_port,size); + + act = SIMIX_action_communicate(sock_data->to_host, SIMIX_host_self(),name, size, -1); + SIMIX_register_action_to_condition(act,remote_hd->cond_port[sock->peer_port]); + SIMIX_register_condition_to_action(act,remote_hd->cond_port[sock->peer_port]); + + SIMIX_host_get_name(sock_data->to_host),SIMIX_process_get_name(sock_data->to_process), + + SIMIX_cond_wait(remote_hd->cond_port[sock->peer_port], remote_hd->mutex_port[sock->peer_port]); + /* error treatmeant */ + + /* cleanup structures */ + SIMIX_action_destroy(act); + SIMIX_mutex_unlock(remote_hd->mutex_port[sock->peer_port]); + SIMIX_cond_signal(remote_hd->cond_port[sock->peer_port]); /* m_task_t task=NULL; - static unsigned int count=0; char name[256]; gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)sock->data; sg_task_data_t *task_data; @@ -300,6 +373,56 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock, int gras_trp_sg_chunk_recv(gras_socket_t sock, char *data, unsigned long int size){ + gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); + gras_trp_sg_sock_data_t *sock_data; + gras_hostdata_t *remote_hd; + gras_hostdata_t *local_hd; + gras_msg_t msg_got; + gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); + + xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); + + SIMIX_mutex_lock(pd->mutex); + if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) { + SIMIX_cond_wait_timeout(pd->cond,pd->mutex,60); + } + if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) { + THROW0(timeout_error,0,"Timeout"); + } + SIMIX_mutex_unlock(pd->mutex); + + sock_data = (gras_trp_sg_sock_data_t *)sock->data; + DEBUG3("Remote host %s, Remote Port: %d Local port %d", SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port); + remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host); + local_hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); + + + + msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue); + + SIMIX_mutex_lock(local_hd->mutex_port[sock->port]); +/* ok, I'm here, you can continue the communication */ + SIMIX_cond_signal(local_hd->cond_port[sock->port]); + +/* wait for communication end */ + SIMIX_cond_wait(local_hd->cond_port[sock->port],local_hd->mutex_port[sock->port]); + + + if (msg_got->payl_size != size) + THROW5(mismatch_error,0, + "Got %d bytes when %ld where expected (in %s->%s:%d)", + msg_got->payl_size, size, + SIMIX_host_get_name(sock_data->to_host), + SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port); + if (data) { + memcpy(data,msg_got->payl,size); + } + if (msg_got->payl) + xbt_free(msg_got->payl); + xbt_free(msg_got); + SIMIX_cond_wait(local_hd->cond_port[sock->port],local_hd->mutex_port[sock->port]); + SIMIX_mutex_unlock(local_hd->mutex_port[sock->port]); + /* gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);