Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
The change Arnaud wanted so much: Big Star Eradication (plus some more, check the...
[simgrid.git] / src / gras / Transport / transport.c
1 /* $Id$ */
2
3 /* transport - low level communication                                      */
4
5 /* Authors: Martin Quinson                                                  */
6 /* Copyright (C) 2004 Martin Quinson.                                       */
7
8 /* This program is free software; you can redistribute it and/or modify it
9    under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "gras/Transport/transport_private.h"
12
13 GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(transport,gras,"Conveying bytes over the network");
14 GRAS_LOG_NEW_SUBCATEGORY(raw_trp,transport,"Conveying bytes over the network without formating");
15
16 static gras_dict_t _gras_trp_plugins;      /* All registered plugins */
17 static void gras_trp_plugin_free(void *p); /* free one of the plugins */
18
19 static void gras_trp_socket_free(void *s); /* free one socket */
20
21 static void
22 gras_trp_plugin_new(const char *name, gras_trp_setup_t setup) {
23   gras_error_t errcode;
24
25   gras_trp_plugin_t *plug = gras_new0(gras_trp_plugin_t, 1);
26   
27   DEBUG1("Create plugin %s",name);
28
29   plug->name=gras_strdup(name);
30
31   errcode = setup(plug);
32   switch (errcode) {
33   case mismatch_error:
34     /* SG plugin return mismatch when in RL mode (and vice versa) */
35     gras_free(plug->name);
36     gras_free(plug);
37     break;
38
39   case no_error:
40     gras_dict_set(_gras_trp_plugins,
41                   name, plug, gras_trp_plugin_free);
42     break;
43
44   default:
45     DIE_IMPOSSIBLE;
46   }
47
48 }
49
50 void gras_trp_init(void){
51   /* make room for all plugins */
52   _gras_trp_plugins=gras_dict_new();
53
54   /* Add them */
55   gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
56   gras_trp_plugin_new("file",gras_trp_file_setup);
57   gras_trp_plugin_new("sg",gras_trp_sg_setup);
58
59   /* buf is composed, so it must come after the others */
60   gras_trp_plugin_new("buf", gras_trp_buf_setup);
61
62 }
63
64 void
65 gras_trp_exit(void){
66   gras_dict_free(&_gras_trp_plugins);
67 }
68
69
70 void gras_trp_plugin_free(void *p) {
71   gras_trp_plugin_t *plug = p;
72
73   if (plug) {
74     if (plug->exit) {
75       plug->exit(plug);
76     } else if (plug->data) {
77       DEBUG1("Plugin %s lacks exit(). Free data anyway.",plug->name);
78       gras_free(plug->data);
79     }
80
81     gras_free(plug->name);
82     gras_free(plug);
83   }
84 }
85
86
87 /**
88  * gras_trp_socket_new:
89  *
90  * Malloc a new socket, and initialize it with defaults
91  */
92 void gras_trp_socket_new(int incoming,
93                          gras_socket_t *dst) {
94
95   gras_socket_t sock=gras_new0(s_gras_socket_t,1);
96
97   DEBUG1("Create a new socket (%p)", (void*)sock);
98
99   sock->plugin = NULL;
100   sock->sd     = -1;
101   sock->data   = NULL;
102
103   sock->incoming  = incoming ? 1:0;
104   sock->outgoing  = incoming ? 0:1;
105   sock->accepting = incoming ? 1:0;
106
107   sock->port      = -1;
108   sock->peer_port = -1;
109   sock->peer_name = NULL;
110   sock->raw = 0;
111
112   *dst = sock;
113
114   gras_dynar_push(gras_socketset_get(),dst);
115 }
116
117
118 /**
119  * gras_socket_server_ext:
120  *
121  * Opens a server socket and make it ready to be listened to.
122  * In real life, you'll get a TCP socket.
123  */
124 gras_error_t
125 gras_socket_server_ext(unsigned short port,
126                        
127                        unsigned long int bufSize,
128                        int raw,
129                        
130                        /* OUT */ gras_socket_t *dst) {
131  
132   gras_error_t errcode;
133   gras_trp_plugin_t *trp;
134   gras_socket_t sock;
135
136   *dst = NULL;
137
138   DEBUG2("Create a server socket from plugin %s on port %d",
139          gras_if_RL() ? "tcp" : "sg",
140          port);
141   TRY(gras_trp_plugin_get_by_name("buf",&trp));
142
143   /* defaults settings */
144   gras_trp_socket_new(1,&sock);
145   sock->plugin= trp;
146   sock->port=port;
147   sock->bufSize = bufSize;
148   sock->raw = raw;
149
150   /* Call plugin socket creation function */
151   errcode = trp->socket_server(trp, sock);
152   DEBUG3("in=%c out=%c accept=%c",
153          sock->incoming?'y':'n', 
154          sock->outgoing?'y':'n',
155          sock->accepting?'y':'n');
156
157   if (errcode != no_error) {
158     gras_free(sock);
159     return errcode;
160   }
161
162   *dst = sock;
163
164   return no_error;
165 }
166    
167 /**
168  * gras_socket_client_ext:
169  *
170  * Opens a client socket to a remote host.
171  * In real life, you'll get a TCP socket.
172  */
173 gras_error_t
174 gras_socket_client_ext(const char *host,
175                        unsigned short port,
176                        
177                        unsigned long int bufSize,
178                        int raw,
179                        
180                        /* OUT */ gras_socket_t *dst) {
181  
182   gras_error_t errcode;
183   gras_trp_plugin_t *trp;
184   gras_socket_t sock;
185
186   *dst = NULL;
187
188   TRY(gras_trp_plugin_get_by_name("buf",&trp));
189
190   DEBUG1("Create a client socket from plugin %s",gras_if_RL() ? "tcp" : "sg");
191   /* defaults settings */
192   gras_trp_socket_new(0,&sock);
193   sock->plugin= trp;
194   sock->peer_port = port;
195   sock->peer_name = (char*)strdup(host?host:"localhost");
196   sock->bufSize = bufSize;
197   sock->raw = raw;
198
199   /* plugin-specific */
200   errcode= (*trp->socket_client)(trp, sock);
201   DEBUG3("in=%c out=%c accept=%c",
202          sock->incoming?'y':'n', 
203          sock->outgoing?'y':'n',
204          sock->accepting?'y':'n');
205
206   if (errcode != no_error) {
207     gras_free(sock);
208     return errcode;
209   }
210
211   *dst = sock;
212
213   return no_error;
214 }
215
216 /**
217  * gras_socket_server:
218  *
219  * Opens a server socket and make it ready to be listened to.
220  * In real life, you'll get a TCP socket.
221  */
222 gras_error_t
223 gras_socket_server(unsigned short port,
224                    /* OUT */ gras_socket_t *dst) {
225    return gras_socket_server_ext(port,32,0,dst);
226 }
227
228 /**
229  * gras_socket_client:
230  *
231  * Opens a client socket to a remote host.
232  * In real life, you'll get a TCP socket.
233  */
234 gras_error_t
235 gras_socket_client(const char *host,
236                    unsigned short port,
237                    /* OUT */ gras_socket_t *dst) {
238    return gras_socket_client_ext(host,port,32,0,dst);
239 }
240
241
242 void gras_socket_close(gras_socket_t sock) {
243   gras_dynar_t sockets = gras_socketset_get();
244   gras_socket_t sock_iter;
245   int cursor;
246
247   /* FIXME: Issue an event when the socket is closed */
248   if (sock) {
249     gras_dynar_foreach(sockets,cursor,sock_iter) {
250       if (sock == sock_iter) {
251         gras_dynar_cursor_rm(sockets,&cursor);
252         if ( sock->plugin->socket_close) 
253           (* sock->plugin->socket_close)(sock);
254
255         /* free the memory */
256         if (sock->peer_name)
257           gras_free(sock->peer_name);
258         gras_free(sock);
259         return;
260       }
261     }
262     WARN0("Ignoring request to free an unknown socket");
263   }
264 }
265
266 /**
267  * gras_trp_chunk_send:
268  *
269  * Send a bunch of bytes from on socket
270  */
271 gras_error_t
272 gras_trp_chunk_send(gras_socket_t sd,
273                     char *data,
274                     long int size) {
275   gras_assert1(sd->outgoing,
276                "Socket not suited for data send (outgoing=%c)",
277                sd->outgoing?'y':'n');
278   gras_assert1(sd->plugin->chunk_send,
279                "No function chunk_send on transport plugin %s",
280                sd->plugin->name);
281   return (*sd->plugin->chunk_send)(sd,data,size);
282 }
283 /**
284  * gras_trp_chunk_recv:
285  *
286  * Receive a bunch of bytes from a socket
287  */
288 gras_error_t 
289 gras_trp_chunk_recv(gras_socket_t sd,
290                     char *data,
291                     long int size) {
292   gras_assert0(sd->incoming,
293                "Socket not suited for data receive");
294   gras_assert1(sd->plugin->chunk_recv,
295                "No function chunk_recv on transport plugin %s",
296                sd->plugin->name);
297   return (sd->plugin->chunk_recv)(sd,data,size);
298 }
299
300 /**
301  * gras_trp_flush:
302  *
303  * Make sure all pending communications are done
304  */
305 gras_error_t 
306 gras_trp_flush(gras_socket_t sd) {
307   return (sd->plugin->flush)(sd);
308 }
309
310 gras_error_t
311 gras_trp_plugin_get_by_name(const char *name,
312                             gras_trp_plugin_t **dst){
313
314   return gras_dict_get(_gras_trp_plugins,name,(void**)dst);
315 }
316
317 int   gras_socket_my_port  (gras_socket_t sock) {
318   return sock->port;
319 }
320 int   gras_socket_peer_port(gras_socket_t sock) {
321   return sock->peer_port;
322 }
323 char *gras_socket_peer_name(gras_socket_t sock) {
324   return sock->peer_name;
325 }
326
327 gras_error_t gras_socket_raw_send(gras_socket_t peer, 
328                                   unsigned int timeout,
329                                   unsigned long int exp_size, 
330                                   unsigned long int msg_size) {
331   gras_error_t errcode;
332   char *chunk = gras_malloc(msg_size);
333   int exp_sofar;
334    
335   gras_assert0(peer->raw,"Asked to send raw data on a regular socket");
336   for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
337      CDEBUG5(raw_trp,"Sent %d of %lu (msg_size=%ld) to %s:%d",
338              exp_sofar,exp_size,msg_size,
339              gras_socket_peer_name(peer), gras_socket_peer_port(peer));
340      TRY(gras_trp_chunk_send(peer,chunk,msg_size));
341   }
342   CDEBUG5(raw_trp,"Sent %d of %lu (msg_size=%ld) to %s:%d",
343           exp_sofar,exp_size,msg_size,
344           gras_socket_peer_name(peer), gras_socket_peer_port(peer));
345              
346   gras_free(chunk);
347   return no_error;//gras_socket_raw_exchange(peer,1,timeout,expSize,msgSize);   
348 }
349
350 gras_error_t gras_socket_raw_recv(gras_socket_t peer, 
351                                   unsigned int timeout,
352                                   unsigned long int exp_size, 
353                                   unsigned long int msg_size){
354   
355   gras_error_t errcode;
356   char *chunk = gras_malloc(msg_size);
357   int exp_sofar;
358
359   gras_assert0(peer->raw,"Asked to recveive raw data on a regular socket\n");
360   for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
361      CDEBUG5(raw_trp,"Recvd %d of %lu (msg_size=%ld) from %s:%d",
362              exp_sofar,exp_size,msg_size,
363              gras_socket_peer_name(peer), gras_socket_peer_port(peer));
364      TRY(gras_trp_chunk_recv(peer,chunk,msg_size));
365   }
366   CDEBUG5(raw_trp,"Recvd %d of %lu (msg_size=%ld) from %s:%d",
367           exp_sofar,exp_size,msg_size,
368           gras_socket_peer_name(peer), gras_socket_peer_port(peer));
369
370   gras_free(chunk);
371   return no_error;//gras_socket_raw_exchange(peer,0,timeout,expSize,msgSize);   
372 }