Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Gras listener thread of each process do select(-1) instead of while(1) {select(0.5)}
[simgrid.git] / src / gras / Transport / transport.c
1 /* $Id$ */
2
3 /* transport - low level communication                                      */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 /***
11  *** Options
12  ***/
13 int gras_opt_trp_nomoredata_on_close=0;
14
15 #include "xbt/ex.h"
16 #include "xbt/peer.h"
17 #include "portable.h"
18 #include "gras/Transport/transport_private.h"
19 #include "gras/Msg/msg_interface.h"
20
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp,gras,"Conveying bytes over the network");
22 XBT_LOG_NEW_SUBCATEGORY(gras_trp_meas,gras_trp,"Conveying bytes over the network without formating for perf measurements");
23 static short int _gras_trp_started = 0;
24
25 static xbt_dict_t _gras_trp_plugins;      /* All registered plugins */
26 static void gras_trp_plugin_free(void *p); /* free one of the plugins */
27
28 static void
29 gras_trp_plugin_new(const char *name, gras_trp_setup_t setup) {
30   xbt_ex_t e;
31
32   gras_trp_plugin_t plug = xbt_new0(s_gras_trp_plugin_t, 1);
33
34   DEBUG1("Create plugin %s",name);
35
36   plug->name=xbt_strdup(name);
37
38   TRY {
39     setup(plug);
40   } CATCH(e) {
41     if (e.category == mismatch_error) {
42       /* SG plugin raise mismatch when in RL mode (and vice versa) */
43       free(plug->name);
44       free(plug);
45       plug=NULL;
46       xbt_ex_free(e);
47     } else {
48       RETHROW;
49     }
50   }
51
52   if (plug)
53     xbt_dict_set(_gras_trp_plugins, name, plug, gras_trp_plugin_free);
54 }
55
56 void gras_trp_init(void){
57   if (!_gras_trp_started) {
58      /* make room for all plugins */
59      _gras_trp_plugins=xbt_dict_new();
60
61 #ifdef HAVE_WINSOCK2_H
62      /* initialize the windows mechanism */
63      {
64         WORD wVersionRequested;
65         WSADATA wsaData;
66
67         wVersionRequested = MAKEWORD( 2, 0 );
68         xbt_assert0(WSAStartup( wVersionRequested, &wsaData ) == 0,
69                     "Cannot find a usable WinSock DLL");
70
71         /* Confirm that the WinSock DLL supports 2.0.*/
72         /* Note that if the DLL supports versions greater    */
73         /* than 2.0 in addition to 2.0, it will still return */
74         /* 2.0 in wVersion since that is the version we      */
75         /* requested.                                        */
76
77         xbt_assert0(LOBYTE( wsaData.wVersion ) == 2 &&
78                     HIBYTE( wsaData.wVersion ) == 0,
79                     "Cannot find a usable WinSock DLL");
80         INFO0("Found and initialized winsock2");
81      }       /* The WinSock DLL is acceptable. Proceed. */
82 #elif HAVE_WINSOCK_H
83      {       WSADATA wsaData;
84         xbt_assert0(WSAStartup( 0x0101, &wsaData ) == 0,
85                     "Cannot find a usable WinSock DLL");
86         INFO0("Found and initialized winsock");
87      }
88 #endif
89
90      /* Add plugins */
91      gras_trp_plugin_new("file",gras_trp_file_setup);
92      gras_trp_plugin_new("sg",gras_trp_sg_setup);
93      gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
94   }
95
96   _gras_trp_started++;
97 }
98
99 void
100 gras_trp_exit(void){
101    DEBUG1("gras_trp value %d",_gras_trp_started);
102    if (_gras_trp_started == 0) {
103       return;
104    }
105
106    if ( --_gras_trp_started == 0 ) {
107 #ifdef HAVE_WINSOCK_H
108       if ( WSACleanup() == SOCKET_ERROR ) {
109          if ( WSAGetLastError() == WSAEINPROGRESS ) {
110             WSACancelBlockingCall();
111             WSACleanup();
112          }
113         }
114 #endif
115
116       /* Delete the plugins */
117       xbt_dict_free(&_gras_trp_plugins);
118    }
119 }
120
121
122 void gras_trp_plugin_free(void *p) {
123   gras_trp_plugin_t plug = p;
124
125   if (plug) {
126     if (plug->exit) {
127       plug->exit(plug);
128     } else if (plug->data) {
129       DEBUG1("Plugin %s lacks exit(). Free data anyway.",plug->name);
130       free(plug->data);
131     }
132
133     free(plug->name);
134     free(plug);
135   }
136 }
137
138
139 /**
140  * gras_trp_socket_new:
141  *
142  * Malloc a new socket, and initialize it with defaults
143  */
144 void gras_trp_socket_new(int incoming,
145                          gras_socket_t *dst) {
146
147   gras_socket_t sock=xbt_new0(s_gras_socket_t,1);
148
149   VERB1("Create a new socket (%p)", (void*)sock);
150
151   sock->plugin = NULL;
152
153   sock->incoming  = incoming ? 1:0;
154   sock->outgoing  = incoming ? 0:1;
155   sock->accepting = incoming ? 1:0;
156   sock->meas = 0;
157   sock->recvd = 0;
158   sock->valid = 1;
159   sock->moredata = 0;
160
161   sock->sd     = -1;
162   sock->port      = -1;
163   sock->peer_port = -1;
164   sock->peer_name = NULL;
165   sock->peer_proc = NULL;
166
167   sock->data   = NULL;
168   sock->bufdata = NULL;
169
170   *dst = sock;
171
172   XBT_OUT;
173 }
174
175 /**
176  * @brief Opens a server socket and makes it ready to be listened to.
177  * @param port: port on which you want to listen
178  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
179  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
180  *
181  * In real life, you'll get a TCP socket.
182  */
183 gras_socket_t
184 gras_socket_server_ext(unsigned short port,
185
186                        unsigned long int buf_size,
187                        int measurement) {
188
189   xbt_ex_t e;
190   gras_trp_plugin_t trp;
191   gras_socket_t sock;
192
193   DEBUG2("Create a server socket from plugin %s on port %d",
194          gras_if_RL() ? "tcp" : "sg",
195          port);
196   trp = gras_trp_plugin_get_by_name(gras_if_SG() ? "sg":"tcp");
197
198   /* defaults settings */
199   gras_trp_socket_new(1,&sock);
200   sock->plugin= trp;
201   sock->port=port;
202   sock->buf_size = buf_size>0 ? buf_size : 32*1024;
203   sock->meas = measurement;
204
205   /* Call plugin socket creation function */
206   DEBUG1("Prepare socket with plugin (fct=%p)",trp->socket_server);
207   TRY {
208     trp->socket_server(trp, sock);
209     DEBUG3("in=%c out=%c accept=%c",
210            sock->incoming?'y':'n',
211            sock->outgoing?'y':'n',
212            sock->accepting?'y':'n');
213   } CATCH(e) {
214
215     free(sock);
216     RETHROW;
217   }
218
219   if (!measurement)
220      ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport = port;
221   xbt_dynar_push(((gras_trp_procdata_t)
222                   gras_libdata_by_id(gras_trp_libdata_id))->sockets,&sock);
223
224   gras_msg_listener_awake();
225   return sock;
226 }
227 /**
228  * @brief Opens a server socket on any port in the given range
229  *
230  * @param minport: first port we will try
231  * @param maxport: last port we will try
232  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
233  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
234  *
235  * If none of the provided ports works, raises the exception got when trying the last possibility
236  */
237 gras_socket_t
238 gras_socket_server_range(unsigned short minport, unsigned short maxport,
239                          unsigned long int buf_size, int measurement) {
240
241   int port;
242   gras_socket_t res=NULL;
243   xbt_ex_t e;
244
245   for (port=minport; port<maxport;port ++) {
246     TRY {
247       res=gras_socket_server_ext(port,buf_size,measurement);
248     } CATCH(e) {
249       if (port==maxport)
250         RETHROW;
251       xbt_ex_free(e);
252     }
253     if (res)
254       return res;
255   }
256   THROW_IMPOSSIBLE;
257 }
258
259 /**
260  * @brief Opens a client socket to a remote host.
261  * @param host: who you want to connect to
262  * @param port: where you want to connect to on this host
263  * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
264  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
265  *
266  * In real life, you'll get a TCP socket.
267  */
268 gras_socket_t
269 gras_socket_client_ext(const char *host,
270                        unsigned short port,
271
272                        unsigned long int buf_size,
273                        int measurement) {
274
275   xbt_ex_t e;
276   gras_trp_plugin_t trp;
277   gras_socket_t sock;
278
279   trp = gras_trp_plugin_get_by_name(gras_if_SG() ? "sg":"tcp");
280
281   DEBUG1("Create a client socket from plugin %s",gras_if_RL() ? "tcp" : "sg");
282   /* defaults settings */
283   gras_trp_socket_new(0,&sock);
284   sock->plugin= trp;
285   sock->peer_port = port;
286   sock->peer_name = (char*)strdup(host?host:"localhost");
287   sock->buf_size = buf_size>0 ? buf_size: 32*1024;
288   sock->meas = measurement;
289
290   /* plugin-specific */
291   TRY {
292     (*trp->socket_client)(trp, sock);
293     DEBUG3("in=%c out=%c accept=%c",
294            sock->incoming?'y':'n',
295            sock->outgoing?'y':'n',
296            sock->accepting?'y':'n');
297   } CATCH(e) {
298      free(sock);
299      RETHROW;
300   }
301   xbt_dynar_push(((gras_trp_procdata_t)
302                   gras_libdata_by_id(gras_trp_libdata_id))->sockets,&sock);
303   gras_msg_listener_awake();
304   return sock;
305 }
306
307 /**
308  * @brief Opens a server socket and make it ready to be listened to.
309  *
310  * In real life, you'll get a TCP socket.
311  */
312 gras_socket_t
313 gras_socket_server(unsigned short port) {
314    return gras_socket_server_ext(port,32*1024,0);
315 }
316
317 /** @brief Opens a client socket to a remote host */
318 gras_socket_t
319 gras_socket_client(const char *host,
320                    unsigned short port) {
321    return gras_socket_client_ext(host,port,0,0);
322 }
323
324 /** @brief Opens a client socket to a remote host specified as '\a host:\a port' */
325 gras_socket_t
326 gras_socket_client_from_string(const char *host) {
327    xbt_peer_t p = xbt_peer_from_string(host);
328    gras_socket_t res = gras_socket_client_ext(p->name,p->port,0,0);
329    xbt_peer_free(p);
330    return res;
331 }
332
333 /** \brief Close socket */
334 void gras_socket_close(gras_socket_t sock) {
335   xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
336   gras_socket_t sock_iter = NULL;
337   unsigned int cursor;
338
339   XBT_IN;
340   VERB1("Close %p",sock);
341   if (sock == _gras_lastly_selected_socket) {
342      xbt_assert0(!gras_opt_trp_nomoredata_on_close || !sock->moredata,
343                  "Closing a socket having more data in buffer while the nomoredata_on_close option is activated");
344
345      if (sock->moredata)
346        CRITICAL0("Closing a socket having more data in buffer. Option nomoredata_on_close disabled, so continuing.");
347      _gras_lastly_selected_socket=NULL;
348   }
349
350   /* FIXME: Issue an event when the socket is closed */
351         DEBUG1("sockets pointer before %p",sockets);
352   if (sock) {
353         /* FIXME: Cannot get the dynar mutex, because it can be already locked */
354 //              _xbt_dynar_foreach(sockets,cursor,sock_iter) {
355                 for (cursor=0; cursor< xbt_dynar_length(sockets); cursor++)  {
356                         _xbt_dynar_cursor_get(sockets,&cursor,&sock_iter);
357                         if (sock == sock_iter) {
358                                 DEBUG2("remove sock cursor %d dize %lu\n",cursor,xbt_dynar_length(sockets));
359                                 xbt_dynar_cursor_rm(sockets,&cursor);
360                                 if (sock->plugin->socket_close)
361                                         (* sock->plugin->socket_close)(sock);
362
363                                 /* free the memory */
364                                 if (sock->peer_name)
365                                         free(sock->peer_name);
366                                 free(sock);
367                                 XBT_OUT;
368                                 return;
369                         }
370     }
371     WARN1("Ignoring request to free an unknown socket (%p). Execution stack:",sock);
372     xbt_backtrace_display_current();
373   }
374   XBT_OUT;
375 }
376
377 /**
378  * gras_trp_send:
379  *
380  * Send a bunch of bytes from on socket
381  * (stable if we know the storage will keep as is until the next trp_flush)
382  */
383 void
384 gras_trp_send(gras_socket_t sd, char *data, long int size, int stable) {
385   xbt_assert0(sd->outgoing,"Socket not suited for data send");
386   (*sd->plugin->send)(sd,data,size,stable);
387 }
388 /**
389  * gras_trp_chunk_recv:
390  *
391  * Receive a bunch of bytes from a socket
392  */
393 void
394 gras_trp_recv(gras_socket_t sd, char *data, long int size) {
395   xbt_assert0(sd->incoming,"Socket not suited for data receive");
396   (sd->plugin->recv)(sd,data,size);
397 }
398
399 /**
400  * gras_trp_flush:
401  *
402  * Make sure all pending communications are done
403  */
404 void
405 gras_trp_flush(gras_socket_t sd) {
406   if (sd->plugin->flush)
407     (sd->plugin->flush)(sd);
408 }
409
410 gras_trp_plugin_t
411 gras_trp_plugin_get_by_name(const char *name){
412   return xbt_dict_get(_gras_trp_plugins,name);
413 }
414
415 int gras_socket_my_port  (gras_socket_t sock) {
416   return sock->port;
417 }
418 int   gras_socket_peer_port(gras_socket_t sock) {
419   return sock->peer_port;
420 }
421 char *gras_socket_peer_name(gras_socket_t sock) {
422   return sock->peer_name;
423 }
424 char *gras_socket_peer_proc(gras_socket_t sock) {
425   return sock->peer_proc;
426 }
427
428 void gras_socket_peer_proc_set(gras_socket_t sock,char*peer_proc) {
429   sock->peer_proc = peer_proc;
430 }
431
432 /** \brief Check if the provided socket is a measurement one (or a regular one) */
433 int gras_socket_is_meas(gras_socket_t sock) {
434   return sock->meas;
435 }
436
437 /** \brief Send a chunk of (random) data over a measurement socket
438  *
439  * @param peer measurement socket to use for the experiment
440  * @param timeout timeout (in seconds)
441  * @param msg_size size of each chunk sent over the socket (in bytes).
442  * @param msg_amount how many of these packets you want to send.
443  *
444  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
445  * each side of the socket should be paired.
446  *
447  * The exchanged data is zeroed to make sure it's initialized, but
448  * there is no way to control what is sent (ie, you cannot use these
449  * functions to exchange data out of band).
450  *
451  * @warning: in SimGrid version 3.1 and previous, the numerical arguments
452  *           were the total amount of data to send and the msg_size. This
453  *           was changed for the fool wanting to send more than MAXINT
454  *           bytes in a fat pipe.
455  */
456 void gras_socket_meas_send(gras_socket_t peer,
457                            unsigned int timeout,
458                            unsigned long int msg_size,
459                            unsigned long int msg_amount) {
460   char *chunk=NULL;
461   unsigned long int sent_sofar;
462
463   XBT_IN;
464
465   if (gras_if_RL())
466     chunk=xbt_malloc0(msg_size);
467
468   xbt_assert0(peer->meas,"Asked to send measurement data on a regular socket");
469   xbt_assert0(peer->outgoing,"Socket not suited for data send (was created with gras_socket_server(), not gras_socket_client())");
470
471   for (sent_sofar=0; sent_sofar < msg_amount; sent_sofar++) {
472      CDEBUG5(gras_trp_meas,"Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
473              sent_sofar,msg_amount,msg_size,
474              gras_socket_peer_name(peer), gras_socket_peer_port(peer));
475      (*peer->plugin->raw_send)(peer,chunk,msg_size);
476   }
477   CDEBUG5(gras_trp_meas,"Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
478           sent_sofar,msg_amount,msg_size,
479           gras_socket_peer_name(peer), gras_socket_peer_port(peer));
480
481   if (gras_if_RL())
482     free(chunk);
483
484   XBT_OUT;
485 }
486
487 /** \brief Receive a chunk of data over a measurement socket
488  *
489  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
490  * each side of the socket should be paired.
491  *
492  * @warning: in SimGrid version 3.1 and previous, the numerical arguments
493  *           were the total amount of data to send and the msg_size. This
494  *           was changed for the fool wanting to send more than MAXINT
495  *           bytes in a fat pipe.
496  */
497 void gras_socket_meas_recv(gras_socket_t peer,
498                            unsigned int timeout,
499                            unsigned long int msg_size,
500                            unsigned long int msg_amount){
501
502   char *chunk=NULL;
503   unsigned long int got_sofar;
504
505   XBT_IN;
506
507   if (gras_if_RL())
508     chunk = xbt_malloc(msg_size);
509
510   xbt_assert0(peer->meas,
511               "Asked to receive measurement data on a regular socket");
512   xbt_assert0(peer->incoming,"Socket not suited for data receive");
513
514   for (got_sofar=0; got_sofar < msg_amount; got_sofar ++) {
515      CDEBUG5(gras_trp_meas,"Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
516              got_sofar,msg_amount,msg_size,
517              gras_socket_peer_name(peer), gras_socket_peer_port(peer));
518      (peer->plugin->raw_recv)(peer,chunk,msg_size);
519   }
520   CDEBUG5(gras_trp_meas,"Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
521           got_sofar,msg_amount,msg_size,
522           gras_socket_peer_name(peer), gras_socket_peer_port(peer));
523
524   if (gras_if_RL())
525     free(chunk);
526   XBT_OUT;
527 }
528
529 /**
530  * \brief Something similar to the good old accept system call.
531  *
532  * Make sure that there is someone speaking to the provided server socket.
533  * In RL, it does an accept(2) and return the result as last argument.
534  * In SG, as accepts are useless, it returns the provided argument as result.
535  * You should thus test whether (peer != accepted) before closing both of them.
536  *
537  * You should only call this on measurement sockets. It is automatically
538  * done for regular sockets, but you usually want more control about
539  * what's going on with measurement sockets.
540  */
541 gras_socket_t gras_socket_meas_accept(gras_socket_t peer){
542   gras_socket_t res;
543
544   xbt_assert0(peer->meas,
545               "No need to accept on non-measurement sockets (it's automatic)");
546
547   if (!peer->accepting) {
548     /* nothing to accept here (must be in SG) */
549     /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket*/
550     return peer;
551   }
552
553   res = (peer->plugin->socket_accept)(peer);
554   res->meas = peer->meas;
555   CDEBUG1(gras_trp_meas,"meas_accepted onto %d",res->sd);
556
557   return res;
558 }
559
560
561 /*
562  * Creating procdata for this module
563  */
564 static void *gras_trp_procdata_new(void) {
565    gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t,1);
566
567    res->name = xbt_strdup("gras_trp");
568    res->name_len = 0;
569    res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t*), NULL);
570    res->myport = 0;
571
572    return (void*)res;
573 }
574
575 /*
576  * Freeing procdata for this module
577  */
578 static void gras_trp_procdata_free(void *data) {
579   gras_trp_procdata_t res = (gras_trp_procdata_t)data;
580
581   xbt_dynar_free(&( res->sockets ));
582   free(res->name);
583   free(res);
584 }
585
586 void gras_trp_socketset_dump(const char *name) {
587   gras_trp_procdata_t procdata =
588     (gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
589
590   unsigned int it;
591   gras_socket_t s;
592
593   INFO1("** Dump the socket set %s",name);
594   xbt_dynar_foreach(procdata->sockets, it, s) {
595     INFO4("  %p -> %s:%d %s",
596           s,gras_socket_peer_name(s),gras_socket_peer_port(s),
597           s->valid?"(valid)":"(peer dead)");
598   }
599   INFO1("** End of socket set %s",name);
600 }
601
602 /*
603  * Module registration
604  */
605 int gras_trp_libdata_id;
606 void gras_trp_register() {
607    gras_trp_libdata_id = gras_procdata_add("gras_trp",gras_trp_procdata_new, gras_trp_procdata_free);
608 }
609
610 int gras_os_myport(void)  {
611    return ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport;
612 }
613