From: mquinson Date: Mon, 30 May 2005 15:45:36 +0000 (+0000) Subject: embeed the buffer size within the buffer itself on SG. That way, send are atomic... X-Git-Tag: v3.3~4054 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/a6a92abfb622b4e667e6853a5f39f6f003007588?hp=284832ed09bcd8f66de2c3e6287fb11bb10dc9be embeed the buffer size within the buffer itself on SG. That way, send are atomic and cannot get intermixed anymore (at least the ones which are less than 100k) git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1301 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/src/gras/Transport/transport_plugin_buf.c b/src/gras/Transport/transport_plugin_buf.c index bbfe9dda12..7439836096 100644 --- a/src/gras/Transport/transport_plugin_buf.c +++ b/src/gras/Transport/transport_plugin_buf.c @@ -21,6 +21,8 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_buf,transport, /*** *** Prototypes ***/ +hexa_print(const char*name, unsigned char *data, int size); /* in gras.c */ + xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self, gras_socket_t sock); xbt_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self, @@ -74,9 +76,12 @@ void gras_trp_buf_init_sock(gras_socket_t sock) { data->in.data = xbt_malloc(data->buffsize); data->in.pos = 0; /* useless, indeed, since size==pos */ - data->out.size = 0; + /* In SG, the 4 first bytes are for the chunk size as htonl'ed, so that we can send it in one shoot. + * This is mandatory in SG because all emissions go to the same channel, so if we split them, + * they can get mixed. */ + data->out.size = gras_if_RL()?0:4; data->out.data = xbt_malloc(data->buffsize); - data->out.pos = 0; + data->out.pos = data->out.size; sock->bufdata = data; } @@ -246,15 +251,31 @@ gras_trp_buf_chunk_recv(gras_socket_t sock, if (data->in.size == data->in.pos) { /* out of data. Get more */ int nextsize; - DEBUG0("Recv the size"); - TRY(super->chunk_recv(sock,(char*)&nextsize, 4)); - data->in.size = (int)ntohl(nextsize); - - VERB1("Recv the chunk (size=%d)",data->in.size); + if (gras_if_RL()) { + DEBUG0("Recv the size"); + TRY(super->chunk_recv(sock,(char*)&nextsize, 4)); + data->in.size = (int)ntohl(nextsize); + VERB1("Recv the chunk (size=%d)",data->in.size); + } else { + data->in.size = -1; + } + TRY(super->chunk_recv(sock, data->in.data, data->in.size)); - data->in.pos=0; + + if (gras_if_RL()) { + data->in.pos=0; + } else { + memcpy((char*)&nextsize,data->in.data,4); + data->in.size = (int)ntohl(nextsize)+4; + data->in.pos=4; + VERB3("Got the chunk (size=%d+4 for the size ifself)='%.*s'",data->in.size-4, + data->in.size,data->in.data); + if (XBT_LOG_ISENABLED(trp_buf,xbt_log_priority_debug)) + hexa_print("chunck received",data->in.data,data->in.size); + } + } - + thissize = min(size-chunck_pos , data->in.size - data->in.pos); DEBUG2("Get the chars %d..%ld out of the buffer", data->in.pos, @@ -284,18 +305,30 @@ gras_trp_buf_flush(gras_socket_t sock) { gras_trp_bufdata_t *data=sock->bufdata; XBT_IN; - if (! (data->out.size-data->out.pos) ) { - DEBUG0("Nothing to flush"); + DEBUG0("Flush"); + if (XBT_LOG_ISENABLED(trp_buf,xbt_log_priority_debug)) + hexa_print("chunck to send",data->out.data,data->out.size); + if ((data->out.size - data->out.pos) == (gras_if_RL()?0:4) ) { /* 4 first bytes=size in SG mode*/ + DEBUG2("Nothing to flush (size=%d; pos=%d)",data->out.size,data->out.pos); return no_error; } - size = (int)htonl(data->out.size-data->out.pos); - DEBUG1("Send the size (=%d)",data->out.size-data->out.pos); - TRY(super->chunk_send(sock,(char*) &size, 4)); + size = (int)htonl(data->out.size - data->out.pos); + DEBUG4("%s the size (=%d) to %s:%d",(gras_if_RL()?"Send":"Embeed"),data->out.size-data->out.pos, + gras_socket_peer_name(sock),gras_socket_peer_port(sock)); + if (gras_if_RL()) { + TRY(super->chunk_send(sock,(char*) &size, 4)); + } else { + memcpy(data->out.data, &size, 4); + } + - DEBUG1("Send the chunk (size=%d)",data->out.size); + DEBUG3("Send the chunk (size=%d) to %s:%d",data->out.size, + gras_socket_peer_name(sock),gras_socket_peer_port(sock)); TRY(super->chunk_send(sock, data->out.data, data->out.size)); VERB1("Chunk sent (size=%d)",data->out.size); + if (XBT_LOG_ISENABLED(trp_buf,xbt_log_priority_debug)) + hexa_print("chunck sent",data->out.data,data->out.size); data->out.size = 0; return no_error; } diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index 90897171f2..7b0425d269 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -22,6 +22,8 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_sg,transport,"SimGrid pseudo-transport"); /*** *** Prototypes ***/ +hexa_print(unsigned char *data, int size); /* in gras.c */ + /* retrieve the port record associated to a numerical port on an host */ static xbt_error_t find_port(gras_hostdata_t *hd, int port, gras_sg_portrec_t *hpd); @@ -278,13 +280,24 @@ xbt_error_t gras_trp_sg_chunk_recv(gras_socket_t sock, DEBUG1("Got chuck %s",MSG_task_get_name(task)); task_data = MSG_task_get_data(task); - if (task_data->size != size) - RAISE5(mismatch_error, - "Got %d bytes when %ld where expected (in %s->%s:%d)", - task_data->size, size, - MSG_host_get_name(sock_data->to_host), - MSG_host_get_name(MSG_host_self()), sock_data->to_chan); - memcpy(data,task_data->data,size); + if (size != -1) { + if (task_data->size != size) + RAISE5(mismatch_error, + "Got %d bytes when %ld where expected (in %s->%s:%d)", + task_data->size, size, + MSG_host_get_name(sock_data->to_host), + MSG_host_get_name(MSG_host_self()), sock_data->to_chan); + memcpy(data,task_data->data,size); + } else { + /* damn, the size is embeeded at the begining of the chunk */ + int netsize; + + memcpy((char*)&netsize,task_data->data,4); + netsize = (int)ntohl(netsize); + DEBUG1("netsize embeeded = %d",netsize); + + memcpy(data,task_data->data,netsize+4); + } free(task_data->data); free(task_data);