X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/bf8ef4cdb70ddc05c6b9b01337c21a66552ce878..a1a3a7bbd6433b0dab8dad372b9a2e0b2719cc8f:/src/gras/Transport/transport_plugin_sg.c diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index e849299a16..c12cb01f85 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -22,6 +22,8 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_sg,transport,"SimGrid pseudo-transport"); /*** *** Prototypes ***/ +void hexa_print(unsigned char *data, int size); /* in gras.c */ + /* retrieve the port record associated to a numerical port on an host */ static xbt_error_t find_port(gras_hostdata_t *hd, int port, gras_sg_portrec_t *hpd); @@ -34,12 +36,12 @@ xbt_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self, void gras_trp_sg_socket_close(gras_socket_t sd); xbt_error_t gras_trp_sg_chunk_send(gras_socket_t sd, - const char *data, - long int size); + const char *data, + unsigned long int size); xbt_error_t gras_trp_sg_chunk_recv(gras_socket_t sd, - char *data, - long int size); + char *data, + unsigned long int size); /*** *** Specific plugin part @@ -118,14 +120,14 @@ xbt_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self, sock->peer_name,sock->peer_port); } - if (pr.raw && !sock->raw) { + if (pr.meas && !sock->meas) { RAISE2(mismatch_error, "can't connect to %s:%d in regular mode, the process listen " - "in raw mode on this port",sock->peer_name,sock->peer_port); + "in meas mode on this port",sock->peer_name,sock->peer_port); } - if (!pr.raw && sock->raw) { + if (!pr.meas && sock->meas) { RAISE2(mismatch_error, - "can't connect to %s:%d in raw mode, the process listen " + "can't connect to %s:%d in meas mode, the process listen " "in regular mode on this port",sock->peer_name,sock->peer_port); } @@ -141,7 +143,7 @@ xbt_error_t gras_trp_sg_socket_client(gras_trp_plugin_t *self, DEBUG6("%s (PID %d) connects in %s mode to %s:%d (to_PID=%d)", MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(), - sock->raw?"RAW":"regular", + sock->meas?"meas":"regular", sock->peer_name,sock->peer_port,data->to_PID); return no_error; @@ -153,7 +155,7 @@ xbt_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self, xbt_error_t errcode; gras_hostdata_t *hd=(gras_hostdata_t *)MSG_host_get_data(MSG_host_self()); - gras_procdata_t *pd=gras_procdata_get(); + gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp"); gras_sg_portrec_t pr; gras_trp_sg_sock_data_t *data; @@ -172,9 +174,9 @@ xbt_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self, break; case mismatch_error: /* Port not used so far. Do it */ - pr.tochan = sock->raw ? pd->rawChan : pd->chan; + pr.tochan = sock->meas ? pd->measChan : pd->chan; pr.port = sock->port; - pr.raw = sock->raw; + pr.meas = sock->meas; xbt_dynar_push(hd->ports,&pr); break; @@ -193,7 +195,7 @@ xbt_error_t gras_trp_sg_socket_server(gras_trp_plugin_t *self, VERB6("'%s' (%d) ears on %s:%d%s (%p)", MSG_process_get_name(MSG_process_self()), MSG_process_self_PID(), - host,sock->port,sock->raw? " (mode RAW)":"",sock); + host,sock->port,sock->meas? " (mode meas)":"",sock); return no_error; } @@ -204,23 +206,28 @@ void gras_trp_sg_socket_close(gras_socket_t sock){ gras_sg_portrec_t pr; + XBT_IN1(" (sock=%p)",sock); + if (!sock) return; xbt_assert0(hd,"Please run gras_process_init on each process"); if (sock->data) - xbt_free(sock->data); + free(sock->data); - if (sock->incoming) { - /* server mode socket. Un register it from 'OS' tables */ + if (sock->incoming && sock->port >= 0) { + /* server mode socket. Unregister it from 'OS' tables */ xbt_dynar_foreach(hd->ports, cpt, pr) { DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports)); if (pr.port == sock->port) { xbt_dynar_cursor_rm(hd->ports, &cpt); - return; + XBT_OUT; + return; } } - WARN0("socket_close called on an unknown socket"); + WARN2("socket_close called on the unknown incoming socket %p (port=%d)", + sock,sock->port); } + XBT_OUT; } typedef struct { @@ -229,8 +236,8 @@ typedef struct { } sg_task_data_t; xbt_error_t gras_trp_sg_chunk_send(gras_socket_t sock, - const char *data, - long int size) { + const char *data, + unsigned long int size) { m_task_t task=NULL; static unsigned int count=0; char name[256]; @@ -257,9 +264,9 @@ xbt_error_t gras_trp_sg_chunk_send(gras_socket_t sock, } xbt_error_t gras_trp_sg_chunk_recv(gras_socket_t sock, - char *data, - long int size){ - gras_procdata_t *pd=gras_procdata_get(); + char *data, + unsigned long int size){ + gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp"); m_task_t task=NULL; sg_task_data_t *task_data; @@ -269,19 +276,30 @@ xbt_error_t gras_trp_sg_chunk_recv(gras_socket_t sock, DEBUG4("recv chunk on %s -> %s:%d (size=%ld)", MSG_host_get_name(sock_data->to_host), MSG_host_get_name(MSG_host_self()), sock_data->to_chan, size); - if (MSG_task_get(&task, (sock->raw ? pd->rawChan : pd->chan)) != MSG_OK) + if (MSG_task_get(&task, (sock->meas ? pd->measChan : pd->chan)) != MSG_OK) RAISE0(unknown_error,"Error in MSG_task_get()"); + DEBUG1("Got chuck %s",MSG_task_get_name(task)); task_data = MSG_task_get_data(task); - if (task_data->size != size) - RAISE5(mismatch_error, - "Got %d bytes when %ld where expected (in %s->%s:%d)", - task_data->size, size, - MSG_host_get_name(sock_data->to_host), - MSG_host_get_name(MSG_host_self()), sock_data->to_chan); - memcpy(data,task_data->data,size); - xbt_free(task_data->data); - xbt_free(task_data); + if (size != -1) { + if (task_data->size != size) + RAISE5(mismatch_error, + "Got %d bytes when %ld where expected (in %s->%s:%d)", + task_data->size, size, + MSG_host_get_name(sock_data->to_host), + MSG_host_get_name(MSG_host_self()), sock_data->to_chan); + memcpy(data,task_data->data,size); + } else { + /* damn, the size is embeeded at the begining of the chunk */ + int netsize; + + memcpy((char*)&netsize,task_data->data,4); + DEBUG1("netsize embeeded = %d",netsize); + + memcpy(data,task_data->data,netsize+4); + } + free(task_data->data); + free(task_data); if (MSG_task_destroy(task) != MSG_OK) RAISE0(unknown_error,"Error in MSG_task_destroy()"); @@ -290,8 +308,9 @@ xbt_error_t gras_trp_sg_chunk_recv(gras_socket_t sock, return no_error; } -/* Data exchange over raw sockets */ -xbt_error_t gras_socket_raw_exchange(gras_socket_t peer, +#if 0 +/* Data exchange over measurement sockets */ +xbt_error_t gras_socket_meas_exchange(gras_socket_t peer, int sender, unsigned int timeout, unsigned long int expSize, @@ -302,7 +321,7 @@ xbt_error_t gras_socket_raw_exchange(gras_socket_t peer, char name[256]; gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)peer->data; - gras_procdata_t *pd=gras_procdata_get(); + gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_get("gras_trp"); double startTime; startTime=gras_os_time(); /* used only in sender mode */ @@ -311,11 +330,11 @@ xbt_error_t gras_socket_raw_exchange(gras_socket_t peer, if (sender) { - sprintf(name,"Raw data[%d]",count++); + sprintf(name,"meas data[%d]",count++); task=MSG_task_create(name,0,((double)msgSize)/(1024.0*1024.0),NULL); - DEBUG5("%f:%s: gras_socket_raw_send(%f %s -> %s) BEGIN", + DEBUG5("%f:%s: gras_socket_meas_send(%f %s -> %s) BEGIN", gras_os_time(), MSG_process_get_name(MSG_process_self()), ((double)msgSize)/(1024.0*1024.0), MSG_host_get_name( MSG_host_self()), peer->peer_name); @@ -323,7 +342,7 @@ xbt_error_t gras_socket_raw_exchange(gras_socket_t peer, if (MSG_task_put(task, sock_data->to_host,sock_data->to_chan) != MSG_OK) RAISE0(system_error,"Problem during the MSG_task_put()"); - DEBUG5("%f:%s: gras_socket_raw_send(%f %s -> %s) END", + DEBUG5("%f:%s: gras_socket_meas_send(%f %s -> %s) END", gras_os_time(), MSG_process_get_name(MSG_process_self()), ((double)msgSize)/(1024.0*1024.0), MSG_host_get_name( MSG_host_self()), peer->peer_name); @@ -331,11 +350,11 @@ xbt_error_t gras_socket_raw_exchange(gras_socket_t peer, } else { /* we are receiver, simulate a select */ task=NULL; - DEBUG2("%f:%s: gras_socket_raw_recv() BEGIN\n", + DEBUG2("%f:%s: gras_socket_meas_recv() BEGIN\n", gras_os_time(), MSG_process_get_name(MSG_process_self())); do { - if (MSG_task_Iprobe((m_channel_t) pd->rawChan)) { - if (MSG_task_get(&task, (m_channel_t) pd->rawChan) != MSG_OK) { + if (MSG_task_Iprobe((m_channel_t) pd->measChan)) { + if (MSG_task_get(&task, (m_channel_t) pd->measChan) != MSG_OK) { fprintf(stderr,"GRAS: Error in MSG_task_get()\n"); return unknown_error; } @@ -345,7 +364,7 @@ xbt_error_t gras_socket_raw_exchange(gras_socket_t peer, return unknown_error; } - DEBUG2("%f:%s: gras_socket_raw_recv() END\n", + DEBUG2("%f:%s: gras_socket_meas_recv() END\n", gras_os_time(), MSG_process_get_name(MSG_process_self())); break; } else { @@ -361,3 +380,4 @@ xbt_error_t gras_socket_raw_exchange(gras_socket_t peer, return no_error; } +#endif