Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
New functionality: possibility for libraries to register globals on each process...
[simgrid.git] / src / gras / Transport / transport_plugin_buf.c
1 /* $Id$ */
2
3 /* buf trp (transport) - buffered transport using the TCP one            */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdlib.h>
11 #include <string.h>       /* memset */
12
13 #include "portable.h"
14 #include "xbt/misc.h"
15 #include "xbt/sysdep.h"
16 #include "transport_private.h"
17
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_buf,transport,
19       "Generic buffered transport (works on top of TCP or SG)");
20
21 /***
22  *** Prototypes 
23  ***/
24 xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self,
25                                         gras_socket_t sock);
26 xbt_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self,
27                                         gras_socket_t sock);
28 xbt_error_t gras_trp_buf_socket_accept(gras_socket_t sock,
29                                         gras_socket_t *dst);
30
31 void         gras_trp_buf_socket_close(gras_socket_t sd);
32   
33 xbt_error_t gras_trp_buf_chunk_send(gras_socket_t sd,
34                                      const char *data,
35                                      long int size);
36
37 xbt_error_t gras_trp_buf_chunk_recv(gras_socket_t sd,
38                                      char *data,
39                                      long int size);
40 xbt_error_t gras_trp_buf_flush(gras_socket_t sock);
41
42
43 /***
44  *** Specific plugin part
45  ***/
46
47 typedef struct {
48   gras_trp_plugin_t *super;
49 } gras_trp_buf_plug_data_t;
50
51 /***
52  *** Specific socket part
53  ***/
54
55 typedef struct {
56   int size;
57   char *data;
58   int pos; /* for receive; not exchanged over the net */
59 } gras_trp_buf_t;
60
61 struct gras_trp_bufdata_{
62   gras_trp_buf_t in;
63   gras_trp_buf_t out;
64   int buffsize;
65 };
66
67 void gras_trp_buf_init_sock(gras_socket_t sock) {
68   gras_trp_bufdata_t *data=xbt_new(gras_trp_bufdata_t,1);
69   
70   XBT_IN;
71   data->buffsize = 100 * 1024 ; /* 100k */ 
72
73   data->in.size  = 0;
74   data->in.data  = xbt_malloc(data->buffsize);
75   data->in.pos   = 0; /* useless, indeed, since size==pos */
76    
77   data->out.size = 0;
78   data->out.data = xbt_malloc(data->buffsize);
79   data->out.pos  = 0;
80    
81   sock->bufdata = data;
82 }
83
84
85 /***
86  *** Code
87  ***/
88 xbt_error_t
89 gras_trp_buf_setup(gras_trp_plugin_t *plug) {
90   xbt_error_t errcode;
91   gras_trp_buf_plug_data_t *data =xbt_new(gras_trp_buf_plug_data_t,1);
92
93   XBT_IN;
94   TRY(gras_trp_plugin_get_by_name(gras_if_RL() ? "tcp" : "sg",
95                                   &(data->super)));
96   DEBUG1("Derivate a buffer plugin from %s",gras_if_RL() ? "tcp" : "sg");
97
98   plug->socket_client = gras_trp_buf_socket_client;
99   plug->socket_server = gras_trp_buf_socket_server;
100   plug->socket_accept = gras_trp_buf_socket_accept;
101   plug->socket_close  = gras_trp_buf_socket_close;
102
103   plug->chunk_send    = gras_trp_buf_chunk_send;
104   plug->chunk_recv    = gras_trp_buf_chunk_recv;
105
106   plug->flush         = gras_trp_buf_flush;
107
108   plug->data = (void*)data;
109   plug->exit = NULL;
110   
111   return no_error;
112 }
113
114 xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self,
115                                         /* OUT */ gras_socket_t sock){
116   xbt_error_t errcode;
117   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super;
118
119   XBT_IN;
120   TRY(super->socket_client(super,sock));
121   sock->plugin = self;
122   gras_trp_buf_init_sock(sock);
123     
124   return no_error;
125 }
126
127 /**
128  * gras_trp_buf_socket_server:
129  *
130  * Open a socket used to receive messages.
131  */
132 xbt_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self,
133                                         /* OUT */ gras_socket_t sock){
134   xbt_error_t errcode;
135   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super;
136
137   XBT_IN;
138   TRY(super->socket_server(super,sock));
139   sock->plugin = self;
140   gras_trp_buf_init_sock(sock);
141   return no_error;
142 }
143
144 xbt_error_t
145 gras_trp_buf_socket_accept(gras_socket_t  sock,
146                            gras_socket_t *dst) {
147   xbt_error_t errcode;
148   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
149       
150   XBT_IN;
151   TRY(super->socket_accept(sock,dst));
152   (*dst)->plugin = sock->plugin;
153   gras_trp_buf_init_sock(*dst);
154   XBT_OUT;
155   return no_error;
156 }
157
158 void gras_trp_buf_socket_close(gras_socket_t sock){
159   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
160   gras_trp_bufdata_t *data=sock->bufdata;
161
162   XBT_IN;
163   if (data->in.size || data->out.size)
164     gras_trp_buf_flush(sock);
165   if (data->in.data)
166     xbt_free(data->in.data);
167   if (data->out.data)
168     xbt_free(data->out.data);
169   xbt_free(data);
170
171   super->socket_close(sock);
172 }
173
174 /**
175  * gras_trp_buf_chunk_send:
176  *
177  * Send data on a TCP socket
178  */
179 xbt_error_t 
180 gras_trp_buf_chunk_send(gras_socket_t sock,
181                         const char *chunk,
182                         long int size) {
183
184   xbt_error_t errcode;
185   gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata;
186   int chunk_pos=0;
187
188   XBT_IN;
189   /* Let underneath plugin check for direction, we work even in duplex */
190   xbt_assert0(size >= 0, "Cannot send a negative amount of data");
191
192   while (chunk_pos < size) {
193     /* size of the chunck to receive in that shot */
194     long int thissize = min(size-chunk_pos,data->buffsize - data->out.size);
195     DEBUG5("Set the chars %d..%ld into the buffer (size=%ld, ctn='%.*s')",
196            (int)data->out.size,
197            ((int)data->out.size) + thissize -1,
198            size, chunk_pos, chunk);
199
200     memcpy(data->out.data + data->out.size, chunk + chunk_pos, thissize);
201
202     data->out.size += thissize;
203     chunk_pos      += thissize;
204     DEBUG5("New pos = %d; Still to send = %ld of %ld; ctn sofar='%.*s'",
205            data->out.size,size-chunk_pos,size,(int)chunk_pos,chunk);
206
207     if (data->out.size == data->buffsize) /* out of space. Flush it */
208       TRY(gras_trp_buf_flush(sock));
209   }
210
211   XBT_OUT;
212   return no_error;
213 }
214
215 /**
216  * gras_trp_buf_chunk_recv:
217  *
218  * Receive data on a TCP socket.
219  */
220 xbt_error_t 
221 gras_trp_buf_chunk_recv(gras_socket_t sock,
222                         char *chunk,
223                         long int size) {
224
225   xbt_error_t errcode;
226   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
227   gras_trp_bufdata_t *data=sock->bufdata;
228   long int chunck_pos = 0;
229
230   /* Let underneath plugin check for direction, we work even in duplex */
231   xbt_assert0(sock, "Cannot recv on an NULL socket");
232   xbt_assert0(size >= 0, "Cannot receive a negative amount of data");
233   
234   XBT_IN;
235
236   while (chunck_pos < size) {
237     /* size of the chunck to receive in that shot */
238     long int thissize;
239
240     if (data->in.size == data->in.pos) { /* out of data. Get more */
241       int nextsize;
242       DEBUG0("Recv the size");
243       TRY(super->chunk_recv(sock,(char*)&nextsize, 4));
244       data->in.size = (int)ntohl(nextsize);
245
246       VERB1("Recv the chunk (size=%d)",data->in.size);
247       TRY(super->chunk_recv(sock, data->in.data, data->in.size));
248       data->in.pos=0;
249     }
250     
251     thissize = min(size-chunck_pos ,  data->in.size - data->in.pos);
252     DEBUG2("Get the chars %d..%ld out of the buffer",
253            data->in.pos,
254            data->in.pos + thissize - 1);
255     memcpy(chunk+chunck_pos, data->in.data + data->in.pos, thissize);
256
257     data->in.pos += thissize;
258     chunck_pos   += thissize;
259     DEBUG5("New pos = %d; Still to receive = %ld of %ld. Ctn so far='%.*s'",
260            data->in.pos,size - chunck_pos,size,(int)chunck_pos,chunk);
261   }
262
263   XBT_OUT;
264   return no_error;
265 }
266
267 /**
268  * gras_trp_buf_flush:
269  *
270  * Make sure the data is sent
271  */
272 xbt_error_t 
273 gras_trp_buf_flush(gras_socket_t sock) {
274   xbt_error_t errcode;
275   int size;
276   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
277   gras_trp_bufdata_t *data=sock->bufdata;
278
279   XBT_IN;
280   size = (int)htonl(data->out.size);
281   DEBUG1("Send the size (=%d)",data->out.size);
282   TRY(super->chunk_send(sock,(char*) &size, 4));
283
284   DEBUG1("Send the chunk (size=%d)",data->out.size);
285   TRY(super->chunk_send(sock, data->out.data, data->out.size));
286   VERB1("Chunk sent (size=%d)",data->out.size);
287   data->out.size = 0;
288   return no_error;
289 }