Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Renamed any gras stuff that was in xbt and should therefore be called
[simgrid.git] / src / gras / Transport / transport_plugin_buf.c
1 /* $Id$ */
2
3 /* buf trp (transport) - buffered transport using the TCP one            */
4
5 /* Authors: Martin Quinson                                                  */
6 /* Copyright (C) 2004 Martin Quinson.                                       */
7
8 /* This program is free software; you can redistribute it and/or modify it
9    under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include <netinet/in.h>   /* htonl/ntohl */
12 #include <stdlib.h>
13 #include <string.h>       /* memset */
14
15 #include "xbt/misc.h"
16 #include "transport_private.h"
17
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_buf,transport,
19       "Generic buffered transport (works on top of TCP or SG)");
20
21 /***
22  *** Prototypes 
23  ***/
24 xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self,
25                                         gras_socket_t sock);
26 xbt_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self,
27                                         gras_socket_t sock);
28 xbt_error_t gras_trp_buf_socket_accept(gras_socket_t sock,
29                                         gras_socket_t *dst);
30
31 void         gras_trp_buf_socket_close(gras_socket_t sd);
32   
33 xbt_error_t gras_trp_buf_chunk_send(gras_socket_t sd,
34                                      const char *data,
35                                      long int size);
36
37 xbt_error_t gras_trp_buf_chunk_recv(gras_socket_t sd,
38                                      char *data,
39                                      long int size);
40 xbt_error_t gras_trp_buf_flush(gras_socket_t sock);
41
42
43 /***
44  *** Specific plugin part
45  ***/
46
47 typedef struct {
48   gras_trp_plugin_t *super;
49 } gras_trp_buf_plug_data_t;
50
51 /***
52  *** Specific socket part
53  ***/
54
55 typedef struct {
56   uint32_t size;
57   char *data;
58   int pos; /* for receive; not exchanged over the net */
59 } gras_trp_buf_t;
60
61 struct gras_trp_bufdata_{
62   gras_trp_buf_t in;
63   gras_trp_buf_t out;
64   int buffsize;
65 };
66
67 void gras_trp_buf_init_sock(gras_socket_t sock) {
68   gras_trp_bufdata_t *data=xbt_new(gras_trp_bufdata_t,1);
69   
70   XBT_IN;
71   data->buffsize = 100 * 1024 ; /* 100k */ 
72
73   data->in.size  = 0;
74   data->in.data  = xbt_malloc(data->buffsize);
75   data->in.pos   = 0; /* useless, indeed, since size==pos */
76    
77   data->out.size = 0;
78   data->out.data = xbt_malloc(data->buffsize);
79   data->out.pos  = 0;
80    
81   sock->bufdata = data;
82 }
83
84
85 /***
86  *** Code
87  ***/
88 xbt_error_t
89 gras_trp_buf_setup(gras_trp_plugin_t *plug) {
90   xbt_error_t errcode;
91   gras_trp_buf_plug_data_t *data =xbt_new(gras_trp_buf_plug_data_t,1);
92
93   XBT_IN;
94   TRY(gras_trp_plugin_get_by_name(gras_if_RL() ? "tcp" : "sg",
95                                   &(data->super)));
96   DEBUG1("Derivate a buffer plugin from %s",gras_if_RL() ? "tcp" : "sg");
97
98   plug->socket_client = gras_trp_buf_socket_client;
99   plug->socket_server = gras_trp_buf_socket_server;
100   plug->socket_accept = gras_trp_buf_socket_accept;
101   plug->socket_close  = gras_trp_buf_socket_close;
102
103   plug->chunk_send    = gras_trp_buf_chunk_send;
104   plug->chunk_recv    = gras_trp_buf_chunk_recv;
105
106   plug->flush         = gras_trp_buf_flush;
107
108   plug->data = (void*)data;
109   plug->exit = NULL;
110   
111   return no_error;
112 }
113
114 xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self,
115                                         /* OUT */ gras_socket_t sock){
116   xbt_error_t errcode;
117   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super;
118
119   XBT_IN;
120   TRY(super->socket_client(super,sock));
121   sock->plugin = self;
122   gras_trp_buf_init_sock(sock);
123     
124   return no_error;
125 }
126
127 /**
128  * gras_trp_buf_socket_server:
129  *
130  * Open a socket used to receive messages.
131  */
132 xbt_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self,
133                                         /* OUT */ gras_socket_t sock){
134   xbt_error_t errcode;
135   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super;
136
137   XBT_IN;
138   TRY(super->socket_server(super,sock));
139   sock->plugin = self;
140   gras_trp_buf_init_sock(sock);
141   return no_error;
142 }
143
144 xbt_error_t
145 gras_trp_buf_socket_accept(gras_socket_t  sock,
146                            gras_socket_t *dst) {
147   xbt_error_t errcode;
148   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
149       
150   XBT_IN;
151   TRY(super->socket_accept(sock,dst));
152   (*dst)->plugin = sock->plugin;
153   gras_trp_buf_init_sock(*dst);
154   return no_error;
155 }
156
157 void gras_trp_buf_socket_close(gras_socket_t sock){
158   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
159   gras_trp_bufdata_t *data=sock->bufdata;
160
161   XBT_IN;
162   if (data->in.size || data->out.size)
163     gras_trp_buf_flush(sock);
164   if (data->in.data)
165     xbt_free(data->in.data);
166   if (data->out.data)
167     xbt_free(data->out.data);
168   xbt_free(data);
169
170   super->socket_close(sock);
171 }
172
173 /**
174  * gras_trp_buf_chunk_send:
175  *
176  * Send data on a TCP socket
177  */
178 xbt_error_t 
179 gras_trp_buf_chunk_send(gras_socket_t sock,
180                         const char *chunk,
181                         long int size) {
182
183   xbt_error_t errcode;
184   gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata;
185   int chunk_pos=0;
186
187   XBT_IN;
188   /* Let underneath plugin check for direction, we work even in duplex */
189   xbt_assert0(size >= 0, "Cannot send a negative amount of data");
190
191   while (chunk_pos < size) {
192     /* size of the chunck to receive in that shot */
193     long int thissize = min(size-chunk_pos,data->buffsize - data->out.size);
194     DEBUG5("Set the chars %d..%ld into the buffer (size=%ld, ctn='%.*s')",
195            (int)data->out.size,
196            ((int)data->out.size) + thissize -1,
197            size, chunk_pos, chunk);
198
199     memcpy(data->out.data + data->out.size, chunk + chunk_pos, thissize);
200
201     data->out.size += thissize;
202     chunk_pos      += thissize;
203     DEBUG5("New pos = %d; Still to send = %ld of %ld; ctn sofar='%.*s'",
204            data->out.size,size-chunk_pos,size,(int)chunk_pos,chunk);
205
206     if (data->out.size == data->buffsize) /* out of space. Flush it */
207       TRY(gras_trp_buf_flush(sock));
208   }
209
210   XBT_OUT;
211   return no_error;
212 }
213
214 /**
215  * gras_trp_buf_chunk_recv:
216  *
217  * Receive data on a TCP socket.
218  */
219 xbt_error_t 
220 gras_trp_buf_chunk_recv(gras_socket_t sock,
221                         char *chunk,
222                         long int size) {
223
224   xbt_error_t errcode;
225   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
226   gras_trp_bufdata_t *data=sock->bufdata;
227   long int chunck_pos = 0;
228
229   /* Let underneath plugin check for direction, we work even in duplex */
230   xbt_assert0(sock, "Cannot recv on an NULL socket");
231   xbt_assert0(size >= 0, "Cannot receive a negative amount of data");
232   
233   XBT_IN;
234
235   while (chunck_pos < size) {
236     /* size of the chunck to receive in that shot */
237     long int thissize;
238
239     if (data->in.size == data->in.pos) { /* out of data. Get more */
240       uint32_t nextsize;
241       DEBUG0("Recv the size");
242       TRY(super->chunk_recv(sock,(char*)&nextsize, 4));
243       data->in.size = ntohl(nextsize);
244
245       VERB1("Recv the chunk (size=%d)",data->in.size);
246       TRY(super->chunk_recv(sock, data->in.data, data->in.size));
247       data->in.pos=0;
248     }
249     
250     thissize = min(size-chunck_pos ,  data->in.size - data->in.pos);
251     DEBUG2("Get the chars %d..%ld out of the buffer",
252            data->in.pos,
253            data->in.pos + thissize - 1);
254     memcpy(chunk+chunck_pos, data->in.data + data->in.pos, thissize);
255
256     data->in.pos += thissize;
257     chunck_pos   += thissize;
258     DEBUG5("New pos = %d; Still to receive = %ld of %ld. Ctn so far='%.*s'",
259            data->in.pos,size - chunck_pos,size,(int)chunck_pos,chunk);
260   }
261
262   XBT_OUT;
263   return no_error;
264 }
265
266 /**
267  * gras_trp_buf_flush:
268  *
269  * Make sure the data is sent
270  */
271 xbt_error_t 
272 gras_trp_buf_flush(gras_socket_t sock) {
273   xbt_error_t errcode;
274   uint32_t size;
275   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
276   gras_trp_bufdata_t *data=sock->bufdata;
277
278   XBT_IN;
279   size = htonl(data->out.size);
280   DEBUG1("Send the size (=%d)",data->out.size);
281   TRY(super->chunk_send(sock,(char*) &size, 4));
282
283   DEBUG1("Send the chunk (size=%d)",data->out.size);
284   TRY(super->chunk_send(sock, data->out.data, data->out.size));
285   VERB1("Chunk sent (size=%d)",data->out.size);
286   data->out.size = 0;
287   return no_error;
288 }