Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
whining at CRITICAL level is not enough. Let's kill every body around
[simgrid.git] / src / gras / Transport / transport.c
1 /* $Id$ */
2
3 /* transport - low level communication                                      */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 /***
11  *** Options
12  ***/
13 int gras_opt_trp_nomoredata_on_close=0;
14
15 #include "xbt/ex.h"
16 #include "xbt/peer.h"
17 #include "portable.h"
18 #include "gras/Transport/transport_private.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp,gras,"Conveying bytes over the network");
21 XBT_LOG_NEW_SUBCATEGORY(gras_trp_meas,gras_trp,"Conveying bytes over the network without formating for perf measurements");
22 static short int _gras_trp_started = 0;
23
24 static xbt_dict_t _gras_trp_plugins;      /* All registered plugins */
25 static void gras_trp_plugin_free(void *p); /* free one of the plugins */
26
27 static void
28 gras_trp_plugin_new(const char *name, gras_trp_setup_t setup) {
29   xbt_ex_t e;
30
31   gras_trp_plugin_t plug = xbt_new0(s_gras_trp_plugin_t, 1);
32   
33   DEBUG1("Create plugin %s",name);
34
35   plug->name=xbt_strdup(name);
36
37   TRY {
38     setup(plug);
39   } CATCH(e) {
40     if (e.category == mismatch_error) {
41       /* SG plugin raise mismatch when in RL mode (and vice versa) */
42       free(plug->name);
43       free(plug);
44       plug=NULL;
45       xbt_ex_free(e);
46     } else {
47       RETHROW;
48     }
49   }
50
51   if (plug)
52     xbt_dict_set(_gras_trp_plugins, name, plug, gras_trp_plugin_free);
53 }
54
55 void gras_trp_init(void){
56   if (!_gras_trp_started) {
57      /* make room for all plugins */
58      _gras_trp_plugins=xbt_dict_new();
59
60 #ifdef HAVE_WINSOCK2_H
61      /* initialize the windows mechanism */
62      {  
63         WORD wVersionRequested;
64         WSADATA wsaData;
65         
66         wVersionRequested = MAKEWORD( 2, 0 );
67         xbt_assert0(WSAStartup( wVersionRequested, &wsaData ) == 0,
68                     "Cannot find a usable WinSock DLL");
69         
70         /* Confirm that the WinSock DLL supports 2.0.*/
71         /* Note that if the DLL supports versions greater    */
72         /* than 2.0 in addition to 2.0, it will still return */
73         /* 2.0 in wVersion since that is the version we      */
74         /* requested.                                        */
75         
76         xbt_assert0(LOBYTE( wsaData.wVersion ) == 2 &&
77                     HIBYTE( wsaData.wVersion ) == 0,
78                     "Cannot find a usable WinSock DLL");
79         INFO0("Found and initialized winsock2");
80      }       /* The WinSock DLL is acceptable. Proceed. */
81 #elif HAVE_WINSOCK_H
82      {       WSADATA wsaData;
83         xbt_assert0(WSAStartup( 0x0101, &wsaData ) == 0,
84                     "Cannot find a usable WinSock DLL");
85         INFO0("Found and initialized winsock");
86      }
87 #endif
88    
89      /* Add plugins */
90      gras_trp_plugin_new("file",gras_trp_file_setup);
91      gras_trp_plugin_new("sg",gras_trp_sg_setup);
92      gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
93   }
94    
95   _gras_trp_started++;
96 }
97
98 void
99 gras_trp_exit(void){
100   xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
101   gras_socket_t sock_iter;
102   int cursor;
103
104    if (_gras_trp_started == 0) {
105       return;
106    }
107    
108    if ( --_gras_trp_started == 0 ) {
109 #ifdef HAVE_WINSOCK_H
110       if ( WSACleanup() == SOCKET_ERROR ) {
111          if ( WSAGetLastError() == WSAEINPROGRESS ) {
112             WSACancelBlockingCall();
113             WSACleanup();
114          }
115         }
116 #endif
117
118       /* Close all the sockets */
119       xbt_dynar_foreach(sockets,cursor,sock_iter) {
120         VERB1("Closing the socket %p left open on exit. Maybe a socket leak?",
121               sock_iter);
122         gras_socket_close(sock_iter);
123       }
124       
125       /* Delete the plugins */
126       xbt_dict_free(&_gras_trp_plugins);
127    }
128 }
129
130
131 void gras_trp_plugin_free(void *p) {
132   gras_trp_plugin_t plug = p;
133
134   if (plug) {
135     if (plug->exit) {
136       plug->exit(plug);
137     } else if (plug->data) {
138       DEBUG1("Plugin %s lacks exit(). Free data anyway.",plug->name);
139       free(plug->data);
140     }
141
142     free(plug->name);
143     free(plug);
144   }
145 }
146
147
148 /**
149  * gras_trp_socket_new:
150  *
151  * Malloc a new socket, and initialize it with defaults
152  */
153 void gras_trp_socket_new(int incoming,
154                          gras_socket_t *dst) {
155
156   gras_socket_t sock=xbt_new0(s_gras_socket_t,1);
157
158   VERB1("Create a new socket (%p)", (void*)sock);
159
160   sock->plugin = NULL;
161
162   sock->incoming  = incoming ? 1:0;
163   sock->outgoing  = incoming ? 0:1;
164   sock->accepting = incoming ? 1:0;
165   sock->meas = 0;
166   sock->recv_ok = 1;
167   sock->valid = 1;
168   sock->moredata = 0;
169
170   sock->sd     = -1;
171   sock->port      = -1;
172   sock->peer_port = -1;
173   sock->peer_name = NULL;
174   sock->peer_proc = NULL;
175
176   sock->data   = NULL;
177   sock->bufdata = NULL;
178   
179   *dst = sock;
180
181   xbt_dynar_push(((gras_trp_procdata_t) 
182                   gras_libdata_by_id(gras_trp_libdata_id))->sockets,dst);
183   XBT_OUT;
184 }
185  
186 /**
187  * @brief Opens a server socket and makes it ready to be listened to.
188  * @param port: port on which you want to listen
189  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
190  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
191  * 
192  * In real life, you'll get a TCP socket. 
193  */
194 gras_socket_t
195 gras_socket_server_ext(unsigned short port,
196                        
197                        unsigned long int buf_size,
198                        int measurement) {
199  
200   xbt_ex_t e;
201   gras_trp_plugin_t trp;
202   gras_socket_t sock;
203
204   DEBUG2("Create a server socket from plugin %s on port %d",
205          gras_if_RL() ? "tcp" : "sg",
206          port);
207   trp = gras_trp_plugin_get_by_name(gras_if_SG() ? "sg":"tcp");
208
209   /* defaults settings */
210   gras_trp_socket_new(1,&sock);
211   sock->plugin= trp;
212   sock->port=port;
213   sock->buf_size = buf_size>0 ? buf_size : 32*1024;
214   sock->meas = measurement;
215
216   /* Call plugin socket creation function */
217   DEBUG1("Prepare socket with plugin (fct=%p)",trp->socket_server);
218   TRY {
219     trp->socket_server(trp, sock);
220     DEBUG3("in=%c out=%c accept=%c",
221            sock->incoming?'y':'n', 
222            sock->outgoing?'y':'n',
223            sock->accepting?'y':'n');
224   } CATCH(e) {
225     int cursor;
226     gras_socket_t sock_iter;
227     xbt_dynar_t socks = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
228     xbt_dynar_foreach(socks, cursor, sock_iter) {
229        if (sock_iter==sock) 
230          xbt_dynar_cursor_rm(socks,&cursor);
231     }     
232     free(sock);
233     RETHROW;
234   }
235
236   if (!measurement)
237      ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport = port;
238   return sock;
239 }
240 /**
241  * @brief Opens a server socket on any port in the given range
242  * 
243  * @param minport: first port we will try
244  * @param maxport: last port we will try
245  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
246  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
247  * 
248  * If none of the provided ports works, raises the exception got when trying the last possibility
249  */ 
250 gras_socket_t
251 gras_socket_server_range(unsigned short minport, unsigned short maxport,
252                          unsigned long int buf_size, int measurement) {
253    
254   int port;
255   gras_socket_t res=NULL;
256   xbt_ex_t e;
257   
258   for (port=minport; port<maxport;port ++) {
259     TRY {
260       res=gras_socket_server_ext(port,buf_size,measurement);
261     } CATCH(e) {
262       if (port==maxport)
263         RETHROW;
264       xbt_ex_free(e);
265     }
266     if (res)
267       return res;
268   }
269   THROW_IMPOSSIBLE;
270 }
271    
272 /**
273  * @brief Opens a client socket to a remote host.
274  * @param host: who you want to connect to
275  * @param port: where you want to connect to on this host
276  * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
277  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
278  * 
279  * In real life, you'll get a TCP socket. 
280  */
281 gras_socket_t
282 gras_socket_client_ext(const char *host,
283                        unsigned short port,
284                        
285                        unsigned long int buf_size,
286                        int measurement) {
287  
288   xbt_ex_t e;
289   gras_trp_plugin_t trp;
290   gras_socket_t sock;
291
292   trp = gras_trp_plugin_get_by_name(gras_if_SG() ? "sg":"tcp");
293
294   DEBUG1("Create a client socket from plugin %s",gras_if_RL() ? "tcp" : "sg");
295   /* defaults settings */
296   gras_trp_socket_new(0,&sock);
297   sock->plugin= trp;
298   sock->peer_port = port;
299   sock->peer_name = (char*)strdup(host?host:"localhost");
300   sock->buf_size = buf_size>0 ? buf_size: 32*1024;
301   sock->meas = measurement;
302
303   /* plugin-specific */
304   TRY {
305     (*trp->socket_client)(trp, sock);
306     DEBUG3("in=%c out=%c accept=%c",
307            sock->incoming?'y':'n', 
308            sock->outgoing?'y':'n',
309            sock->accepting?'y':'n');
310   } CATCH(e) {
311     free(sock);
312     RETHROW;
313   }
314
315   return sock;
316 }
317
318 /**
319  * gras_socket_server:
320  *
321  * Opens a server socket and make it ready to be listened to.
322  * In real life, you'll get a TCP socket.
323  */
324 gras_socket_t
325 gras_socket_server(unsigned short port) {
326    return gras_socket_server_ext(port,32*1024,0);
327 }
328
329 /** @brief Opens a client socket to a remote host */
330 gras_socket_t
331 gras_socket_client(const char *host,
332                    unsigned short port) {
333    return gras_socket_client_ext(host,port,0,0);
334 }
335
336 /** @brief Opens a client socket to a remote host specified as '\a host:\a port' */
337 gras_socket_t
338 gras_socket_client_from_string(const char *host) {
339    xbt_peer_t p = xbt_peer_from_string(host);
340    gras_socket_t res = gras_socket_client_ext(p->name,p->port,0,0);
341    xbt_peer_free(p);
342    return res;
343 }
344
345 /** \brief Close socket */
346 void gras_socket_close(gras_socket_t sock) {
347   xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
348   gras_socket_t sock_iter;
349   int cursor;
350
351   XBT_IN;
352   VERB1("Close %p",sock);
353   if (sock == _gras_lastly_selected_socket) {
354      xbt_assert0(!gras_opt_trp_nomoredata_on_close || !sock->moredata,
355                  "Closing a socket having more data in buffer while the nomoredata_on_close option is activated");
356                  
357      if (sock->moredata) 
358        CRITICAL0("Closing a socket having more data in buffer. Option nomoredata_on_close disabled, so continuing.");
359      _gras_lastly_selected_socket=NULL;
360   }
361    
362   /* FIXME: Issue an event when the socket is closed */
363   if (sock) {
364     xbt_dynar_foreach(sockets,cursor,sock_iter) {
365       if (sock == sock_iter) {
366         xbt_dynar_cursor_rm(sockets,&cursor);
367         if (sock->plugin->socket_close) 
368           (* sock->plugin->socket_close)(sock);
369
370         /* free the memory */
371         if (sock->peer_name)
372           free(sock->peer_name);
373         free(sock);
374         XBT_OUT;
375         return;
376       }
377     }
378     WARN1("Ignoring request to free an unknown socket (%p). Execution stack:",sock);
379     xbt_backtrace_display();
380   }
381   XBT_OUT;
382 }
383
384 /**
385  * gras_trp_send:
386  *
387  * Send a bunch of bytes from on socket
388  * (stable if we know the storage will keep as is until the next trp_flush)
389  */
390 void
391 gras_trp_send(gras_socket_t sd, char *data, long int size, int stable) {
392   xbt_assert0(sd->outgoing,"Socket not suited for data send");
393   (*sd->plugin->send)(sd,data,size,stable);
394 }
395 /**
396  * gras_trp_chunk_recv:
397  *
398  * Receive a bunch of bytes from a socket
399  */
400 void
401 gras_trp_recv(gras_socket_t sd, char *data, long int size) {
402   xbt_assert0(sd->incoming,"Socket not suited for data receive");
403   (sd->plugin->recv)(sd,data,size);
404 }
405
406 /**
407  * gras_trp_flush:
408  *
409  * Make sure all pending communications are done
410  */
411 void
412 gras_trp_flush(gras_socket_t sd) {
413   if (sd->plugin->flush)
414     (sd->plugin->flush)(sd);
415 }
416
417 gras_trp_plugin_t
418 gras_trp_plugin_get_by_name(const char *name){
419   return xbt_dict_get(_gras_trp_plugins,name);
420 }
421
422 int gras_socket_my_port  (gras_socket_t sock) {
423   return sock->port;
424 }
425 int   gras_socket_peer_port(gras_socket_t sock) {
426   return sock->peer_port;
427 }
428 char *gras_socket_peer_name(gras_socket_t sock) {
429   return sock->peer_name;
430 }
431 char *gras_socket_peer_proc(gras_socket_t sock) {
432   return sock->peer_proc;
433 }
434
435 void gras_socket_peer_proc_set(gras_socket_t sock,char*peer_proc) {
436   sock->peer_proc = peer_proc;
437 }
438
439 /** \brief Check if the provided socket is a measurement one (or a regular one) */
440 int gras_socket_is_meas(gras_socket_t sock) {
441   return sock->meas;
442 }
443
444 /** \brief Send a chunk of (random) data over a measurement socket 
445  *
446  * @param peer measurement socket to use for the experiment
447  * @param timeout timeout (in seconds)
448  * @param exp_size total amount of data to send (in bytes).
449  * @param msg_size size of each chunk sent over the socket (in bytes).
450  *
451  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on 
452  * each side of the socket should be paired. 
453  * 
454  * The exchanged data is zeroed to make sure it's initialized, but
455  * there is no way to control what is sent (ie, you cannot use these 
456  * functions to exchange data out of band).
457  */
458 void gras_socket_meas_send(gras_socket_t peer, 
459                            unsigned int timeout,
460                            unsigned long int exp_size, 
461                            unsigned long int msg_size) {
462   char *chunk=NULL;
463   unsigned long int exp_sofar;
464   unsigned long int chunk_size = 0;
465   
466   XBT_IN;
467
468   if (gras_if_RL()) 
469     chunk=xbt_malloc0(msg_size);
470
471   xbt_assert0(peer->meas,"Asked to send measurement data on a regular socket");
472   xbt_assert0(peer->outgoing,"Socket not suited for data send (was created with gras_socket_server(), not gras_socket_client())");
473
474   for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += chunk_size) {
475      chunk_size = exp_sofar + msg_size > exp_size ? 
476        exp_size - exp_sofar : msg_size;
477      CDEBUG6(gras_trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d, sending %lu",
478              exp_sofar,exp_size,msg_size,
479              gras_socket_peer_name(peer), gras_socket_peer_port(peer), 
480              chunk_size);
481      (*peer->plugin->raw_send)(peer,chunk,chunk_size);
482   }
483   CDEBUG5(gras_trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d",
484           exp_sofar,exp_size,msg_size,
485           gras_socket_peer_name(peer), gras_socket_peer_port(peer));
486              
487   if (gras_if_RL()) 
488     free(chunk);
489
490   XBT_OUT;
491 }
492
493 /** \brief Receive a chunk of data over a measurement socket 
494  *
495  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on 
496  * each side of the socket should be paired. 
497  */
498 void gras_socket_meas_recv(gras_socket_t peer, 
499                            unsigned int timeout,
500                            unsigned long int exp_size, 
501                            unsigned long int msg_size){
502   
503   char *chunk=NULL;
504   unsigned long int exp_sofar;
505   unsigned long int chunk_size = 0;
506
507   XBT_IN;
508
509   if (gras_if_RL()) 
510     chunk = xbt_malloc(msg_size);
511
512   xbt_assert0(peer->meas,
513               "Asked to receive measurement data on a regular socket");
514   xbt_assert0(peer->incoming,"Socket not suited for data receive");
515
516   for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += chunk_size) {
517      chunk_size = exp_sofar + msg_size > exp_size ? 
518        exp_size - exp_sofar : msg_size;
519      CDEBUG6(gras_trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d, receiving %lu",
520              exp_sofar,exp_size,msg_size,
521              gras_socket_peer_name(peer), gras_socket_peer_port(peer), 
522              chunk_size);
523      (peer->plugin->raw_recv)(peer,chunk,chunk_size);
524   }
525   CDEBUG5(gras_trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d",
526           exp_sofar,exp_size,msg_size,
527           gras_socket_peer_name(peer), gras_socket_peer_port(peer));
528
529   if (gras_if_RL()) 
530     free(chunk);
531   XBT_OUT;
532 }
533
534 /**
535  * \brief Something similar to the good old accept system call. 
536  *
537  * Make sure that there is someone speaking to the provided server socket.
538  * In RL, it does an accept(2) and return the result as last argument. 
539  * In SG, as accepts are useless, it returns the provided argument as result.
540  * You should thus test whether (peer != accepted) before closing both of them.
541  *
542  * You should only call this on measurement sockets. It is automatically 
543  * done for regular sockets, but you usually want more control about 
544  * what's going on with measurement sockets.
545  */
546 gras_socket_t gras_socket_meas_accept(gras_socket_t peer){
547   gras_socket_t res;
548
549   xbt_assert0(peer->meas,
550               "No need to accept on non-measurement sockets (it's automatic)");
551
552   if (!peer->accepting) {
553     /* nothing to accept here (must be in SG) */
554     /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket*/
555     return peer;
556   }
557
558   res = (peer->plugin->socket_accept)(peer);
559   res->meas = peer->meas;
560   CDEBUG1(gras_trp_meas,"meas_accepted onto %d",res->sd);
561
562   return res;
563
564
565
566 /*
567  * Creating procdata for this module
568  */
569 static void *gras_trp_procdata_new() {
570    gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t,1);
571    
572    res->name = xbt_strdup("gras_trp");
573    res->name_len = 0;
574    res->sockets   = xbt_dynar_new(sizeof(gras_socket_t*), NULL);
575    res->myport = 0;
576    
577    return (void*)res;
578 }
579
580 /*
581  * Freeing procdata for this module
582  */
583 static void gras_trp_procdata_free(void *data) {
584   gras_trp_procdata_t res = (gras_trp_procdata_t)data;
585   
586   xbt_dynar_free(&( res->sockets ));
587   free(res->name);
588   free(res);
589 }
590
591 void gras_trp_socketset_dump(const char *name) {
592   gras_trp_procdata_t procdata = 
593     (gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
594
595   int it;
596   gras_socket_t s;
597
598   INFO1("** Dump the socket set %s",name);
599   xbt_dynar_foreach(procdata->sockets, it, s) {
600     INFO4("  %p -> %s:%d %s",
601           s,gras_socket_peer_name(s),gras_socket_peer_port(s),
602           s->valid?"(valid)":"(peer dead)");
603   }
604   INFO1("** End of socket set %s",name);
605 }
606
607 /*
608  * Module registration
609  */
610 int gras_trp_libdata_id;
611 void gras_trp_register() {
612    gras_trp_libdata_id = gras_procdata_add("gras_trp",gras_trp_procdata_new, gras_trp_procdata_free);
613 }
614
615 int gras_os_myport(void)  {
616    return ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport;
617 }
618