X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/a18de5d84898a36d9bbdadf2683990da025a60e4..ae66e43b95b26467c1cb9df271e83f51d3d7147b:/src/gras/Transport/transport_plugin_sg.c diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index b19a9f837b..e7b59a480a 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -13,9 +13,8 @@ #include "xbt/ex.h" -#include "msg/msg.h" - -#include "transport_private.h" +#include "gras/Msg/msg_private.h" +#include "gras/Transport/transport_private.h" #include "gras/Virtu/virtu_sg.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg,gras_trp,"SimGrid pseudo-transport"); @@ -96,16 +95,16 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self, /* OUT */ gras_socket_t sock){ xbt_ex_t e; - m_host_t peer; + smx_host_t peer; gras_hostdata_t *hd; gras_trp_sg_sock_data_t *data; gras_sg_portrec_t pr; /* make sure this socket will reach someone */ - if (!(peer=MSG_get_host_by_name(sock->peer_name))) + if (!(peer=SIMIX_host_get_by_name(sock->peer_name))) THROW1(mismatch_error,0,"Can't connect to %s: no such host.\n",sock->peer_name); - if (!(hd=(gras_hostdata_t *)MSG_host_get_data(peer))) + if (!(hd=(gras_hostdata_t *)SIMIX_host_get_data(peer))) THROW1(mismatch_error,0, "can't connect to %s: no process on this host", sock->peer_name); @@ -132,40 +131,41 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self, "can't connect to %s:%d in meas mode, the process listen " "in regular mode on this port",sock->peer_name,sock->peer_port); } - /* create the socket */ data = xbt_new(gras_trp_sg_sock_data_t,1); - data->from_PID = MSG_process_self_PID(); - data->to_PID = hd->proc[ pr.tochan ]; + data->from_process = SIMIX_process_self(); + data->to_process = pr.process; data->to_host = peer; - data->to_chan = pr.tochan; - + + /* initialize mutex and condition of the socket */ + data->mutex = SIMIX_mutex_init(); + data->cond = SIMIX_cond_init(); + data->to_socket = pr.socket; + sock->data = data; sock->incoming = 1; - DEBUG6("%s (PID %d) connects in %s mode to %s:%d (to_PID=%d)", - MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(), + DEBUG5("%s (PID %ld) connects in %s mode to %s:%d", + SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), sock->meas?"meas":"regular", - sock->peer_name,sock->peer_port,data->to_PID); + sock->peer_name,sock->peer_port); } void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock){ - gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self()); - gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); + gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); gras_sg_portrec_t pr; gras_trp_sg_sock_data_t *data; volatile int found; - const char *host=MSG_host_get_name(MSG_host_self()); + const char *host=SIMIX_host_get_name(SIMIX_host_self()); xbt_ex_t e; xbt_assert0(hd,"Please run gras_process_init on each process"); sock->accepting = 0; /* no such nuisance in SG */ - found = 0; TRY { find_port(hd,sock->port,&pr); @@ -182,53 +182,61 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, "can't listen on address %s:%d: port already in use.", host,sock->port); - pr.tochan = sock->meas ? pd->measChan : pd->chan; pr.port = sock->port; pr.meas = sock->meas; + pr.socket = sock; + pr.process = SIMIX_process_self(); xbt_dynar_push(hd->ports,&pr); /* Create the socket */ data = xbt_new(gras_trp_sg_sock_data_t,1); - data->from_PID = -1; - data->to_PID = MSG_process_self_PID(); - data->to_host = MSG_host_self(); - data->to_chan = pd->chan; + data->from_process = SIMIX_process_self(); + data->to_process = NULL; + data->to_host = SIMIX_host_self(); + data->cond = SIMIX_cond_init(); + data->mutex = SIMIX_mutex_init(); + sock->data = data; - VERB6("'%s' (%d) ears on %s:%d%s (%p)", - MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(), + VERB6("'%s' (%ld) ears on %s:%d%s (%p)", + SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), host,sock->port,sock->meas? " (mode meas)":"",sock); + } void gras_trp_sg_socket_close(gras_socket_t sock){ - gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self()); + gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); int cpt; - - gras_sg_portrec_t pr; + gras_sg_portrec_t pr; XBT_IN1(" (sock=%p)",sock); if (!sock) return; + xbt_assert0(hd,"Please run gras_process_init on each process"); - if (sock->data) - free(sock->data); + if (sock->data) { + SIMIX_cond_destroy(((gras_trp_sg_sock_data_t*)sock->data)->cond); + SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t*)sock->data)->mutex); + free(sock->data); + } - if (sock->incoming && sock->port >= 0) { + if (sock->incoming && !sock->outgoing && sock->port >= 0) { /* server mode socket. Unregister it from 'OS' tables */ xbt_dynar_foreach(hd->ports, cpt, pr) { DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports)); if (pr.port == sock->port) { - xbt_dynar_cursor_rm(hd->ports, &cpt); - XBT_OUT; - return; + xbt_dynar_cursor_rm(hd->ports, &cpt); + XBT_OUT; + return; } } WARN2("socket_close called on the unknown incoming socket %p (port=%d)", sock,sock->port); } XBT_OUT; + } typedef struct { @@ -246,72 +254,141 @@ void gras_trp_sg_chunk_send(gras_socket_t sock, void gras_trp_sg_chunk_send_raw(gras_socket_t sock, const char *data, unsigned long int size) { - m_task_t task=NULL; - static unsigned int count=0; char name[256]; - gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)sock->data; - sg_task_data_t *task_data; - + static unsigned int count=0; + + smx_action_t act; /* simix action */ + gras_trp_sg_sock_data_t *sock_data; + gras_trp_sg_sock_data_t *remote_sock_data; + gras_hostdata_t *hd; + gras_hostdata_t *remote_hd; + gras_trp_procdata_t trp_remote_proc; + gras_msg_procdata_t msg_remote_proc; + gras_msg_t msg; /* message to send */ + + sock_data = (gras_trp_sg_sock_data_t *)sock->data; + remote_sock_data = ((gras_trp_sg_sock_data_t *)sock->data)->to_socket->data; + hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); + remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host); + xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); + sprintf(name,"Chunk[%d]",count++); + /*initialize gras message */ + msg = xbt_new(s_gras_msg_t,1); + msg->expe = sock; + msg->payl_size=size; - task_data=xbt_new(sg_task_data_t,1); - task_data->size = size; if (data) { - task_data->data=(void*)xbt_malloc(size); - memcpy(task_data->data,data,size); + msg->payl=(void*)xbt_malloc(size); + memcpy(msg->payl,data,size); } else { - task_data->data = NULL; + msg->payl = NULL; } - task=MSG_task_create(name,0,((double)size),task_data); - + /* put message on msg_queue */ + msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process); + xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg); + + /* wake-up the receiver */ + trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process); + SIMIX_cond_signal(trp_remote_proc->cond); + + SIMIX_mutex_lock(remote_sock_data->mutex); + //SIMIX_mutex_lock(remote_hd->mutex_port[sock->peer_port]); + /* wait for the receiver */ + SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex); + //SIMIX_cond_wait(remote_hd->cond_port[sock->peer_port], remote_hd->mutex_port[sock->peer_port]); + + /* creates simix action and waits its ends, waits in the sender host condition*/ DEBUG5("send chunk %s from %s to %s:%d (size=%ld)", - name, MSG_host_get_name(MSG_host_self()), - MSG_host_get_name(sock_data->to_host), sock_data->to_chan,size); - if (MSG_task_put_with_timeout(task, sock_data->to_host,sock_data->to_chan,60.0) != MSG_OK) { - THROW0(system_error,0,"Problem during the MSG_task_put with timeout 60"); - } + name, SIMIX_host_get_name(SIMIX_host_self()), + SIMIX_host_get_name(sock_data->to_host), sock->peer_port,size); + + act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host, name, size, -1); + /* + SIMIX_register_action_to_condition(act,remote_hd->cond_port[sock->peer_port]); + SIMIX_register_condition_to_action(act,remote_hd->cond_port[sock->peer_port]); + */ + SIMIX_register_action_to_condition(act,remote_sock_data->cond); + SIMIX_register_condition_to_action(act,remote_sock_data->cond); + + SIMIX_host_get_name(sock_data->to_host),SIMIX_process_get_name(sock_data->to_process), + + SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex); + //SIMIX_cond_wait(remote_hd->cond_port[sock->peer_port], remote_hd->mutex_port[sock->peer_port]); + /* error treatmeant */ + + /* cleanup structures */ + SIMIX_action_destroy(act); + + SIMIX_mutex_unlock(remote_sock_data->mutex); + //SIMIX_mutex_unlock(remote_hd->mutex_port[sock->peer_port]); + SIMIX_cond_signal(remote_sock_data->cond); + //SIMIX_cond_signal(remote_hd->cond_port[sock->peer_port]); } int gras_trp_sg_chunk_recv(gras_socket_t sock, char *data, unsigned long int size){ gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); - - m_task_t task=NULL; - sg_task_data_t *task_data; - gras_trp_sg_sock_data_t *sock_data = sock->data; + gras_trp_sg_sock_data_t *sock_data; + gras_hostdata_t *remote_hd; + gras_hostdata_t *local_hd; + gras_msg_t msg_got; + gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); - XBT_IN; - DEBUG4("recv chunk on %s -> %s:%d (size=%ld)", - MSG_host_get_name(sock_data->to_host), - MSG_host_get_name(MSG_host_self()), sock_data->to_chan, size); - if (MSG_task_get_with_time_out(&task, - (sock->meas ? pd->measChan : pd->chan), - 60) != MSG_OK) - THROW0(system_error,0,"Error in MSG_task_get()"); - DEBUG1("Got chuck %s",MSG_task_get_name(task)); - - task_data = MSG_task_get_data(task); - if (task_data->size != size) + + SIMIX_mutex_lock(pd->mutex); + if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) { + SIMIX_cond_wait_timeout(pd->cond,pd->mutex,60); + } + if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) { + THROW0(timeout_error,0,"Timeout"); + } + SIMIX_mutex_unlock(pd->mutex); + + sock_data = (gras_trp_sg_sock_data_t *)sock->data; + DEBUG3("Remote host %s, Remote Port: %d Local port %d", SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port); + remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host); + local_hd = (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); + + + + msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue); + + SIMIX_mutex_lock(sock_data->mutex); + //SIMIX_mutex_lock(local_hd->mutex_port[sock->port]); +/* ok, I'm here, you can continue the communication */ + SIMIX_cond_signal(sock_data->cond); + //SIMIX_cond_signal(local_hd->cond_port[sock->port]); + +/* wait for communication end */ + SIMIX_cond_wait(sock_data->cond,sock_data->mutex); + //SIMIX_cond_wait(local_hd->cond_port[sock->port],local_hd->mutex_port[sock->port]); + + + if (msg_got->payl_size != size) THROW5(mismatch_error,0, "Got %d bytes when %ld where expected (in %s->%s:%d)", - task_data->size, size, - MSG_host_get_name(sock_data->to_host), - MSG_host_get_name(MSG_host_self()), sock_data->to_chan); - if (data) - memcpy(data,task_data->data,size); - if (task_data->data) - free(task_data->data); - free(task_data); - - if (MSG_task_destroy(task) != MSG_OK) - THROW0(system_error,0,"Error in MSG_task_destroy()"); - - XBT_OUT; - return size; + msg_got->payl_size, size, + SIMIX_host_get_name(sock_data->to_host), + SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port); + if (data) { + memcpy(data,msg_got->payl,size); + } + if (msg_got->payl) + xbt_free(msg_got->payl); + xbt_free(msg_got); + SIMIX_cond_wait(sock_data->cond,sock_data->mutex); + SIMIX_mutex_unlock(sock_data->mutex); + //SIMIX_mutex_unlock(local_hd->mutex_port[sock->port]); + /* + SIMIX_cond_wait(local_hd->cond_port[sock->port],local_hd->mutex_port[sock->port]); + SIMIX_mutex_unlock(local_hd->mutex_port[sock->port]); + */ + return 0; }