Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
s/TRY/TRYOLD/ I'd like to introduce a TRY macro in the exception mecanism, but this...
[simgrid.git] / src / gras / Transport / transport.c
1 /* $Id$ */
2
3 /* transport - low level communication                                      */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "portable.h"
11 #include "gras/Transport/transport_private.h"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(transport,gras,"Conveying bytes over the network");
14 XBT_LOG_NEW_SUBCATEGORY(trp_meas,transport,"Conveying bytes over the network without formating for perf measurements");
15 static short int _gras_trp_started = 0;
16
17 static xbt_dict_t _gras_trp_plugins;      /* All registered plugins */
18 static void gras_trp_plugin_free(void *p); /* free one of the plugins */
19
20 static void
21 gras_trp_plugin_new(const char *name, gras_trp_setup_t setup) {
22   xbt_error_t errcode;
23
24   gras_trp_plugin_t plug = xbt_new0(s_gras_trp_plugin_t, 1);
25   
26   DEBUG1("Create plugin %s",name);
27
28   plug->name=xbt_strdup(name);
29
30   errcode = setup(plug);
31   switch (errcode) {
32   case mismatch_error:
33     /* SG plugin return mismatch when in RL mode (and vice versa) */
34     free(plug->name);
35     free(plug);
36     break;
37
38   case no_error:
39     xbt_dict_set(_gras_trp_plugins,
40                   name, plug, gras_trp_plugin_free);
41     break;
42
43   default:
44     DIE_IMPOSSIBLE;
45   }
46
47 }
48
49 void gras_trp_init(void){
50   if (!_gras_trp_started) {
51      /* make room for all plugins */
52      _gras_trp_plugins=xbt_dict_new();
53
54 #ifdef HAVE_WINSOCK2_H
55      /* initialize the windows mechanism */
56      {  
57         WORD wVersionRequested;
58         WSADATA wsaData;
59         
60         wVersionRequested = MAKEWORD( 2, 0 );
61         xbt_assert0(WSAStartup( wVersionRequested, &wsaData ) == 0,
62                     "Cannot find a usable WinSock DLL");
63         
64         /* Confirm that the WinSock DLL supports 2.0.*/
65         /* Note that if the DLL supports versions greater    */
66         /* than 2.0 in addition to 2.0, it will still return */
67         /* 2.0 in wVersion since that is the version we      */
68         /* requested.                                        */
69         
70         xbt_assert0(LOBYTE( wsaData.wVersion ) == 2 &&
71                     HIBYTE( wsaData.wVersion ) == 0,
72                     "Cannot find a usable WinSock DLL");
73         INFO0("Found and initialized winsock2");
74      }       /* The WinSock DLL is acceptable. Proceed. */
75 #elif HAVE_WINSOCK_H
76      {       WSADATA wsaData;
77         xbt_assert0(WSAStartup( 0x0101, &wsaData ) == 0,
78                     "Cannot find a usable WinSock DLL");
79         INFO0("Found and initialized winsock");
80      }
81 #endif
82    
83      /* Add plugins */
84      gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
85      gras_trp_plugin_new("file",gras_trp_file_setup);
86      gras_trp_plugin_new("sg",gras_trp_sg_setup);
87
88      /* buf is composed, so it must come after the others */
89      gras_trp_plugin_new("buf", gras_trp_buf_setup);
90   }
91    
92   _gras_trp_started++;
93 }
94
95 void
96 gras_trp_exit(void){
97   xbt_dynar_t sockets = gras_socketset_get();
98   gras_socket_t sock_iter;
99   int cursor;
100
101    if (_gras_trp_started == 0) {
102       return;
103    }
104    
105    if ( --_gras_trp_started == 0 ) {
106 #ifdef HAVE_WINSOCK_H
107       if ( WSACleanup() == SOCKET_ERROR ) {
108          if ( WSAGetLastError() == WSAEINPROGRESS ) {
109             WSACancelBlockingCall();
110             WSACleanup();
111          }
112         }
113 #endif
114
115       /* Close all the sockets */
116       xbt_dynar_foreach(sockets,cursor,sock_iter) {
117         VERB1("Closing the socket %p left open on exit. Maybe a socket leak?",
118               sock_iter);
119         gras_socket_close(sock_iter);
120       }
121       
122       /* Delete the plugins */
123       xbt_dict_free(&_gras_trp_plugins);
124    }
125 }
126
127
128 void gras_trp_plugin_free(void *p) {
129   gras_trp_plugin_t plug = p;
130
131   if (plug) {
132     if (plug->exit) {
133       plug->exit(plug);
134     } else if (plug->data) {
135       DEBUG1("Plugin %s lacks exit(). Free data anyway.",plug->name);
136       free(plug->data);
137     }
138
139     free(plug->name);
140     free(plug);
141   }
142 }
143
144
145 /**
146  * gras_trp_socket_new:
147  *
148  * Malloc a new socket, and initialize it with defaults
149  */
150 void gras_trp_socket_new(int incoming,
151                          gras_socket_t *dst) {
152
153   gras_socket_t sock=xbt_new0(s_gras_socket_t,1);
154
155   DEBUG1("Create a new socket (%p)", (void*)sock);
156
157   sock->plugin = NULL;
158
159   sock->incoming  = incoming ? 1:0;
160   sock->outgoing  = incoming ? 0:1;
161   sock->accepting = incoming ? 1:0;
162   sock->meas = 0;
163
164   sock->sd     = -1;
165   sock->port      = -1;
166   sock->peer_port = -1;
167   sock->peer_name = NULL;
168
169   sock->data   = NULL;
170   sock->bufdata = NULL;
171   
172   *dst = sock;
173
174   xbt_dynar_push(gras_socketset_get(),dst);
175   XBT_OUT;
176 }
177
178
179 /**
180  * gras_socket_server_ext:
181  *
182  * Opens a server socket and make it ready to be listened to.
183  * In real life, you'll get a TCP socket.
184  */
185 xbt_error_t
186 gras_socket_server_ext(unsigned short port,
187                        
188                        unsigned long int bufSize,
189                        int measurement,
190                        
191                        /* OUT */ gras_socket_t *dst) {
192  
193   xbt_error_t errcode;
194   gras_trp_plugin_t trp;
195   gras_socket_t sock;
196
197   *dst = NULL;
198
199   DEBUG2("Create a server socket from plugin %s on port %d",
200          gras_if_RL() ? "tcp" : "sg",
201          port);
202   TRYOLD(gras_trp_plugin_get_by_name((measurement? (gras_if_RL() ? "tcp" : "sg")
203                                               :"buf"),
204                                   &trp));
205
206   /* defaults settings */
207   gras_trp_socket_new(1,&sock);
208   sock->plugin= trp;
209   sock->port=port;
210   sock->bufSize = bufSize;
211   sock->meas = measurement;
212
213   /* Call plugin socket creation function */
214   DEBUG1("Prepare socket with plugin (fct=%p)",trp->socket_server);
215   errcode = trp->socket_server(trp, sock);
216   DEBUG3("in=%c out=%c accept=%c",
217          sock->incoming?'y':'n', 
218          sock->outgoing?'y':'n',
219          sock->accepting?'y':'n');
220
221   if (errcode != no_error) {
222     free(sock);
223     return errcode;
224   }
225
226   *dst = sock;
227
228   return no_error;
229 }
230    
231 /**
232  * gras_socket_client_ext:
233  *
234  * Opens a client socket to a remote host.
235  * In real life, you'll get a TCP socket.
236  */
237 xbt_error_t
238 gras_socket_client_ext(const char *host,
239                        unsigned short port,
240                        
241                        unsigned long int bufSize,
242                        int measurement,
243                        
244                        /* OUT */ gras_socket_t *dst) {
245  
246   xbt_error_t errcode;
247   gras_trp_plugin_t trp;
248   gras_socket_t sock;
249
250   *dst = NULL;
251
252   TRYOLD(gras_trp_plugin_get_by_name((measurement? (gras_if_RL() ? "tcp" : "sg")
253                                               :"buf"),
254                                   &trp));
255
256   DEBUG1("Create a client socket from plugin %s",gras_if_RL() ? "tcp" : "sg");
257   /* defaults settings */
258   gras_trp_socket_new(0,&sock);
259   sock->plugin= trp;
260   sock->peer_port = port;
261   sock->peer_name = (char*)strdup(host?host:"localhost");
262   sock->bufSize = bufSize;
263   sock->meas = measurement;
264
265   /* plugin-specific */
266   errcode= (*trp->socket_client)(trp, sock);
267   DEBUG3("in=%c out=%c accept=%c",
268          sock->incoming?'y':'n', 
269          sock->outgoing?'y':'n',
270          sock->accepting?'y':'n');
271
272   if (errcode != no_error) {
273     free(sock);
274     return errcode;
275   }
276
277   *dst = sock;
278
279   return no_error;
280 }
281
282 /**
283  * gras_socket_server:
284  *
285  * Opens a server socket and make it ready to be listened to.
286  * In real life, you'll get a TCP socket.
287  */
288 xbt_error_t
289 gras_socket_server(unsigned short port,
290                    /* OUT */ gras_socket_t *dst) {
291    return gras_socket_server_ext(port,32,0,dst);
292 }
293
294 /**
295  * gras_socket_client:
296  *
297  * Opens a client socket to a remote host.
298  * In real life, you'll get a TCP socket.
299  */
300 xbt_error_t
301 gras_socket_client(const char *host,
302                    unsigned short port,
303                    /* OUT */ gras_socket_t *dst) {
304    return gras_socket_client_ext(host,port,32,0,dst);
305 }
306
307
308 void gras_socket_close(gras_socket_t sock) {
309   xbt_dynar_t sockets = gras_socketset_get();
310   gras_socket_t sock_iter;
311   int cursor;
312
313   XBT_IN;
314   /* FIXME: Issue an event when the socket is closed */
315   if (sock) {
316     xbt_dynar_foreach(sockets,cursor,sock_iter) {
317       if (sock == sock_iter) {
318         xbt_dynar_cursor_rm(sockets,&cursor);
319         if (sock->plugin->socket_close) 
320           (* sock->plugin->socket_close)(sock);
321
322         /* free the memory */
323         if (sock->peer_name)
324           free(sock->peer_name);
325         free(sock);
326         XBT_OUT;
327         return;
328       }
329     }
330     WARN1("Ignoring request to free an unknown socket (%p)",sock);
331   }
332   XBT_OUT;
333 }
334
335 /**
336  * gras_trp_chunk_send:
337  *
338  * Send a bunch of bytes from on socket
339  */
340 xbt_error_t
341 gras_trp_chunk_send(gras_socket_t sd,
342                     char *data,
343                     long int size) {
344   xbt_assert1(sd->outgoing,
345                "Socket not suited for data send (outgoing=%c)",
346                sd->outgoing?'y':'n');
347   xbt_assert1(sd->plugin->chunk_send,
348                "No function chunk_send on transport plugin %s",
349                sd->plugin->name);
350   return (*sd->plugin->chunk_send)(sd,data,size);
351 }
352 /**
353  * gras_trp_chunk_recv:
354  *
355  * Receive a bunch of bytes from a socket
356  */
357 xbt_error_t 
358 gras_trp_chunk_recv(gras_socket_t sd,
359                     char *data,
360                     long int size) {
361   xbt_assert0(sd->incoming,
362                "Socket not suited for data receive");
363   xbt_assert1(sd->plugin->chunk_recv,
364                "No function chunk_recv on transport plugin %s",
365                sd->plugin->name);
366   return (sd->plugin->chunk_recv)(sd,data,size);
367 }
368
369 /**
370  * gras_trp_flush:
371  *
372  * Make sure all pending communications are done
373  */
374 xbt_error_t 
375 gras_trp_flush(gras_socket_t sd) {
376   return (sd->plugin->flush)(sd);
377 }
378
379 xbt_error_t
380 gras_trp_plugin_get_by_name(const char *name,
381                             gras_trp_plugin_t *dst){
382
383   return xbt_dict_get(_gras_trp_plugins,name,(void**)dst);
384 }
385
386 int   gras_socket_my_port  (gras_socket_t sock) {
387   return sock->port;
388 }
389 int   gras_socket_peer_port(gras_socket_t sock) {
390   return sock->peer_port;
391 }
392 char *gras_socket_peer_name(gras_socket_t sock) {
393   return sock->peer_name;
394 }
395
396 /** \brief Check if the provided socket is a measurement one (or a regular one) */
397 int gras_socket_is_meas(gras_socket_t sock) {
398   return sock->meas;
399 }
400
401 /** \brief Send a chunk of (random) data over a measurement socket 
402  *
403  * @param peer measurement socket to use for the experiment
404  * @param timeout timeout (in seconds)
405  * @param exp_size total amount of data to send (in bytes).
406  * @param msg_size size of each chunk sent over the socket (in bytes).
407  *
408  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on 
409  * each side of the socket should be paired. 
410  * 
411  * The exchanged data is zeroed to make sure it's initialized, but
412  * there is no way to control what is sent (ie, you cannot use these 
413  * functions to exchange data out of band).
414  */
415 xbt_error_t gras_socket_meas_send(gras_socket_t peer, 
416                                   unsigned int timeout,
417                                   unsigned long int exp_size, 
418                                   unsigned long int msg_size) {
419   xbt_error_t errcode;
420   char *chunk = xbt_malloc0(msg_size);
421   unsigned long int exp_sofar;
422    
423   XBT_IN;
424
425   xbt_assert0(peer->meas,"Asked to send measurement data on a regular socket");
426
427   for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
428      CDEBUG5(trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d",
429              exp_sofar,exp_size,msg_size,
430              gras_socket_peer_name(peer), gras_socket_peer_port(peer));
431      TRYOLD(gras_trp_chunk_send(peer,chunk,msg_size));
432   }
433   CDEBUG5(trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d",
434           exp_sofar,exp_size,msg_size,
435           gras_socket_peer_name(peer), gras_socket_peer_port(peer));
436              
437   free(chunk);
438
439   XBT_OUT;
440   return no_error;
441 }
442
443 /** \brief Receive a chunk of data over a measurement socket 
444  *
445  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on 
446  * each side of the socket should be paired. 
447  */
448 xbt_error_t gras_socket_meas_recv(gras_socket_t peer, 
449                                   unsigned int timeout,
450                                   unsigned long int exp_size, 
451                                   unsigned long int msg_size){
452   
453   xbt_error_t errcode;
454   char *chunk = xbt_malloc(msg_size);
455   unsigned long int exp_sofar;
456
457   XBT_IN;
458
459   xbt_assert0(peer->meas,"Asked to receive measurement data on a regular socket\n");
460
461   for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) {
462      CDEBUG5(trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d",
463              exp_sofar,exp_size,msg_size,
464              gras_socket_peer_name(peer), gras_socket_peer_port(peer));
465      TRYOLD(gras_trp_chunk_recv(peer,chunk,msg_size));
466   }
467   CDEBUG5(trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d",
468           exp_sofar,exp_size,msg_size,
469           gras_socket_peer_name(peer), gras_socket_peer_port(peer));
470
471   free(chunk);
472   XBT_OUT;
473
474   return no_error;
475 }
476
477 /**
478  * \brief Something similar to the good old accept system call. 
479  *
480  * Make sure that there is someone speaking to the provided server socket.
481  * In RL, it does an accept(2) and return the result as last argument. 
482  * In SG, as accepts are useless, it returns the provided argument as result.
483  * You should thus test whether (peer != accepted) before closing both of them.
484  *
485  * You should only call this on measurement sockets. It is automatically 
486  * done for regular sockets, but you usually want more control about 
487  * what's going on with measurement sockets.
488  */
489 xbt_error_t gras_socket_meas_accept(gras_socket_t peer, gras_socket_t *accepted){
490   xbt_error_t errcode;
491   gras_socket_t res;
492   
493   xbt_assert0(peer->meas,
494               "No need to accept on non-measurement sockets (it's automatic)");
495
496   if (!peer->accepting) {
497     /* nothing to accept here */
498     *accepted=peer;
499     return no_error;
500   }
501
502   TRYOLD((peer->plugin->socket_accept)(peer,accepted));
503   (*accepted)->meas = peer->meas;
504   CDEBUG1(trp_meas,"meas_accepted onto %d",(*accepted)->sd);
505
506   return no_error;
507
508
509
510 /*
511  * Creating procdata for this module
512  */
513 static void *gras_trp_procdata_new() {
514    gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t,1);
515    
516    res->sockets   = xbt_dynar_new(sizeof(gras_socket_t*), NULL);
517    
518    return (void*)res;
519 }
520
521 /*
522  * Freeing procdata for this module
523  */
524 static void gras_trp_procdata_free(void *data) {
525    gras_trp_procdata_t res = (gras_trp_procdata_t)data;
526    
527    xbt_dynar_free(&( res->sockets ));
528    free(res);
529 }
530
531 /*
532  * Module registration
533  */
534 void gras_trp_register() {
535    gras_procdata_add("gras_trp",gras_trp_procdata_new, gras_trp_procdata_free);
536 }
537
538
539 xbt_dynar_t 
540 gras_socketset_get(void) {
541    /* FIXME: KILLME */
542    return ((gras_trp_procdata_t) gras_libdata_get("gras_trp"))->sockets;
543 }