Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
7936bf16144c8a690d4c58c024993dfe76a3671b
[simgrid.git] / src / gras / Transport / transport.c
1 /* transport - low level communication                                      */
2
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 /***
10  *** Options
11  ***/
12 int gras_opt_trp_nomoredata_on_close = 0;
13
14 #include "xbt/ex.h"
15 #include "xbt/peer.h"
16 #include "portable.h"
17 #include "gras/Transport/transport_private.h"
18 #include "gras/Msg/msg_interface.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp, gras,
21                                 "Conveying bytes over the network");
22 XBT_LOG_NEW_SUBCATEGORY(gras_trp_meas, gras_trp,
23                         "Conveying bytes over the network without formating for perf measurements");
24 static short int _gras_trp_started = 0;
25
26 static xbt_dict_t _gras_trp_plugins;    /* All registered plugins */
27 static void gras_trp_plugin_free(void *p);      /* free one of the plugins */
28
29 static void gras_trp_plugin_new(const char *name, gras_trp_setup_t setup)
30 {
31   xbt_ex_t e;
32
33   gras_trp_plugin_t plug = xbt_new0(s_gras_trp_plugin_t, 1);
34
35   XBT_DEBUG("Create plugin %s", name);
36
37   plug->name = xbt_strdup(name);
38
39   TRY {
40     setup(plug);
41   } CATCH(e) {
42     if (e.category == mismatch_error) {
43       /* SG plugin raise mismatch when in RL mode (and vice versa) */
44       free(plug->name);
45       free(plug);
46       plug = NULL;
47       xbt_ex_free(e);
48     } else {
49       RETHROW;
50     }
51   }
52
53   if (plug)
54     xbt_dict_set(_gras_trp_plugins, name, plug, gras_trp_plugin_free);
55 }
56
57 void gras_trp_init(void)
58 {
59   if (!_gras_trp_started) {
60     /* make room for all plugins */
61     _gras_trp_plugins = xbt_dict_new();
62
63 #ifdef HAVE_WINSOCK2_H
64     /* initialize the windows mechanism */
65     {
66       WORD wVersionRequested;
67       WSADATA wsaData;
68
69       wVersionRequested = MAKEWORD(2, 0);
70       int res;
71       res = WSAStartup(wVersionRequested, &wsaData);
72       xbt_assert0(res == 0, "Cannot find a usable WinSock DLL");
73
74       /* Confirm that the WinSock DLL supports 2.0. */
75       /* Note that if the DLL supports versions greater    */
76       /* than 2.0 in addition to 2.0, it will still return */
77       /* 2.0 in wVersion since that is the version we      */
78       /* requested.                                        */
79
80       xbt_assert0(LOBYTE(wsaData.wVersion) == 2 &&
81                   HIBYTE(wsaData.wVersion) == 0,
82                   "Cannot find a usable WinSock DLL");
83       XBT_INFO("Found and initialized winsock2");
84     }                           /* The WinSock DLL is acceptable. Proceed. */
85 #elif HAVE_WINSOCK_H
86     {
87       WSADATA wsaData;
88       int res;
89       res = WSAStartup(0x0101, &wsaData);
90       xbt_assert0(res == 0, "Cannot find a usable WinSock DLL");
91       XBT_INFO("Found and initialized winsock");
92     }
93 #endif
94
95     /* Add plugins */
96     gras_trp_plugin_new("file", gras_trp_file_setup);
97     gras_trp_plugin_new("sg", gras_trp_sg_setup);
98     gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
99   }
100
101   _gras_trp_started++;
102 }
103
104 void gras_trp_exit(void)
105 {
106   XBT_DEBUG("gras_trp value %d", _gras_trp_started);
107   if (_gras_trp_started == 0) {
108     return;
109   }
110
111   if (--_gras_trp_started == 0) {
112 #ifdef HAVE_WINSOCK_H
113     if (WSACleanup() == SOCKET_ERROR) {
114       if (WSAGetLastError() == WSAEINPROGRESS) {
115         WSACancelBlockingCall();
116         WSACleanup();
117       }
118     }
119 #endif
120
121     /* Delete the plugins */
122     xbt_dict_free(&_gras_trp_plugins);
123   }
124 }
125
126
127 void gras_trp_plugin_free(void *p)
128 {
129   gras_trp_plugin_t plug = p;
130
131   if (plug) {
132     if (plug->exit) {
133       plug->exit(plug);
134     } else if (plug->data) {
135       XBT_DEBUG("Plugin %s lacks exit(). Free data anyway.", plug->name);
136       free(plug->data);
137     }
138
139     free(plug->name);
140     free(plug);
141   }
142 }
143
144
145 /**
146  * gras_trp_socket_new:
147  *
148  * Malloc a new socket, and initialize it with defaults
149  */
150 void gras_trp_socket_new(int incoming, gras_socket_t * dst)
151 {
152
153   gras_socket_t sock = xbt_new0(s_gras_socket_t, 1);
154
155   XBT_VERB("Create a new socket (%p)", (void *) sock);
156
157   sock->plugin = NULL;
158
159   sock->incoming = incoming ? 1 : 0;
160   sock->outgoing = incoming ? 0 : 1;
161   sock->accepting = incoming ? 1 : 0;
162   sock->meas = 0;
163   sock->recvd = 0;
164   sock->valid = 1;
165   sock->moredata = 0;
166
167   sock->refcount = 1;
168   sock->sd = -1;
169
170   sock->data = NULL;
171   sock->bufdata = NULL;
172
173   *dst = sock;
174
175   XBT_OUT();
176 }
177
178 /**
179  * @brief Opens a server socket and makes it ready to be listened to.
180  * @param port: port on which you want to listen
181  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
182  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
183  *
184  * In real life, you'll get a TCP socket.
185  */
186 gras_socket_t
187 gras_socket_server_ext(unsigned short port,
188                        unsigned long int buf_size, int measurement)
189 {
190
191   xbt_ex_t e;
192   gras_trp_plugin_t trp;
193   gras_socket_t sock;
194
195   XBT_DEBUG("Create a server socket from plugin %s on port %d",
196          gras_if_RL()? "tcp" : "sg", port);
197   trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
198
199   /* defaults settings */
200   gras_trp_socket_new(1, &sock);
201   sock->plugin = trp;
202   sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
203   sock->meas = measurement;
204
205   /* Call plugin socket creation function */
206   XBT_DEBUG("Prepare socket with plugin (fct=%p)", trp->socket_server);
207   TRY {
208     trp->socket_server(trp, port, sock);
209     XBT_DEBUG("in=%c out=%c accept=%c",
210            sock->incoming ? 'y' : 'n',
211            sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
212   } CATCH(e) {
213
214     free(sock);
215     RETHROW;
216   }
217
218   if (!measurement)
219     ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport
220         = port;
221   xbt_dynar_push(((gras_trp_procdata_t)
222                   gras_libdata_by_id(gras_trp_libdata_id))->sockets,
223                  &sock);
224
225   gras_msg_listener_awake();
226   return sock;
227 }
228
229 /**
230  * @brief Opens a server socket on any port in the given range
231  *
232  * @param minport: first port we will try
233  * @param maxport: last port we will try
234  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
235  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
236  *
237  * If none of the provided ports works, raises the exception got when trying the last possibility
238  */
239 gras_socket_t
240 gras_socket_server_range(unsigned short minport, unsigned short maxport,
241                          unsigned long int buf_size, int measurement)
242 {
243
244   int port;
245   gras_socket_t res = NULL;
246   xbt_ex_t e;
247
248   for (port = minport; port < maxport; port++) {
249     TRY {
250       res = gras_socket_server_ext(port, buf_size, measurement);
251     }
252     CATCH(e) {
253       if (port == maxport)
254         RETHROW;
255       xbt_ex_free(e);
256     }
257     if (res)
258       return res;
259   }
260   THROW_IMPOSSIBLE;
261 }
262
263 /**
264  * @brief Opens a client socket to a remote host.
265  * @param host: who you want to connect to
266  * @param port: where you want to connect to on this host
267  * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
268  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
269  *
270  * In real life, you'll get a TCP socket.
271  */
272 gras_socket_t
273 gras_socket_client_ext(const char *host,
274                        unsigned short port,
275                        unsigned long int buf_size, int measurement)
276 {
277
278   xbt_ex_t e;
279   gras_trp_plugin_t trp;
280   gras_socket_t sock;
281
282   trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
283
284   XBT_DEBUG("Create a client socket from plugin %s",
285          gras_if_RL()? "tcp" : "sg");
286   /* defaults settings */
287   gras_trp_socket_new(0, &sock);
288   sock->plugin = trp;
289   sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
290   sock->meas = measurement;
291
292   /* plugin-specific */
293   TRY {
294     (*trp->socket_client) (trp,host,port,sock);
295     XBT_DEBUG("in=%c out=%c accept=%c",
296            sock->incoming ? 'y' : 'n',
297            sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
298   } CATCH(e) {
299     free(sock);
300     RETHROW;
301   }
302   xbt_dynar_push(((gras_trp_procdata_t)
303                   gras_libdata_by_id(gras_trp_libdata_id))->sockets,
304                  &sock);
305   gras_msg_listener_awake();
306   return sock;
307 }
308
309 /**
310  * @brief Opens a server socket and make it ready to be listened to.
311  *
312  * In real life, you'll get a TCP socket.
313  */
314 gras_socket_t gras_socket_server(unsigned short port)
315 {
316   return gras_socket_server_ext(port, 32 * 1024, 0);
317 }
318
319 /** @brief Opens a client socket to a remote host */
320 gras_socket_t gras_socket_client(const char *host, unsigned short port)
321 {
322   return gras_socket_client_ext(host, port, 0, 0);
323 }
324
325 /** @brief Opens a client socket to a remote host specified as '\a host:\a port' */
326 gras_socket_t gras_socket_client_from_string(const char *host)
327 {
328   xbt_peer_t p = xbt_peer_from_string(host);
329   gras_socket_t res = gras_socket_client_ext(p->name, p->port, 0, 0);
330   xbt_peer_free(p);
331   return res;
332 }
333
334 void gras_socket_close_voidp(void *sock)
335 {
336   gras_socket_close((gras_socket_t) sock);
337 }
338
339 /** \brief Close socket */
340 void gras_socket_close(gras_socket_t sock)
341 {
342   if (--sock->refcount)
343     return;
344
345   xbt_dynar_t sockets =
346       ((gras_trp_procdata_t)
347        gras_libdata_by_id(gras_trp_libdata_id))->sockets;
348   gras_socket_t sock_iter = NULL;
349   unsigned int cursor;
350
351   XBT_IN("");
352   XBT_VERB("Close %p", sock);
353   if (sock == _gras_lastly_selected_socket) {
354     xbt_assert0(!gras_opt_trp_nomoredata_on_close || !sock->moredata,
355                 "Closing a socket having more data in buffer while the nomoredata_on_close option is activated");
356
357     if (sock->moredata)
358       XBT_CRITICAL
359           ("Closing a socket having more data in buffer. Option nomoredata_on_close disabled, so continuing.");
360     _gras_lastly_selected_socket = NULL;
361   }
362
363   /* FIXME: Issue an event when the socket is closed */
364   XBT_DEBUG("sockets pointer before %p", sockets);
365   if (sock) {
366     /* FIXME: Cannot get the dynar mutex, because it can be already locked */
367 //              _xbt_dynar_foreach(sockets,cursor,sock_iter) {
368     for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) {
369       _xbt_dynar_cursor_get(sockets, cursor, &sock_iter);
370       if (sock == sock_iter) {
371         XBT_DEBUG("remove sock cursor %d dize %lu\n", cursor,
372                xbt_dynar_length(sockets));
373         xbt_dynar_cursor_rm(sockets, &cursor);
374         if (sock->plugin->socket_close)
375           (*sock->plugin->socket_close) (sock);
376
377         /* free the memory */
378         free(sock);
379         XBT_OUT();
380         return;
381       }
382     }
383     XBT_WARN
384         ("Ignoring request to free an unknown socket (%p). Execution stack:",
385          sock);
386     xbt_backtrace_display_current();
387   }
388   XBT_OUT();
389 }
390
391 /**
392  * gras_trp_send:
393  *
394  * Send a bunch of bytes from on socket
395  * (stable if we know the storage will keep as is until the next trp_flush)
396  */
397 void gras_trp_send(gras_socket_t sd, char *data, long int size, int stable)
398 {
399   xbt_assert0(sd->outgoing, "Socket not suited for data send");
400   (*sd->plugin->send) (sd, data, size, stable);
401 }
402
403 /**
404  * gras_trp_chunk_recv:
405  *
406  * Receive a bunch of bytes from a socket
407  */
408 void gras_trp_recv(gras_socket_t sd, char *data, long int size)
409 {
410   xbt_assert0(sd->incoming, "Socket not suited for data receive");
411   (sd->plugin->recv) (sd, data, size);
412 }
413
414 /**
415  * gras_trp_flush:
416  *
417  * Make sure all pending communications are done
418  */
419 void gras_trp_flush(gras_socket_t sd)
420 {
421   if (sd->plugin->flush)
422     (sd->plugin->flush) (sd);
423 }
424
425 gras_trp_plugin_t gras_trp_plugin_get_by_name(const char *name)
426 {
427   return xbt_dict_get(_gras_trp_plugins, name);
428 }
429
430 int gras_socket_my_port(gras_socket_t sock)
431 {
432   if (!sock->plugin->my_port)
433     THROW1(unknown_error,0,"Function my_port unimplemented in plugin %s",sock->plugin->name);
434   return (*sock->plugin->my_port)(sock);
435
436 }
437
438 int gras_socket_peer_port(gras_socket_t sock)
439 {
440   if (!sock->plugin->peer_port)
441     THROW1(unknown_error,0,"Function peer_port unimplemented in plugin %s",sock->plugin->name);
442   return (*sock->plugin->peer_port)(sock);
443 }
444
445 const char *gras_socket_peer_name(gras_socket_t sock)
446 {
447   xbt_assert(sock->plugin);
448   return (*sock->plugin->peer_name)(sock);
449 }
450
451 const char *gras_socket_peer_proc(gras_socket_t sock)
452 {
453   return (*sock->plugin->peer_proc)(sock);
454 }
455
456 void gras_socket_peer_proc_set(gras_socket_t sock, char *peer_proc)
457 {
458   return (*sock->plugin->peer_proc_set)(sock,peer_proc);
459 }
460
461 /** \brief Check if the provided socket is a measurement one (or a regular one) */
462 int gras_socket_is_meas(gras_socket_t sock)
463 {
464   return sock->meas;
465 }
466
467 /** \brief Send a chunk of (random) data over a measurement socket
468  *
469  * @param peer measurement socket to use for the experiment
470  * @param timeout timeout (in seconds)
471  * @param msg_size size of each chunk sent over the socket (in bytes).
472  * @param msg_amount how many of these packets you want to send.
473  *
474  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
475  * each side of the socket should be paired.
476  *
477  * The exchanged data is zeroed to make sure it's initialized, but
478  * there is no way to control what is sent (ie, you cannot use these
479  * functions to exchange data out of band).
480  *
481  * @warning: in SimGrid version 3.1 and previous, the numerical arguments
482  *           were the total amount of data to send and the msg_size. This
483  *           was changed for the fool wanting to send more than MAXINT
484  *           bytes in a fat pipe.
485  */
486 void gras_socket_meas_send(gras_socket_t peer,
487                            unsigned int timeout,
488                            unsigned long int msg_size,
489                            unsigned long int msg_amount)
490 {
491   char *chunk = NULL;
492   unsigned long int sent_sofar;
493
494   XBT_IN("");
495   THROW0(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
496       "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
497   if (gras_if_RL())
498     chunk = xbt_malloc0(msg_size);
499
500   xbt_assert0(peer->meas,
501               "Asked to send measurement data on a regular socket");
502   xbt_assert0(peer->outgoing,
503               "Socket not suited for data send (was created with gras_socket_server(), not gras_socket_client())");
504
505   for (sent_sofar = 0; sent_sofar < msg_amount; sent_sofar++) {
506     XBT_CDEBUG(gras_trp_meas,
507             "Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
508             sent_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
509             gras_socket_peer_port(peer));
510     (*peer->plugin->raw_send) (peer, chunk, msg_size);
511   }
512   XBT_CDEBUG(gras_trp_meas,
513           "Sent %lu msgs of %lu (size of each: %ld) to %s:%d", sent_sofar,
514           msg_amount, msg_size, gras_socket_peer_name(peer),
515           gras_socket_peer_port(peer));
516
517   if (gras_if_RL())
518     free(chunk);
519
520   XBT_OUT();
521 }
522
523 /** \brief Receive a chunk of data over a measurement socket
524  *
525  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
526  * each side of the socket should be paired.
527  *
528  * @warning: in SimGrid version 3.1 and previous, the numerical arguments
529  *           were the total amount of data to send and the msg_size. This
530  *           was changed for the fool wanting to send more than MAXINT
531  *           bytes in a fat pipe.
532  */
533 void gras_socket_meas_recv(gras_socket_t peer,
534                            unsigned int timeout,
535                            unsigned long int msg_size,
536                            unsigned long int msg_amount)
537 {
538
539   char *chunk = NULL;
540   unsigned long int got_sofar;
541
542   XBT_IN("");
543   THROW0(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
544       "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
545
546   if (gras_if_RL())
547     chunk = xbt_malloc(msg_size);
548
549   xbt_assert0(peer->meas,
550               "Asked to receive measurement data on a regular socket");
551   xbt_assert0(peer->incoming, "Socket not suited for data receive");
552
553   for (got_sofar = 0; got_sofar < msg_amount; got_sofar++) {
554     XBT_CDEBUG(gras_trp_meas,
555             "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
556             got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
557             gras_socket_peer_port(peer));
558     (peer->plugin->raw_recv) (peer, chunk, msg_size);
559   }
560   XBT_CDEBUG(gras_trp_meas,
561           "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
562           got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
563           gras_socket_peer_port(peer));
564
565   if (gras_if_RL())
566     free(chunk);
567   XBT_OUT();
568 }
569
570 /**
571  * \brief Something similar to the good old accept system call.
572  *
573  * Make sure that there is someone speaking to the provided server socket.
574  * In RL, it does an accept(2) and return the result as last argument.
575  * In SG, as accepts are useless, it returns the provided argument as result.
576  * You should thus test whether (peer != accepted) before closing both of them.
577  *
578  * You should only call this on measurement sockets. It is automatically
579  * done for regular sockets, but you usually want more control about
580  * what's going on with measurement sockets.
581  */
582 gras_socket_t gras_socket_meas_accept(gras_socket_t peer)
583 {
584   gras_socket_t res;
585   THROW0(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
586       "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
587
588   xbt_assert0(peer->meas,
589               "No need to accept on non-measurement sockets (it's automatic)");
590
591   if (!peer->accepting) {
592     /* nothing to accept here (must be in SG) */
593     /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */
594     return peer;
595   }
596
597   res = (peer->plugin->socket_accept) (peer);
598   res->meas = peer->meas;
599   XBT_CDEBUG(gras_trp_meas, "meas_accepted onto %d", res->sd);
600
601   return res;
602 }
603
604
605 /*
606  * Creating procdata for this module
607  */
608 static void *gras_trp_procdata_new(void)
609 {
610   gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t, 1);
611
612   res->name = xbt_strdup("gras_trp");
613   res->name_len = 0;
614   res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL);
615   res->myport = 0;
616
617   return (void *) res;
618 }
619
620 /*
621  * Freeing procdata for this module
622  */
623 static void gras_trp_procdata_free(void *data)
624 {
625   gras_trp_procdata_t res = (gras_trp_procdata_t) data;
626
627   xbt_dynar_free(&(res->sockets));
628   free(res->name);
629   free(res);
630 }
631
632 void gras_trp_socketset_dump(const char *name)
633 {
634   gras_trp_procdata_t procdata =
635       (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
636
637   unsigned int it;
638   gras_socket_t s;
639
640   XBT_INFO("** Dump the socket set %s", name);
641   xbt_dynar_foreach(procdata->sockets, it, s) {
642     XBT_INFO("  %p -> %s:%d %s",
643           s, gras_socket_peer_name(s), gras_socket_peer_port(s),
644           s->valid ? "(valid)" : "(peer dead)");
645   }
646   XBT_INFO("** End of socket set %s", name);
647 }
648
649 /*
650  * Module registration
651  */
652 int gras_trp_libdata_id;
653 void gras_trp_register()
654 {
655   gras_trp_libdata_id =
656       gras_procdata_add("gras_trp", gras_trp_procdata_new,
657                         gras_trp_procdata_free);
658 }
659
660 int gras_os_myport(void)
661 {
662   return ((gras_trp_procdata_t)
663           gras_libdata_by_id(gras_trp_libdata_id))->myport;
664 }