X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/2479b96521c45f40637d8e4482c301b677266443..b0b60e5b550acd203efd832a918aa0f3a43b4a8f:/src/gras/Transport/transport_plugin_buf.c diff --git a/src/gras/Transport/transport_plugin_buf.c b/src/gras/Transport/transport_plugin_buf.c index d640ab7562..ed64bc3244 100644 --- a/src/gras/Transport/transport_plugin_buf.c +++ b/src/gras/Transport/transport_plugin_buf.c @@ -13,33 +13,36 @@ #include "portable.h" #include "xbt/misc.h" #include "xbt/sysdep.h" +#include "xbt/ex.h" #include "transport_private.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_buf,transport, "Generic buffered transport (works on top of TCP or SG)"); + +static gras_trp_plugin_t _buf_super; + /*** *** Prototypes ***/ void hexa_print(const char*name, unsigned char *data, int size); /* in gras.c */ -xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self, - gras_socket_t sock); -xbt_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self, - gras_socket_t sock); -xbt_error_t gras_trp_buf_socket_accept(gras_socket_t sock, - gras_socket_t *dst); +void gras_trp_buf_socket_client(gras_trp_plugin_t self, + gras_socket_t sock); +void gras_trp_buf_socket_server(gras_trp_plugin_t self, + gras_socket_t sock); +gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock); void gras_trp_buf_socket_close(gras_socket_t sd); -xbt_error_t gras_trp_buf_chunk_send(gras_socket_t sd, - const char *data, - long int size); +void gras_trp_buf_chunk_send(gras_socket_t sd, + const char *data, + unsigned long int size); -xbt_error_t gras_trp_buf_chunk_recv(gras_socket_t sd, - char *data, - long int size); -xbt_error_t gras_trp_buf_flush(gras_socket_t sock); +void gras_trp_buf_chunk_recv(gras_socket_t sd, + char *data, + unsigned long int size); +void gras_trp_buf_flush(gras_socket_t sock); /*** @@ -47,7 +50,7 @@ xbt_error_t gras_trp_buf_flush(gras_socket_t sock); ***/ typedef struct { - gras_trp_plugin_t *super; + int junk; } gras_trp_buf_plug_data_t; /*** @@ -86,18 +89,16 @@ void gras_trp_buf_init_sock(gras_socket_t sock) { sock->bufdata = data; } - /*** *** Code ***/ -xbt_error_t -gras_trp_buf_setup(gras_trp_plugin_t *plug) { - xbt_error_t errcode; +void +gras_trp_buf_setup(gras_trp_plugin_t plug) { gras_trp_buf_plug_data_t *data =xbt_new(gras_trp_buf_plug_data_t,1); XBT_IN; - TRY(gras_trp_plugin_get_by_name(gras_if_RL() ? "tcp" : "sg", - &(data->super))); + _buf_super = gras_trp_plugin_get_by_name(gras_if_RL() ? "tcp" : "sg"); + DEBUG1("Derivate a buffer plugin from %s",gras_if_RL() ? "tcp" : "sg"); plug->socket_client = gras_trp_buf_socket_client; @@ -112,21 +113,15 @@ gras_trp_buf_setup(gras_trp_plugin_t *plug) { plug->data = (void*)data; plug->exit = NULL; - - return no_error; } -xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self, - /* OUT */ gras_socket_t sock){ - xbt_error_t errcode; - gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super; +void gras_trp_buf_socket_client(gras_trp_plugin_t self, + /* OUT */ gras_socket_t sock){ XBT_IN; - TRY(super->socket_client(super,sock)); + _buf_super->socket_client(_buf_super,sock); sock->plugin = self; gras_trp_buf_init_sock(sock); - - return no_error; } /** @@ -134,34 +129,29 @@ xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self, * * Open a socket used to receive messages. */ -xbt_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self, - /* OUT */ gras_socket_t sock){ - xbt_error_t errcode; - gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super; +void gras_trp_buf_socket_server(gras_trp_plugin_t self, + /* OUT */ gras_socket_t sock){ XBT_IN; - TRY(super->socket_server(super,sock)); + _buf_super->socket_server(_buf_super,sock); sock->plugin = self; gras_trp_buf_init_sock(sock); - return no_error; } -xbt_error_t -gras_trp_buf_socket_accept(gras_socket_t sock, - gras_socket_t *dst) { - xbt_error_t errcode; - gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super; +gras_socket_t +gras_trp_buf_socket_accept(gras_socket_t sock) { + + gras_socket_t res; XBT_IN; - TRY(super->socket_accept(sock,dst)); - (*dst)->plugin = sock->plugin; - gras_trp_buf_init_sock(*dst); + res = _buf_super->socket_accept(sock); + res->plugin = sock->plugin; + gras_trp_buf_init_sock(res); XBT_OUT; - return no_error; + return res; } void gras_trp_buf_socket_close(gras_socket_t sock){ - gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super; gras_trp_bufdata_t *data=sock->bufdata; XBT_IN; @@ -181,20 +171,19 @@ void gras_trp_buf_socket_close(gras_socket_t sock){ free(data->out.data); free(data); - super->socket_close(sock); + _buf_super->socket_close(sock); } /** * gras_trp_buf_chunk_send: * - * Send data on a TCP socket + * Send data on a buffered socket */ -xbt_error_t +void gras_trp_buf_chunk_send(gras_socket_t sock, const char *chunk, - long int size) { + unsigned long int size) { - xbt_error_t errcode; gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata; int chunk_pos=0; @@ -218,28 +207,26 @@ gras_trp_buf_chunk_send(gras_socket_t sock, data->out.size,size-chunk_pos,size,(int)chunk_pos,chunk); if (data->out.size == data->buffsize) /* out of space. Flush it */ - TRY(gras_trp_buf_flush(sock)); + gras_trp_buf_flush(sock); } XBT_OUT; - return no_error; } /** * gras_trp_buf_chunk_recv: * - * Receive data on a TCP socket. + * Receive data on a buffered socket. */ -xbt_error_t +void gras_trp_buf_chunk_recv(gras_socket_t sock, char *chunk, - long int size) { + unsigned long int size) { - xbt_error_t errcode; - gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super; + xbt_ex_t e; gras_trp_bufdata_t *data=sock->bufdata; long int chunck_pos = 0; - + /* Let underneath plugin check for direction, we work even in duplex */ xbt_assert0(sock, "Cannot recv on an NULL socket"); xbt_assert0(size >= 0, "Cannot receive a negative amount of data"); @@ -254,14 +241,19 @@ gras_trp_buf_chunk_recv(gras_socket_t sock, int nextsize; if (gras_if_RL()) { DEBUG0("Recv the size"); - TRY(super->chunk_recv(sock,(char*)&nextsize, 4)); + TRY { + _buf_super->chunk_recv(sock,(char*)&nextsize, 4); + } CATCH(e) { + RETHROW3("Unable to get the chunk size on %p (peer = %s:%d): %s", + sock,gras_socket_peer_name(sock),gras_socket_peer_port(sock)); + } data->in.size = (int)ntohl(nextsize); VERB1("Recv the chunk (size=%d)",data->in.size); } else { data->in.size = -1; } - TRY(super->chunk_recv(sock, data->in.data, data->in.size)); + _buf_super->chunk_recv(sock, data->in.data, data->in.size); if (gras_if_RL()) { data->in.pos=0; @@ -272,7 +264,7 @@ gras_trp_buf_chunk_recv(gras_socket_t sock, VERB3("Got the chunk (size=%d+4 for the size ifself)='%.*s'", data->in.size-4, data->in.size,data->in.data); if (XBT_LOG_ISENABLED(trp_buf,xbt_log_priority_debug)) - hexa_print("chunck received",data->in.data,data->in.size); + hexa_print("chunck received",(unsigned char *) data->in.data,data->in.size); } } @@ -290,7 +282,6 @@ gras_trp_buf_chunk_recv(gras_socket_t sock, } XBT_OUT; - return no_error; } /** @@ -298,20 +289,18 @@ gras_trp_buf_chunk_recv(gras_socket_t sock, * * Make sure the data is sent */ -xbt_error_t +void gras_trp_buf_flush(gras_socket_t sock) { - xbt_error_t errcode; int size; - gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super; gras_trp_bufdata_t *data=sock->bufdata; - - XBT_IN; + XBT_IN; + DEBUG0("Flush"); if (XBT_LOG_ISENABLED(trp_buf,xbt_log_priority_debug)) - hexa_print("chunck to send ",data->out.data,data->out.size); + hexa_print("chunck to send ",(unsigned char *) data->out.data,data->out.size); if ((data->out.size - data->out.pos) == (gras_if_RL()?0:4) ) { /* 4 first bytes=size in SG mode*/ DEBUG2("Nothing to flush (size=%d; pos=%d)",data->out.size,data->out.pos); - return no_error; + return; } size = (int)data->out.size - data->out.pos; @@ -319,7 +308,7 @@ gras_trp_buf_flush(gras_socket_t sock) { gras_socket_peer_name(sock),gras_socket_peer_port(sock)); if (gras_if_RL()) { size = (int)htonl(size); - TRY(super->chunk_send(sock,(char*) &size, 4)); + _buf_super->chunk_send(sock,(char*) &size, 4); } else { memcpy(data->out.data, &size, 4); } @@ -327,10 +316,9 @@ gras_trp_buf_flush(gras_socket_t sock) { DEBUG3("Send the chunk (size=%d) to %s:%d",data->out.size, gras_socket_peer_name(sock),gras_socket_peer_port(sock)); - TRY(super->chunk_send(sock, data->out.data, data->out.size)); + _buf_super->chunk_send(sock, data->out.data, data->out.size); VERB1("Chunk sent (size=%d)",data->out.size); if (XBT_LOG_ISENABLED(trp_buf,xbt_log_priority_debug)) - hexa_print("chunck sent ",data->out.data,data->out.size); + hexa_print("chunck sent ",(unsigned char *) data->out.data,data->out.size); data->out.size = gras_if_RL()?0:4; - return no_error; }