Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
KILKILKIL
[simgrid.git] / src / gras / Transport / transport.c
1 /* $Id$ */
2
3 /* transport - low level communication                                      */
4
5 /* Authors: Martin Quinson                                                  */
6 /* Copyright (C) 2004 Martin Quinson.                                       */
7
8 /* This program is free software; you can redistribute it and/or modify it
9    under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "gras/Transport/transport_private.h"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(transport,gras,"Conveying bytes over the network");
14 XBT_LOG_NEW_SUBCATEGORY(raw_trp,transport,"Conveying bytes over the network without formating");
15
16 static xbt_dict_t _gras_trp_plugins;      /* All registered plugins */
17 static void gras_trp_plugin_free(void *p); /* free one of the plugins */
18
19 static void
20 gras_trp_plugin_new(const char *name, gras_trp_setup_t setup) {
21   xbt_error_t errcode;
22
23   gras_trp_plugin_t *plug = xbt_new0(gras_trp_plugin_t, 1);
24   
25   DEBUG1("Create plugin %s",name);
26
27   plug->name=xbt_strdup(name);
28
29   errcode = setup(plug);
30   switch (errcode) {
31   case mismatch_error:
32     /* SG plugin return mismatch when in RL mode (and vice versa) */
33     xbt_free(plug->name);
34     xbt_free(plug);
35     break;
36
37   case no_error:
38     xbt_dict_set(_gras_trp_plugins,
39                   name, plug, gras_trp_plugin_free);
40     break;
41
42   default:
43     DIE_IMPOSSIBLE;
44   }
45
46 }
47
48 void gras_trp_init(void){
49   /* make room for all plugins */
50   _gras_trp_plugins=xbt_dict_new();
51
52   /* Add them */
53   gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
54   gras_trp_plugin_new("file",gras_trp_file_setup);
55   gras_trp_plugin_new("sg",gras_trp_sg_setup);
56
57   /* buf is composed, so it must come after the others */
58   gras_trp_plugin_new("buf", gras_trp_buf_setup);
59
60 }
61
62 void
63 gras_trp_exit(void){
64   xbt_dict_free(&_gras_trp_plugins);
65 }
66
67
68 void gras_trp_plugin_free(void *p) {
69   gras_trp_plugin_t *plug = p;
70
71   if (plug) {
72     if (plug->exit) {
73       plug->exit(plug);
74     } else if (plug->data) {
75       DEBUG1("Plugin %s lacks exit(). Free data anyway.",plug->name);
76       xbt_free(plug->data);
77     }
78
79     xbt_free(plug->name);
80     xbt_free(plug);
81   }
82 }
83
84
85 /**
86  * gras_trp_socket_new:
87  *
88  * Malloc a new socket, and initialize it with defaults
89  */
90 void gras_trp_socket_new(int incoming,
91                          gras_socket_t *dst) {
92
93   gras_socket_t sock=xbt_new0(s_gras_socket_t,1);
94
95   DEBUG1("Create a new socket (%p)", (void*)sock);
96
97   sock->plugin = NULL;
98   sock->sd     = -1;
99   sock->data   = NULL;
100
101   sock->incoming  = incoming ? 1:0;
102   sock->outgoing  = incoming ? 0:1;
103   sock->accepting = incoming ? 1:0;
104
105   sock->port      = -1;
106   sock->peer_port = -1;
107   sock->peer_name = NULL;
108   sock->raw = 0;
109
110   *dst = sock;
111
112   xbt_dynar_push(gras_socketset_get(),dst);
113 }
114
115
116 /**
117  * gras_socket_server_ext:
118  *
119  * Opens a server socket and make it ready to be listened to.
120  * In real life, you'll get a TCP socket.
121  */
122 xbt_error_t
123 gras_socket_server_ext(unsigned short port,
124                        
125                        unsigned long int bufSize,
126                        int raw,
127                        
128                        /* OUT */ gras_socket_t *dst) {
129  
130   xbt_error_t errcode;
131   gras_trp_plugin_t *trp;
132   gras_socket_t sock;
133
134   *dst = NULL;
135
136   DEBUG2("Create a server socket from plugin %s on port %d",
137          gras_if_RL() ? "tcp" : "sg",
138          port);
139   TRY(gras_trp_plugin_get_by_name("buf",&trp));
140
141   /* defaults settings */
142   gras_trp_socket_new(1,&sock);
143   sock->plugin= trp;
144   sock->port=port;
145   sock->bufSize = bufSize;
146   sock->raw = raw;
147
148   /* Call plugin socket creation function */
149   errcode = trp->socket_server(trp, sock);
150   DEBUG3("in=%c out=%c accept=%c",
151          sock->incoming?'y':'n', 
152          sock->outgoing?'y':'n',
153          sock->accepting?'y':'n');
154
155   if (errcode != no_error) {
156     xbt_free(sock);
157     return errcode;
158   }
159
160   *dst = sock;
161
162   return no_error;
163 }
164    
165 /**
166  * gras_socket_client_ext:
167  *
168  * Opens a client socket to a remote host.
169  * In real life, you'll get a TCP socket.
170  */
171 xbt_error_t
172 gras_socket_client_ext(const char *host,
173                        unsigned short port,
174                        
175                        unsigned long int bufSize,
176                        int raw,
177                        
178                        /* OUT */ gras_socket_t *dst) {
179  
180   xbt_error_t errcode;
181   gras_trp_plugin_t *trp;
182   gras_socket_t sock;
183
184   *dst = NULL;
185
186   TRY(gras_trp_plugin_get_by_name("buf",&trp));
187
188   DEBUG1("Create a client socket from plugin %s",gras_if_RL() ? "tcp" : "sg");
189   /* defaults settings */
190   gras_trp_socket_new(0,&sock);
191   sock->plugin= trp;
192   sock->peer_port = port;
193   sock->peer_name = (char*)strdup(host?host:"localhost");
194   sock->bufSize = bufSize;
195   sock->raw = raw;
196
197   /* plugin-specific */
198   errcode= (*trp->socket_client)(trp, sock);
199   DEBUG3("in=%c out=%c accept=%c",
200          sock->incoming?'y':'n', 
201          sock->outgoing?'y':'n',
202          sock->accepting?'y':'n');
203
204   if (errcode != no_error) {
205     xbt_free(sock);
206     return errcode;
207   }
208
209   *dst = sock;
210
211   return no_error;
212 }
213
214 /**
215  * gras_socket_server:
216  *
217  * Opens a server socket and make it ready to be listened to.
218  * In real life, you'll get a TCP socket.
219  */
220 xbt_error_t
221 gras_socket_server(unsigned short port,
222                    /* OUT */ gras_socket_t *dst) {
223    return gras_socket_server_ext(port,32,0,dst);
224 }
225
226 /**
227  * gras_socket_client:
228  *
229  * Opens a client socket to a remote host.
230  * In real life, you'll get a TCP socket.
231  */
232 xbt_error_t
233 gras_socket_client(const char *host,
234                    unsigned short port,
235                    /* OUT */ gras_socket_t *dst) {
236    return gras_socket_client_ext(host,port,32,0,dst);
237 }
238
239
240 void gras_socket_close(gras_socket_t sock) {
241   xbt_dynar_t sockets = gras_socketset_get();
242   gras_socket_t sock_iter;
243   int cursor;
244
245   /* FIXME: Issue an event when the socket is closed */
246   if (sock) {
247     xbt_dynar_foreach(sockets,cursor,sock_iter) {
248       if (sock == sock_iter) {
249         xbt_dynar_cursor_rm(sockets,&cursor);
250         if ( sock->plugin->socket_close) 
251           (* sock->plugin->socket_close)(sock);
252
253         /* free the memory */
254         if (sock->peer_name)
255           xbt_free(sock->peer_name);
256         xbt_free(sock);
257         return;
258       }
259     }
260     WARN0("Ignoring request to free an unknown socket");
261   }
262 }
263
264 /**
265  * gras_trp_chunk_send:
266  *
267  * Send a bunch of bytes from on socket
268  */
269 xbt_error_t
270 gras_trp_chunk_send(gras_socket_t sd,
271                     char *data,
272                     long int size) {
273   xbt_assert1(sd->outgoing,
274                "Socket not suited for data send (outgoing=%c)",
275                sd->outgoing?'y':'n');
276   xbt_assert1(sd->plugin->chunk_send,
277                "No function chunk_send on transport plugin %s",
278                sd->plugin->name);
279   return (*sd->plugin->chunk_send)(sd,data,size);
280 }
281 /**
282  * gras_trp_chunk_recv:
283  *
284  * Receive a bunch of bytes from a socket
285  */
286 xbt_error_t 
287 gras_trp_chunk_recv(gras_socket_t sd,
288                     char *data,
289                     long int size) {
290   xbt_assert0(sd->incoming,
291                "Socket not suited for data receive");
292   xbt_assert1(sd->plugin->chunk_recv,
293                "No function chunk_recv on transport plugin %s",
294                sd->plugin->name);
295   return (sd->plugin->chunk_recv)(sd,data,size);
296 }
297
298 /**
299  * gras_trp_flush:
300  *
301  * Make sure all pending communications are done
302  */
303 xbt_error_t 
304 gras_trp_flush(gras_socket_t sd) {
305   return (sd->plugin->flush)(sd);
306 }
307
308 xbt_error_t
309 gras_trp_plugin_get_by_name(const char *name,
310                             gras_trp_plugin_t **dst){
311
312   return xbt_dict_get(_gras_trp_plugins,name,(void**)dst);
313 }
314
315 int   gras_socket_my_port  (gras_socket_t sock) {
316   return sock->port;
317 }
318 int   gras_socket_peer_port(gras_socket_t sock) {
319   return sock->peer_port;
320 }
321 char *gras_socket_peer_name(gras_socket_t sock) {
322   return sock->peer_name;
323 }
324
325 xbt_error_t gras_socket_raw_send(gras_socket_t peer, 
326                                   unsigned int timeout,
327                                   unsigned long int exp_size, 
328                                   unsigned long int msg_size) {
329   xbt_error_t errcode;
330   char *chunk = xbt_malloc(msg_size);
331   int exp_sofar;
332    
333   xbt_assert0(peer->raw,"Asked to send raw data on a regular socket");
334   for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
335      CDEBUG5(raw_trp,"Sent %d of %lu (msg_size=%ld) to %s:%d",
336              exp_sofar,exp_size,msg_size,
337              gras_socket_peer_name(peer), gras_socket_peer_port(peer));
338      TRY(gras_trp_chunk_send(peer,chunk,msg_size));
339   }
340   CDEBUG5(raw_trp,"Sent %d of %lu (msg_size=%ld) to %s:%d",
341           exp_sofar,exp_size,msg_size,
342           gras_socket_peer_name(peer), gras_socket_peer_port(peer));
343              
344   xbt_free(chunk);
345   return no_error;/* gras_socket_raw_exchange(peer,1,timeout,expSize,msgSize);    */
346 }
347
348 xbt_error_t gras_socket_raw_recv(gras_socket_t peer, 
349                                   unsigned int timeout,
350                                   unsigned long int exp_size, 
351                                   unsigned long int msg_size){
352   
353   xbt_error_t errcode;
354   char *chunk = xbt_malloc(msg_size);
355   int exp_sofar;
356
357   xbt_assert0(peer->raw,"Asked to recveive raw data on a regular socket\n");
358   for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
359      CDEBUG5(raw_trp,"Recvd %d of %lu (msg_size=%ld) from %s:%d",
360              exp_sofar,exp_size,msg_size,
361              gras_socket_peer_name(peer), gras_socket_peer_port(peer));
362      TRY(gras_trp_chunk_recv(peer,chunk,msg_size));
363   }
364   CDEBUG5(raw_trp,"Recvd %d of %lu (msg_size=%ld) from %s:%d",
365           exp_sofar,exp_size,msg_size,
366           gras_socket_peer_name(peer), gras_socket_peer_port(peer));
367
368   xbt_free(chunk);
369   return no_error;/* gras_socket_raw_exchange(peer,0,timeout,expSize,msgSize);    */
370 }