Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
46638bec979dabfd8a581c1c2cd0e15e19d95d34
[simgrid.git] / src / gras / Transport / transport_plugin_buf.c
1 /* $Id$ */
2
3 /* buf trp (transport) - buffered transport using the TCP one            */
4
5 /* Authors: Martin Quinson                                                  */
6 /* Copyright (C) 2004 Martin Quinson.                                       */
7
8 /* This program is free software; you can redistribute it and/or modify it
9    under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include <netinet/in.h>   /* htonl/ntohl */
12 #include <stdlib.h>
13 #include <string.h>       /* memset */
14
15 #include "gras_private.h"
16 #include "transport_private.h"
17
18 GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(trp_buf,transport,
19       "Generic buffered transport (works on top of TCP or SG)");
20
21 /***
22  *** Prototypes 
23  ***/
24 gras_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self,
25                                         /* OUT */ gras_socket_t *sock);
26 gras_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self,
27                                         /* OUT */ gras_socket_t *sock);
28 gras_error_t gras_trp_buf_socket_accept(gras_socket_t  *sock,
29                                         gras_socket_t **dst);
30
31 void         gras_trp_buf_socket_close(gras_socket_t *sd);
32   
33 gras_error_t gras_trp_buf_chunk_send(gras_socket_t *sd,
34                                      const char *data,
35                                      long int size);
36
37 gras_error_t gras_trp_buf_chunk_recv(gras_socket_t *sd,
38                                      char *data,
39                                      long int size);
40 gras_error_t gras_trp_buf_flush(gras_socket_t  *sock);
41
42
43 /***
44  *** Specific plugin part
45  ***/
46
47 typedef struct {
48   gras_trp_plugin_t *super;
49 } gras_trp_buf_plug_data_t;
50
51 /***
52  *** Specific socket part
53  ***/
54
55 typedef struct {
56   uint32_t size;
57   char *data;
58   int pos; /* for receive; not exchanged over the net */
59 } gras_trp_buf_t;
60
61 struct gras_trp_bufdata_{
62   gras_trp_buf_t in;
63   gras_trp_buf_t out;
64   int buffsize;
65 };
66
67 gras_error_t gras_trp_buf_init_sock(gras_socket_t *sock) {
68   gras_trp_bufdata_t *data=gras_new(gras_trp_bufdata_t,1);
69   
70   GRAS_IN;
71   if (!data)
72     RAISE_MALLOC;
73   data->in.size  = 0;
74   data->buffsize = 100 * 1024 ; /* 100k */ 
75
76   if (!(data->in.data = (char*)gras_malloc(data->buffsize)))
77     RAISE_MALLOC;
78   data->in.pos   = 0; /* useless, indeed, since size==pos */
79   data->out.size = 0;
80   if (!(data->out.data = (char*)gras_malloc(data->buffsize)))
81     RAISE_MALLOC;
82   data->out.pos  = 0;
83   sock->bufdata = data;
84   return no_error;
85 }
86
87
88 /***
89  *** Code
90  ***/
91 gras_error_t
92 gras_trp_buf_setup(gras_trp_plugin_t *plug) {
93   gras_error_t errcode;
94   gras_trp_buf_plug_data_t *data =gras_new(gras_trp_buf_plug_data_t,1);
95   if (!data)
96     RAISE_MALLOC;
97
98   GRAS_IN;
99   TRY(gras_trp_plugin_get_by_name(gras_if_RL() ? "tcp" : "sg",
100                                   &(data->super)));
101   DEBUG1("Derivate a buffer plugin from %s",gras_if_RL() ? "tcp" : "sg");
102
103   plug->socket_client = gras_trp_buf_socket_client;
104   plug->socket_server = gras_trp_buf_socket_server;
105   plug->socket_accept = gras_trp_buf_socket_accept;
106   plug->socket_close  = gras_trp_buf_socket_close;
107
108   plug->chunk_send    = gras_trp_buf_chunk_send;
109   plug->chunk_recv    = gras_trp_buf_chunk_recv;
110
111   plug->flush         = gras_trp_buf_flush;
112
113   plug->data = (void*)data;
114   plug->exit = NULL;
115   
116   return no_error;
117 }
118
119 gras_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self,
120                                         /* OUT */ gras_socket_t *sock){
121   gras_error_t errcode;
122   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super;
123
124   GRAS_IN;
125   TRY(super->socket_client(super,sock));
126   sock->plugin = self;
127   TRY(gras_trp_buf_init_sock(sock));
128     
129   return no_error;
130 }
131
132 /**
133  * gras_trp_buf_socket_server:
134  *
135  * Open a socket used to receive messages.
136  */
137 gras_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self,
138                                         /* OUT */ gras_socket_t *sock){
139   gras_error_t errcode;
140   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super;
141
142   GRAS_IN;
143   TRY(super->socket_server(super,sock));
144   sock->plugin = self;
145   TRY(gras_trp_buf_init_sock(sock));
146   return no_error;
147 }
148
149 gras_error_t
150 gras_trp_buf_socket_accept(gras_socket_t  *sock,
151                            gras_socket_t **dst) {
152   gras_error_t errcode;
153   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
154       
155   GRAS_IN;
156   TRY(super->socket_accept(sock,dst));
157   (*dst)->plugin = sock->plugin;
158   TRY(gras_trp_buf_init_sock(*dst));
159   return no_error;
160 }
161
162 void gras_trp_buf_socket_close(gras_socket_t *sock){
163   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
164   gras_trp_bufdata_t *data=sock->bufdata;
165
166   GRAS_IN;
167   if (data->in.size || data->out.size)
168     gras_trp_buf_flush(sock);
169   if (data->in.data)
170     free(data->in.data);
171   if (data->out.data)
172     free(data->out.data);
173   free(data);
174
175   super->socket_close(sock);
176 }
177
178 /**
179  * gras_trp_buf_chunk_send:
180  *
181  * Send data on a TCP socket
182  */
183 gras_error_t 
184 gras_trp_buf_chunk_send(gras_socket_t *sock,
185                         const char *chunk,
186                         long int size) {
187
188   gras_error_t errcode;
189   gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata;
190   int chunk_pos=0;
191
192   GRAS_IN;
193   /* Let underneath plugin check for direction, we work even in duplex */
194   gras_assert0(size >= 0, "Cannot send a negative amount of data");
195
196   while (chunk_pos < size) {
197     /* size of the chunck to receive in that shot */
198     long int thissize = min(size-chunk_pos,data->buffsize - data->out.size);
199     DEBUG5("Set the chars %d..%ld into the buffer (size=%ld, ctn='%.*s')",
200            (int)data->out.size,
201            ((int)data->out.size) + thissize -1,
202            size, chunk_pos, chunk);
203
204     memcpy(data->out.data + data->out.size, chunk + chunk_pos, thissize);
205
206     data->out.size += thissize;
207     chunk_pos      += thissize;
208     DEBUG5("New pos = %d; Still to send = %ld of %ld; ctn sofar='%.*s'",
209            data->out.size,size-chunk_pos,size,(int)chunk_pos,chunk);
210
211     if (data->out.size == data->buffsize) /* out of space. Flush it */
212       TRY(gras_trp_buf_flush(sock));
213   }
214
215   GRAS_OUT;
216   return no_error;
217 }
218
219 /**
220  * gras_trp_buf_chunk_recv:
221  *
222  * Receive data on a TCP socket.
223  */
224 gras_error_t 
225 gras_trp_buf_chunk_recv(gras_socket_t *sock,
226                         char *chunk,
227                         long int size) {
228
229   gras_error_t errcode;
230   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
231   gras_trp_bufdata_t *data=sock->bufdata;
232   long int chunck_pos = 0;
233
234   /* Let underneath plugin check for direction, we work even in duplex */
235   gras_assert0(sock, "Cannot recv on an NULL socket");
236   gras_assert0(size >= 0, "Cannot receive a negative amount of data");
237   
238   GRAS_IN;
239
240   while (chunck_pos < size) {
241     /* size of the chunck to receive in that shot */
242     long int thissize;
243
244     if (data->in.size == data->in.pos) { /* out of data. Get more */
245       uint32_t nextsize;
246       DEBUG0("Recv the size");
247       TRY(super->chunk_recv(sock,(char*)&nextsize, 4));
248       data->in.size = ntohl(nextsize);
249
250       VERB1("Recv the chunk (size=%d)",data->in.size);
251       TRY(super->chunk_recv(sock, data->in.data, data->in.size));
252       data->in.pos=0;
253     }
254     
255     thissize = min(size-chunck_pos ,  data->in.size - data->in.pos);
256     DEBUG2("Get the chars %d..%ld out of the buffer",
257            data->in.pos,
258            data->in.pos + thissize - 1);
259     memcpy(chunk+chunck_pos, data->in.data + data->in.pos, thissize);
260
261     data->in.pos += thissize;
262     chunck_pos   += thissize;
263     DEBUG5("New pos = %d; Still to receive = %ld of %ld. Ctn so far='%.*s'",
264            data->in.pos,size - chunck_pos,size,(int)chunck_pos,chunk);
265   }
266
267   GRAS_OUT;
268   return no_error;
269 }
270
271 /**
272  * gras_trp_buf_flush:
273  *
274  * Make sure the data is sent
275  */
276 gras_error_t 
277 gras_trp_buf_flush(gras_socket_t *sock) {
278   gras_error_t errcode;
279   uint32_t size;
280   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
281   gras_trp_bufdata_t *data=sock->bufdata;
282
283   GRAS_IN;
284   size = htonl(data->out.size);
285   DEBUG1("Send the size (=%d)",data->out.size);
286   TRY(super->chunk_send(sock,(char*) &size, 4));
287
288   DEBUG1("Send the chunk (size=%d)",data->out.size);
289   TRY(super->chunk_send(sock, data->out.data, data->out.size));
290   VERB1("Chunk sent (size=%d)",data->out.size);
291   data->out.size = 0;
292   return no_error;
293 }