Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
moving timer functions so that we can use them for internals
[simgrid.git] / src / gras / Transport / transport_plugin_buf.c
1 /* $Id$ */
2
3 /* buf trp (transport) - buffered transport using the TCP one            */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdlib.h>
11 #include <string.h>       /* memset */
12
13 #include "portable.h"
14 #include "xbt/misc.h"
15 #include "xbt/sysdep.h"
16 #include "transport_private.h"
17
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(trp_buf,transport,
19       "Generic buffered transport (works on top of TCP or SG)");
20
21 /***
22  *** Prototypes 
23  ***/
24 xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self,
25                                         gras_socket_t sock);
26 xbt_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self,
27                                         gras_socket_t sock);
28 xbt_error_t gras_trp_buf_socket_accept(gras_socket_t sock,
29                                         gras_socket_t *dst);
30
31 void         gras_trp_buf_socket_close(gras_socket_t sd);
32   
33 xbt_error_t gras_trp_buf_chunk_send(gras_socket_t sd,
34                                      const char *data,
35                                      long int size);
36
37 xbt_error_t gras_trp_buf_chunk_recv(gras_socket_t sd,
38                                      char *data,
39                                      long int size);
40 xbt_error_t gras_trp_buf_flush(gras_socket_t sock);
41
42
43 /***
44  *** Specific plugin part
45  ***/
46
47 typedef struct {
48   gras_trp_plugin_t *super;
49 } gras_trp_buf_plug_data_t;
50
51 /***
52  *** Specific socket part
53  ***/
54
55 typedef struct {
56   int size;
57   char *data;
58   int pos; /* for receive; not exchanged over the net */
59 } gras_trp_buf_t;
60
61 struct gras_trp_bufdata_{
62   gras_trp_buf_t in;
63   gras_trp_buf_t out;
64   int buffsize;
65 };
66
67 void gras_trp_buf_init_sock(gras_socket_t sock) {
68   gras_trp_bufdata_t *data=xbt_new(gras_trp_bufdata_t,1);
69   
70   XBT_IN;
71   data->buffsize = 100 * 1024 ; /* 100k */ 
72
73   data->in.size  = 0;
74   data->in.data  = xbt_malloc(data->buffsize);
75   data->in.pos   = 0; /* useless, indeed, since size==pos */
76    
77   data->out.size = 0;
78   data->out.data = xbt_malloc(data->buffsize);
79   data->out.pos  = 0;
80    
81   sock->bufdata = data;
82 }
83
84
85 /***
86  *** Code
87  ***/
88 xbt_error_t
89 gras_trp_buf_setup(gras_trp_plugin_t *plug) {
90   xbt_error_t errcode;
91   gras_trp_buf_plug_data_t *data =xbt_new(gras_trp_buf_plug_data_t,1);
92
93   XBT_IN;
94   TRY(gras_trp_plugin_get_by_name(gras_if_RL() ? "tcp" : "sg",
95                                   &(data->super)));
96   DEBUG1("Derivate a buffer plugin from %s",gras_if_RL() ? "tcp" : "sg");
97
98   plug->socket_client = gras_trp_buf_socket_client;
99   plug->socket_server = gras_trp_buf_socket_server;
100   plug->socket_accept = gras_trp_buf_socket_accept;
101   plug->socket_close  = gras_trp_buf_socket_close;
102
103   plug->chunk_send    = gras_trp_buf_chunk_send;
104   plug->chunk_recv    = gras_trp_buf_chunk_recv;
105
106   plug->flush         = gras_trp_buf_flush;
107
108   plug->data = (void*)data;
109   plug->exit = NULL;
110   
111   return no_error;
112 }
113
114 xbt_error_t gras_trp_buf_socket_client(gras_trp_plugin_t *self,
115                                         /* OUT */ gras_socket_t sock){
116   xbt_error_t errcode;
117   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super;
118
119   XBT_IN;
120   TRY(super->socket_client(super,sock));
121   sock->plugin = self;
122   gras_trp_buf_init_sock(sock);
123     
124   return no_error;
125 }
126
127 /**
128  * gras_trp_buf_socket_server:
129  *
130  * Open a socket used to receive messages.
131  */
132 xbt_error_t gras_trp_buf_socket_server(gras_trp_plugin_t *self,
133                                         /* OUT */ gras_socket_t sock){
134   xbt_error_t errcode;
135   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)self->data)->super;
136
137   XBT_IN;
138   TRY(super->socket_server(super,sock));
139   sock->plugin = self;
140   gras_trp_buf_init_sock(sock);
141   return no_error;
142 }
143
144 xbt_error_t
145 gras_trp_buf_socket_accept(gras_socket_t  sock,
146                            gras_socket_t *dst) {
147   xbt_error_t errcode;
148   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
149       
150   XBT_IN;
151   TRY(super->socket_accept(sock,dst));
152   (*dst)->plugin = sock->plugin;
153   gras_trp_buf_init_sock(*dst);
154   XBT_OUT;
155   return no_error;
156 }
157
158 void gras_trp_buf_socket_close(gras_socket_t sock){
159   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
160   gras_trp_bufdata_t *data=sock->bufdata;
161
162   XBT_IN;
163   if (data->in.size!=data->in.pos) {
164      WARN1("Socket closed, but %d bytes were unread",data->in.size - data->in.pos);
165   }
166    
167   if (data->out.size!=data->out.pos) {
168     DEBUG2("Flush the socket before closing (in=%d,out=%d)",data->in.size, data->out.size);
169     gras_trp_buf_flush(sock);
170   }
171    
172   if (data->in.data)
173     free(data->in.data);
174   if (data->out.data)
175     free(data->out.data);
176   free(data);
177
178   super->socket_close(sock);
179 }
180
181 /**
182  * gras_trp_buf_chunk_send:
183  *
184  * Send data on a TCP socket
185  */
186 xbt_error_t 
187 gras_trp_buf_chunk_send(gras_socket_t sock,
188                         const char *chunk,
189                         long int size) {
190
191   xbt_error_t errcode;
192   gras_trp_bufdata_t *data=(gras_trp_bufdata_t*)sock->bufdata;
193   int chunk_pos=0;
194
195   XBT_IN;
196   /* Let underneath plugin check for direction, we work even in duplex */
197   xbt_assert0(size >= 0, "Cannot send a negative amount of data");
198
199   while (chunk_pos < size) {
200     /* size of the chunck to receive in that shot */
201     long int thissize = min(size-chunk_pos,data->buffsize - data->out.size);
202     DEBUG5("Set the chars %d..%ld into the buffer (size=%ld, ctn='%.*s')",
203            (int)data->out.size,
204            ((int)data->out.size) + thissize -1,
205            size, chunk_pos, chunk);
206
207     memcpy(data->out.data + data->out.size, chunk + chunk_pos, thissize);
208
209     data->out.size += thissize;
210     chunk_pos      += thissize;
211     DEBUG5("New pos = %d; Still to send = %ld of %ld; ctn sofar='%.*s'",
212            data->out.size,size-chunk_pos,size,(int)chunk_pos,chunk);
213
214     if (data->out.size == data->buffsize) /* out of space. Flush it */
215       TRY(gras_trp_buf_flush(sock));
216   }
217
218   XBT_OUT;
219   return no_error;
220 }
221
222 /**
223  * gras_trp_buf_chunk_recv:
224  *
225  * Receive data on a TCP socket.
226  */
227 xbt_error_t 
228 gras_trp_buf_chunk_recv(gras_socket_t sock,
229                         char *chunk,
230                         long int size) {
231
232   xbt_error_t errcode;
233   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
234   gras_trp_bufdata_t *data=sock->bufdata;
235   long int chunck_pos = 0;
236
237   /* Let underneath plugin check for direction, we work even in duplex */
238   xbt_assert0(sock, "Cannot recv on an NULL socket");
239   xbt_assert0(size >= 0, "Cannot receive a negative amount of data");
240   
241   XBT_IN;
242
243   while (chunck_pos < size) {
244     /* size of the chunck to receive in that shot */
245     long int thissize;
246
247     if (data->in.size == data->in.pos) { /* out of data. Get more */
248       int nextsize;
249       DEBUG0("Recv the size");
250       TRY(super->chunk_recv(sock,(char*)&nextsize, 4));
251       data->in.size = (int)ntohl(nextsize);
252
253       VERB1("Recv the chunk (size=%d)",data->in.size);
254       TRY(super->chunk_recv(sock, data->in.data, data->in.size));
255       data->in.pos=0;
256     }
257     
258     thissize = min(size-chunck_pos ,  data->in.size - data->in.pos);
259     DEBUG2("Get the chars %d..%ld out of the buffer",
260            data->in.pos,
261            data->in.pos + thissize - 1);
262     memcpy(chunk+chunck_pos, data->in.data + data->in.pos, thissize);
263
264     data->in.pos += thissize;
265     chunck_pos   += thissize;
266     DEBUG5("New pos = %d; Still to receive = %ld of %ld. Ctn so far='%.*s'",
267            data->in.pos,size - chunck_pos,size,(int)chunck_pos,chunk);
268   }
269
270   XBT_OUT;
271   return no_error;
272 }
273
274 /**
275  * gras_trp_buf_flush:
276  *
277  * Make sure the data is sent
278  */
279 xbt_error_t 
280 gras_trp_buf_flush(gras_socket_t sock) {
281   xbt_error_t errcode;
282   int size;
283   gras_trp_plugin_t *super=((gras_trp_buf_plug_data_t*)sock->plugin->data)->super;
284   gras_trp_bufdata_t *data=sock->bufdata;
285
286   XBT_IN;
287   if (! (data->out.size-data->out.pos) ) {
288      DEBUG0("Nothing to flush");
289      return no_error;
290   }
291    
292   size = (int)htonl(data->out.size-data->out.pos);
293   DEBUG1("Send the size (=%d)",data->out.size-data->out.pos);
294   TRY(super->chunk_send(sock,(char*) &size, 4));
295
296   DEBUG1("Send the chunk (size=%d)",data->out.size);
297   TRY(super->chunk_send(sock, data->out.data, data->out.size));
298   VERB1("Chunk sent (size=%d)",data->out.size);
299   data->out.size = 0;
300   return no_error;
301 }