X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/856060282bcef7e9f575fd1b2d33071afdd58502..5178c3d78c741bc724a439f407d441300355c40d:/src/gras/Transport/transport_plugin_sg.c diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index d90c36f92b..b3335b8499 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -13,13 +13,11 @@ #include "xbt/ex.h" -#include "msg/msg.h" - -#include "transport_private.h" +#include "gras/Msg/msg_private.h" +#include "gras/Transport/transport_private.h" #include "gras/Virtu/virtu_sg.h" -XBT_LOG_EXTERNAL_CATEGORY(transport); -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_sg,transport,"SimGrid pseudo-transport"); +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg,gras_trp,"SimGrid pseudo-transport"); /*** *** Prototypes @@ -97,16 +95,17 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self, /* OUT */ gras_socket_t sock){ xbt_ex_t e; - m_host_t peer; + smx_host_t peer; gras_hostdata_t *hd; gras_trp_sg_sock_data_t *data; gras_sg_portrec_t pr; /* make sure this socket will reach someone */ - if (!(peer=MSG_get_host_by_name(sock->peer_name))) - THROW1(mismatch_error,0,"Can't connect to %s: no such host.\n",sock->peer_name); + if (!(peer=SIMIX_host_get_by_name(sock->peer_name))) + THROW1(mismatch_error,0, + "Can't connect to %s: no such host.\n",sock->peer_name); - if (!(hd=(gras_hostdata_t *)MSG_host_get_data(peer))) + if (!(hd=(gras_hostdata_t *)SIMIX_host_get_data(peer))) THROW1(mismatch_error,0, "can't connect to %s: no process on this host", sock->peer_name); @@ -126,47 +125,48 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self, if (pr.meas && !sock->meas) { THROW2(mismatch_error,0, "can't connect to %s:%d in regular mode, the process listen " - "in meas mode on this port",sock->peer_name,sock->peer_port); + "in measurement mode on this port",sock->peer_name,sock->peer_port); } if (!pr.meas && sock->meas) { THROW2(mismatch_error,0, - "can't connect to %s:%d in meas mode, the process listen " + "can't connect to %s:%d in measurement mode, the process listen " "in regular mode on this port",sock->peer_name,sock->peer_port); } - /* create the socket */ data = xbt_new(gras_trp_sg_sock_data_t,1); - data->from_PID = MSG_process_self_PID(); - data->to_PID = hd->proc[ pr.tochan ]; + data->from_process = SIMIX_process_self(); + data->to_process = pr.process; data->to_host = peer; - data->to_chan = pr.tochan; - + + /* initialize mutex and condition of the socket */ + data->mutex = SIMIX_mutex_init(); + data->cond = SIMIX_cond_init(); + data->to_socket = pr.socket; + sock->data = data; sock->incoming = 1; - DEBUG6("%s (PID %d) connects in %s mode to %s:%d (to_PID=%d)", - MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(), - sock->meas?"meas":"regular", - sock->peer_name,sock->peer_port,data->to_PID); + DEBUG5("%s (PID %d) connects in %s mode to %s:%d", + SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), + sock->meas?"meas":"regular", + sock->peer_name,sock->peer_port); } void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock){ - gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self()); - gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); + gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); gras_sg_portrec_t pr; gras_trp_sg_sock_data_t *data; - int found; + volatile int found; - const char *host=MSG_host_get_name(MSG_host_self()); + const char *host=SIMIX_host_get_name(SIMIX_host_self()); xbt_ex_t e; xbt_assert0(hd,"Please run gras_process_init on each process"); sock->accepting = 0; /* no such nuisance in SG */ - found = 0; TRY { find_port(hd,sock->port,&pr); @@ -183,53 +183,60 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, "can't listen on address %s:%d: port already in use.", host,sock->port); - pr.tochan = sock->meas ? pd->measChan : pd->chan; - pr.port = sock->port; - pr.meas = sock->meas; + pr.port = sock->port; + pr.meas = sock->meas; + pr.socket = sock; + pr.process = SIMIX_process_self(); xbt_dynar_push(hd->ports,&pr); /* Create the socket */ data = xbt_new(gras_trp_sg_sock_data_t,1); - data->from_PID = -1; - data->to_PID = MSG_process_self_PID(); - data->to_host = MSG_host_self(); - data->to_chan = pd->chan; + data->from_process = SIMIX_process_self(); + data->to_process = NULL; + data->to_host = SIMIX_host_self(); + data->cond = SIMIX_cond_init(); + data->mutex = SIMIX_mutex_init(); + sock->data = data; VERB6("'%s' (%d) ears on %s:%d%s (%p)", - MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(), - host,sock->port,sock->meas? " (mode meas)":"",sock); + SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(), + host,sock->port,sock->meas? " (mode meas)":"",sock); + } void gras_trp_sg_socket_close(gras_socket_t sock){ - gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self()); + gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self()); int cpt; - - gras_sg_portrec_t pr; + gras_sg_portrec_t pr; XBT_IN1(" (sock=%p)",sock); if (!sock) return; + xbt_assert0(hd,"Please run gras_process_init on each process"); - if (sock->data) + if (sock->data) { + SIMIX_cond_destroy(((gras_trp_sg_sock_data_t*)sock->data)->cond); + SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t*)sock->data)->mutex); free(sock->data); + } - if (sock->incoming && sock->port >= 0) { + if (sock->incoming && !sock->outgoing && sock->port >= 0) { /* server mode socket. Unregister it from 'OS' tables */ xbt_dynar_foreach(hd->ports, cpt, pr) { DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports)); if (pr.port == sock->port) { xbt_dynar_cursor_rm(hd->ports, &cpt); XBT_OUT; - return; + return; } } WARN2("socket_close called on the unknown incoming socket %p (port=%d)", sock,sock->port); } - XBT_OUT; + XBT_OUT; } typedef struct { @@ -247,72 +254,116 @@ void gras_trp_sg_chunk_send(gras_socket_t sock, void gras_trp_sg_chunk_send_raw(gras_socket_t sock, const char *data, unsigned long int size) { - m_task_t task=NULL; - static unsigned int count=0; char name[256]; - gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)sock->data; - sg_task_data_t *task_data; - - xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); + static unsigned int count=0; + + smx_action_t act; /* simix action */ + gras_trp_sg_sock_data_t *sock_data; + gras_trp_procdata_t trp_remote_proc; + gras_msg_procdata_t msg_remote_proc; + gras_msg_t msg; /* message to send */ + + sock_data = (gras_trp_sg_sock_data_t *)sock->data; + xbt_assert0(sock->meas, + "SG chunk exchange shouldn't be used on non-measurement sockets"); + + SIMIX_mutex_lock(sock_data->mutex); sprintf(name,"Chunk[%d]",count++); + /*initialize gras message */ + msg = xbt_new(s_gras_msg_t,1); + msg->expe = sock; + msg->payl_size=size; - task_data=xbt_new(sg_task_data_t,1); - task_data->size = size; if (data) { - task_data->data=(void*)xbt_malloc(size); - memcpy(task_data->data,data,size); + msg->payl=(void*)xbt_malloc(size); + memcpy(msg->payl,data,size); } else { - task_data->data = NULL; + msg->payl = NULL; } + + + /* put his socket on the selectable socket queue */ + trp_remote_proc = (gras_trp_procdata_t) + gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process); + xbt_queue_push(trp_remote_proc->meas_selectable_sockets,&sock); - task=MSG_task_create(name,0,((double)size)/(1024.0*1024.0),task_data); + /* put message on msg_queue */ + msg_remote_proc = (gras_msg_procdata_t) + gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process); + xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg); + + /* wait for the receiver */ + SIMIX_cond_wait(sock_data->cond,sock_data->mutex); + + /* creates simix action and waits its ends, waits in the sender host + condition*/ DEBUG5("send chunk %s from %s to %s:%d (size=%ld)", - name, MSG_host_get_name(MSG_host_self()), - MSG_host_get_name(sock_data->to_host), sock_data->to_chan,size); - if (MSG_task_put(task, sock_data->to_host,sock_data->to_chan) != MSG_OK) { - THROW0(system_error,0,"Problem during the MSG_task_put"); - } + name, SIMIX_host_get_name(SIMIX_host_self()), + SIMIX_host_get_name(sock_data->to_host), sock->peer_port,size); + + act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host, + name, size, -1); + SIMIX_register_action_to_condition(act,sock_data->cond); + SIMIX_cond_wait(sock_data->cond,sock_data->mutex); + SIMIX_unregister_action_to_condition(act,sock_data->cond); + /* error treatmeant (FIXME)*/ + + /* cleanup structures */ + SIMIX_action_destroy(act); + + SIMIX_mutex_unlock(sock_data->mutex); } int gras_trp_sg_chunk_recv(gras_socket_t sock, - char *data, - unsigned long int size){ - gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); - - m_task_t task=NULL; - sg_task_data_t *task_data; - gras_trp_sg_sock_data_t *sock_data = sock->data; - - xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); - XBT_IN; - DEBUG4("recv chunk on %s -> %s:%d (size=%ld)", - MSG_host_get_name(sock_data->to_host), - MSG_host_get_name(MSG_host_self()), sock_data->to_chan, size); - if (MSG_task_get_with_time_out(&task, - (sock->meas ? pd->measChan : pd->chan), - 60) != MSG_OK) - THROW0(system_error,0,"Error in MSG_task_get()"); - DEBUG1("Got chuck %s",MSG_task_get_name(task)); - - task_data = MSG_task_get_data(task); - if (task_data->size != size) + char *data, + unsigned long int size){ + gras_trp_sg_sock_data_t *sock_data; + gras_trp_sg_sock_data_t *remote_sock_data; + gras_socket_t remote_socket= NULL; + gras_msg_t msg_got; + gras_msg_procdata_t msg_procdata = + (gras_msg_procdata_t)gras_libdata_by_name("gras_msg"); + gras_trp_procdata_t trp_proc = + (gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); + + xbt_assert0(sock->meas, + "SG chunk exchange shouldn't be used on non-measurement sockets"); + xbt_queue_shift_timed(trp_proc->meas_selectable_sockets, + &remote_socket, 60); + + if (remote_socket == NULL) { + THROW0(timeout_error,0,"Timeout"); + } + + remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data; + msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas); + + sock_data = (gras_trp_sg_sock_data_t *)sock->data; + + /* ok, I'm here, you can continue the communication */ + SIMIX_cond_signal(remote_sock_data->cond); + + SIMIX_mutex_lock(remote_sock_data->mutex); + /* wait for communication end */ + SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex); + + if (msg_got->payl_size != size) THROW5(mismatch_error,0, "Got %d bytes when %ld where expected (in %s->%s:%d)", - task_data->size, size, - MSG_host_get_name(sock_data->to_host), - MSG_host_get_name(MSG_host_self()), sock_data->to_chan); - if (data) - memcpy(data,task_data->data,size); - if (task_data->data) - free(task_data->data); - free(task_data); + msg_got->payl_size, size, + SIMIX_host_get_name(sock_data->to_host), + SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port); - if (MSG_task_destroy(task) != MSG_OK) - THROW0(system_error,0,"Error in MSG_task_destroy()"); + if (data) + memcpy(data,msg_got->payl,size); + + if (msg_got->payl) + xbt_free(msg_got->payl); - XBT_OUT; - return size; + xbt_free(msg_got); + SIMIX_mutex_unlock(remote_sock_data->mutex); + return 0; }