X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/25d6952dcf52e6ffc1e4a402af0e2d3c17ac3935..778f65057da68465382593cd036b6ee59ada54e9:/src/gras/Transport/transport_plugin_sg.c diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index e777b589a7..b19a9f837b 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -18,13 +18,11 @@ #include "transport_private.h" #include "gras/Virtu/virtu_sg.h" -XBT_LOG_EXTERNAL_CATEGORY(transport); -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_sg,transport,"SimGrid pseudo-transport"); +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg,gras_trp,"SimGrid pseudo-transport"); /*** *** Prototypes ***/ -void hexa_print(unsigned char *data, int size); /* in gras.c */ /* retrieve the port record associated to a numerical port on an host */ static void find_port(gras_hostdata_t *hd, int port, gras_sg_portrec_t *hpd); @@ -36,11 +34,15 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, /* OUT */ gras_socket_t sock); void gras_trp_sg_socket_close(gras_socket_t sd); +void gras_trp_sg_chunk_send_raw(gras_socket_t sd, + const char *data, + unsigned long int size); void gras_trp_sg_chunk_send(gras_socket_t sd, const char *data, - unsigned long int size); + unsigned long int size, + int stable_ignored); -void gras_trp_sg_chunk_recv(gras_socket_t sd, +int gras_trp_sg_chunk_recv(gras_socket_t sd, char *data, unsigned long int size); @@ -83,8 +85,9 @@ gras_trp_sg_setup(gras_trp_plugin_t plug) { plug->socket_server = gras_trp_sg_socket_server; plug->socket_close = gras_trp_sg_socket_close; - plug->chunk_send = gras_trp_sg_chunk_send; - plug->chunk_recv = gras_trp_sg_chunk_recv; + plug->raw_send = gras_trp_sg_chunk_send_raw; + plug->send = gras_trp_sg_chunk_send; + plug->raw_recv = plug->recv = gras_trp_sg_chunk_recv; plug->flush = NULL; /* nothing cached */ } @@ -150,10 +153,10 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock){ gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self()); - gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp"); + gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); gras_sg_portrec_t pr; gras_trp_sg_sock_data_t *data; - int found; + volatile int found; const char *host=MSG_host_get_name(MSG_host_self()); @@ -235,71 +238,80 @@ typedef struct { void gras_trp_sg_chunk_send(gras_socket_t sock, const char *data, - unsigned long int size) { + unsigned long int size, + int stable_ignored) { + gras_trp_sg_chunk_send_raw(sock,data,size); +} + +void gras_trp_sg_chunk_send_raw(gras_socket_t sock, + const char *data, + unsigned long int size) { m_task_t task=NULL; static unsigned int count=0; char name[256]; gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)sock->data; sg_task_data_t *task_data; + xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); + sprintf(name,"Chunk[%d]",count++); task_data=xbt_new(sg_task_data_t,1); - task_data->data=(void*)xbt_malloc(size); task_data->size = size; - memcpy(task_data->data,data,size); + if (data) { + task_data->data=(void*)xbt_malloc(size); + memcpy(task_data->data,data,size); + } else { + task_data->data = NULL; + } - task=MSG_task_create(name,0,((double)size)/(1024.0*1024.0),task_data); + task=MSG_task_create(name,0,((double)size),task_data); DEBUG5("send chunk %s from %s to %s:%d (size=%ld)", name, MSG_host_get_name(MSG_host_self()), MSG_host_get_name(sock_data->to_host), sock_data->to_chan,size); - if (MSG_task_put(task, sock_data->to_host,sock_data->to_chan) != MSG_OK) { - THROW0(system_error,0,"Problem during the MSG_task_put"); + if (MSG_task_put_with_timeout(task, sock_data->to_host,sock_data->to_chan,60.0) != MSG_OK) { + THROW0(system_error,0,"Problem during the MSG_task_put with timeout 60"); } } -void gras_trp_sg_chunk_recv(gras_socket_t sock, +int gras_trp_sg_chunk_recv(gras_socket_t sock, char *data, unsigned long int size){ - gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp"); + gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id); m_task_t task=NULL; sg_task_data_t *task_data; gras_trp_sg_sock_data_t *sock_data = sock->data; + xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets"); XBT_IN; DEBUG4("recv chunk on %s -> %s:%d (size=%ld)", MSG_host_get_name(sock_data->to_host), MSG_host_get_name(MSG_host_self()), sock_data->to_chan, size); - if (MSG_task_get(&task, (sock->meas ? pd->measChan : pd->chan)) != MSG_OK) + if (MSG_task_get_with_time_out(&task, + (sock->meas ? pd->measChan : pd->chan), + 60) != MSG_OK) THROW0(system_error,0,"Error in MSG_task_get()"); DEBUG1("Got chuck %s",MSG_task_get_name(task)); task_data = MSG_task_get_data(task); - if (size != -1) { - if (task_data->size != size) - THROW5(mismatch_error,0, - "Got %d bytes when %ld where expected (in %s->%s:%d)", - task_data->size, size, - MSG_host_get_name(sock_data->to_host), - MSG_host_get_name(MSG_host_self()), sock_data->to_chan); - memcpy(data,task_data->data,size); - } else { - /* damn, the size is embeeded at the begining of the chunk */ - int netsize; - - memcpy((char*)&netsize,task_data->data,4); - DEBUG1("netsize embeeded = %d",netsize); - - memcpy(data,task_data->data,netsize+4); - } - free(task_data->data); + if (task_data->size != size) + THROW5(mismatch_error,0, + "Got %d bytes when %ld where expected (in %s->%s:%d)", + task_data->size, size, + MSG_host_get_name(sock_data->to_host), + MSG_host_get_name(MSG_host_self()), sock_data->to_chan); + if (data) + memcpy(data,task_data->data,size); + if (task_data->data) + free(task_data->data); free(task_data); if (MSG_task_destroy(task) != MSG_OK) THROW0(system_error,0,"Error in MSG_task_destroy()"); XBT_OUT; + return size; }