Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
a3c25d5dbc0bdceec302239e3370d8f1990c8b19
[simgrid.git] / src / gras / Transport / transport.c
1 /* transport - low level communication                                      */
2
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 /***
10  *** Options
11  ***/
12 int gras_opt_trp_nomoredata_on_close = 0;
13
14 #include "xbt/ex.h"
15 #include "xbt/peer.h"
16 #include "portable.h"
17 #include "gras/Transport/transport_private.h"
18 #include "gras/Msg/msg_interface.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp, gras,
21                                 "Conveying bytes over the network");
22 XBT_LOG_NEW_SUBCATEGORY(gras_trp_meas, gras_trp,
23                         "Conveying bytes over the network without formating for perf measurements");
24 static short int _gras_trp_started = 0;
25
26 static xbt_dict_t _gras_trp_plugins;    /* All registered plugins */
27 static void gras_trp_plugin_free(void *p);      /* free one of the plugins */
28
29 static void gras_trp_plugin_new(const char *name, gras_trp_setup_t setup)
30 {
31   xbt_ex_t e;
32
33   gras_trp_plugin_t plug = xbt_new0(s_gras_trp_plugin_t, 1);
34
35   XBT_DEBUG("Create plugin %s", name);
36
37   plug->name = xbt_strdup(name);
38
39   TRY {
40     setup(plug);
41   } CATCH(e) {
42     if (e.category == mismatch_error) {
43       /* SG plugin raise mismatch when in RL mode (and vice versa) */
44       free(plug->name);
45       free(plug);
46       plug = NULL;
47       xbt_ex_free(e);
48     } else {
49       RETHROW;
50     }
51   }
52
53   if (plug)
54     xbt_dict_set(_gras_trp_plugins, name, plug, gras_trp_plugin_free);
55 }
56
57 void gras_trp_init(void)
58 {
59   if (!_gras_trp_started) {
60     /* make room for all plugins */
61     _gras_trp_plugins = xbt_dict_new();
62
63 #ifdef HAVE_WINSOCK2_H
64     /* initialize the windows mechanism */
65     {
66       WORD wVersionRequested;
67       WSADATA wsaData;
68
69       wVersionRequested = MAKEWORD(2, 0);
70       xbt_assert0(WSAStartup(wVersionRequested, &wsaData) == 0,
71                   "Cannot find a usable WinSock DLL");
72
73       /* Confirm that the WinSock DLL supports 2.0. */
74       /* Note that if the DLL supports versions greater    */
75       /* than 2.0 in addition to 2.0, it will still return */
76       /* 2.0 in wVersion since that is the version we      */
77       /* requested.                                        */
78
79       xbt_assert0(LOBYTE(wsaData.wVersion) == 2 &&
80                   HIBYTE(wsaData.wVersion) == 0,
81                   "Cannot find a usable WinSock DLL");
82       XBT_INFO("Found and initialized winsock2");
83     }                           /* The WinSock DLL is acceptable. Proceed. */
84 #elif HAVE_WINSOCK_H
85     {
86       WSADATA wsaData;
87       xbt_assert0(WSAStartup(0x0101, &wsaData) == 0,
88                   "Cannot find a usable WinSock DLL");
89       XBT_INFO("Found and initialized winsock");
90     }
91 #endif
92
93     /* Add plugins */
94     gras_trp_plugin_new("file", gras_trp_file_setup);
95     gras_trp_plugin_new("sg", gras_trp_sg_setup);
96     gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
97   }
98
99   _gras_trp_started++;
100 }
101
102 void gras_trp_exit(void)
103 {
104   XBT_DEBUG("gras_trp value %d", _gras_trp_started);
105   if (_gras_trp_started == 0) {
106     return;
107   }
108
109   if (--_gras_trp_started == 0) {
110 #ifdef HAVE_WINSOCK_H
111     if (WSACleanup() == SOCKET_ERROR) {
112       if (WSAGetLastError() == WSAEINPROGRESS) {
113         WSACancelBlockingCall();
114         WSACleanup();
115       }
116     }
117 #endif
118
119     /* Delete the plugins */
120     xbt_dict_free(&_gras_trp_plugins);
121   }
122 }
123
124
125 void gras_trp_plugin_free(void *p)
126 {
127   gras_trp_plugin_t plug = p;
128
129   if (plug) {
130     if (plug->exit) {
131       plug->exit(plug);
132     } else if (plug->data) {
133       XBT_DEBUG("Plugin %s lacks exit(). Free data anyway.", plug->name);
134       free(plug->data);
135     }
136
137     free(plug->name);
138     free(plug);
139   }
140 }
141
142
143 /**
144  * gras_trp_socket_new:
145  *
146  * Malloc a new socket, and initialize it with defaults
147  */
148 void gras_trp_socket_new(int incoming, gras_socket_t * dst)
149 {
150
151   gras_socket_t sock = xbt_new0(s_gras_socket_t, 1);
152
153   XBT_VERB("Create a new socket (%p)", (void *) sock);
154
155   sock->plugin = NULL;
156
157   sock->incoming = incoming ? 1 : 0;
158   sock->outgoing = incoming ? 0 : 1;
159   sock->accepting = incoming ? 1 : 0;
160   sock->meas = 0;
161   sock->recvd = 0;
162   sock->valid = 1;
163   sock->moredata = 0;
164
165   sock->refcount = 1;
166   sock->sd = -1;
167
168   sock->data = NULL;
169   sock->bufdata = NULL;
170
171   *dst = sock;
172
173   XBT_OUT;
174 }
175
176 /**
177  * @brief Opens a server socket and makes it ready to be listened to.
178  * @param port: port on which you want to listen
179  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
180  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
181  *
182  * In real life, you'll get a TCP socket.
183  */
184 gras_socket_t
185 gras_socket_server_ext(unsigned short port,
186                        unsigned long int buf_size, int measurement)
187 {
188
189   xbt_ex_t e;
190   gras_trp_plugin_t trp;
191   gras_socket_t sock;
192
193   XBT_DEBUG("Create a server socket from plugin %s on port %d",
194          gras_if_RL()? "tcp" : "sg", port);
195   trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
196
197   /* defaults settings */
198   gras_trp_socket_new(1, &sock);
199   sock->plugin = trp;
200   sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
201   sock->meas = measurement;
202
203   /* Call plugin socket creation function */
204   XBT_DEBUG("Prepare socket with plugin (fct=%p)", trp->socket_server);
205   TRY {
206     trp->socket_server(trp, port, sock);
207     XBT_DEBUG("in=%c out=%c accept=%c",
208            sock->incoming ? 'y' : 'n',
209            sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
210   } CATCH(e) {
211
212     free(sock);
213     RETHROW;
214   }
215
216   if (!measurement)
217     ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport
218         = port;
219   xbt_dynar_push(((gras_trp_procdata_t)
220                   gras_libdata_by_id(gras_trp_libdata_id))->sockets,
221                  &sock);
222
223   gras_msg_listener_awake();
224   return sock;
225 }
226
227 /**
228  * @brief Opens a server socket on any port in the given range
229  *
230  * @param minport: first port we will try
231  * @param maxport: last port we will try
232  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
233  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
234  *
235  * If none of the provided ports works, raises the exception got when trying the last possibility
236  */
237 gras_socket_t
238 gras_socket_server_range(unsigned short minport, unsigned short maxport,
239                          unsigned long int buf_size, int measurement)
240 {
241
242   int port;
243   gras_socket_t res = NULL;
244   xbt_ex_t e;
245
246   for (port = minport; port < maxport; port++) {
247     TRY {
248       res = gras_socket_server_ext(port, buf_size, measurement);
249     }
250     CATCH(e) {
251       if (port == maxport)
252         RETHROW;
253       xbt_ex_free(e);
254     }
255     if (res)
256       return res;
257   }
258   THROW_IMPOSSIBLE;
259 }
260
261 /**
262  * @brief Opens a client socket to a remote host.
263  * @param host: who you want to connect to
264  * @param port: where you want to connect to on this host
265  * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
266  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
267  *
268  * In real life, you'll get a TCP socket.
269  */
270 gras_socket_t
271 gras_socket_client_ext(const char *host,
272                        unsigned short port,
273                        unsigned long int buf_size, int measurement)
274 {
275
276   xbt_ex_t e;
277   gras_trp_plugin_t trp;
278   gras_socket_t sock;
279
280   trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
281
282   XBT_DEBUG("Create a client socket from plugin %s",
283          gras_if_RL()? "tcp" : "sg");
284   /* defaults settings */
285   gras_trp_socket_new(0, &sock);
286   sock->plugin = trp;
287   sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
288   sock->meas = measurement;
289
290   /* plugin-specific */
291   TRY {
292     (*trp->socket_client) (trp,host,port,sock);
293     XBT_DEBUG("in=%c out=%c accept=%c",
294            sock->incoming ? 'y' : 'n',
295            sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
296   } CATCH(e) {
297     free(sock);
298     RETHROW;
299   }
300   xbt_dynar_push(((gras_trp_procdata_t)
301                   gras_libdata_by_id(gras_trp_libdata_id))->sockets,
302                  &sock);
303   gras_msg_listener_awake();
304   return sock;
305 }
306
307 /**
308  * @brief Opens a server socket and make it ready to be listened to.
309  *
310  * In real life, you'll get a TCP socket.
311  */
312 gras_socket_t gras_socket_server(unsigned short port)
313 {
314   return gras_socket_server_ext(port, 32 * 1024, 0);
315 }
316
317 /** @brief Opens a client socket to a remote host */
318 gras_socket_t gras_socket_client(const char *host, unsigned short port)
319 {
320   return gras_socket_client_ext(host, port, 0, 0);
321 }
322
323 /** @brief Opens a client socket to a remote host specified as '\a host:\a port' */
324 gras_socket_t gras_socket_client_from_string(const char *host)
325 {
326   xbt_peer_t p = xbt_peer_from_string(host);
327   gras_socket_t res = gras_socket_client_ext(p->name, p->port, 0, 0);
328   xbt_peer_free(p);
329   return res;
330 }
331
332 void gras_socket_close_voidp(void *sock)
333 {
334   gras_socket_close((gras_socket_t) sock);
335 }
336
337 /** \brief Close socket */
338 void gras_socket_close(gras_socket_t sock)
339 {
340   if (--sock->refcount)
341     return;
342
343   xbt_dynar_t sockets =
344       ((gras_trp_procdata_t)
345        gras_libdata_by_id(gras_trp_libdata_id))->sockets;
346   gras_socket_t sock_iter = NULL;
347   unsigned int cursor;
348
349   XBT_IN;
350   XBT_VERB("Close %p", sock);
351   if (sock == _gras_lastly_selected_socket) {
352     xbt_assert0(!gras_opt_trp_nomoredata_on_close || !sock->moredata,
353                 "Closing a socket having more data in buffer while the nomoredata_on_close option is activated");
354
355     if (sock->moredata)
356       XBT_CRITICAL
357           ("Closing a socket having more data in buffer. Option nomoredata_on_close disabled, so continuing.");
358     _gras_lastly_selected_socket = NULL;
359   }
360
361   /* FIXME: Issue an event when the socket is closed */
362   XBT_DEBUG("sockets pointer before %p", sockets);
363   if (sock) {
364     /* FIXME: Cannot get the dynar mutex, because it can be already locked */
365 //              _xbt_dynar_foreach(sockets,cursor,sock_iter) {
366     for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) {
367       _xbt_dynar_cursor_get(sockets, cursor, &sock_iter);
368       if (sock == sock_iter) {
369         XBT_DEBUG("remove sock cursor %d dize %lu\n", cursor,
370                xbt_dynar_length(sockets));
371         xbt_dynar_cursor_rm(sockets, &cursor);
372         if (sock->plugin->socket_close)
373           (*sock->plugin->socket_close) (sock);
374
375         /* free the memory */
376         free(sock);
377         XBT_OUT;
378         return;
379       }
380     }
381     XBT_WARN
382         ("Ignoring request to free an unknown socket (%p). Execution stack:",
383          sock);
384     xbt_backtrace_display_current();
385   }
386   XBT_OUT;
387 }
388
389 /**
390  * gras_trp_send:
391  *
392  * Send a bunch of bytes from on socket
393  * (stable if we know the storage will keep as is until the next trp_flush)
394  */
395 void gras_trp_send(gras_socket_t sd, char *data, long int size, int stable)
396 {
397   xbt_assert0(sd->outgoing, "Socket not suited for data send");
398   (*sd->plugin->send) (sd, data, size, stable);
399 }
400
401 /**
402  * gras_trp_chunk_recv:
403  *
404  * Receive a bunch of bytes from a socket
405  */
406 void gras_trp_recv(gras_socket_t sd, char *data, long int size)
407 {
408   xbt_assert0(sd->incoming, "Socket not suited for data receive");
409   (sd->plugin->recv) (sd, data, size);
410 }
411
412 /**
413  * gras_trp_flush:
414  *
415  * Make sure all pending communications are done
416  */
417 void gras_trp_flush(gras_socket_t sd)
418 {
419   if (sd->plugin->flush)
420     (sd->plugin->flush) (sd);
421 }
422
423 gras_trp_plugin_t gras_trp_plugin_get_by_name(const char *name)
424 {
425   return xbt_dict_get(_gras_trp_plugins, name);
426 }
427
428 int gras_socket_my_port(gras_socket_t sock)
429 {
430   if (!sock->plugin->my_port)
431     THROW1(unknown_error,0,"Function my_port unimplemented in plugin %s",sock->plugin->name);
432   return (*sock->plugin->my_port)(sock);
433
434 }
435
436 int gras_socket_peer_port(gras_socket_t sock)
437 {
438   if (!sock->plugin->peer_port)
439     THROW1(unknown_error,0,"Function peer_port unimplemented in plugin %s",sock->plugin->name);
440   return (*sock->plugin->peer_port)(sock);
441 }
442
443 const char *gras_socket_peer_name(gras_socket_t sock)
444 {
445   xbt_assert(sock->plugin);
446   return (*sock->plugin->peer_name)(sock);
447 }
448
449 const char *gras_socket_peer_proc(gras_socket_t sock)
450 {
451   return (*sock->plugin->peer_proc)(sock);
452 }
453
454 void gras_socket_peer_proc_set(gras_socket_t sock, char *peer_proc)
455 {
456   return (*sock->plugin->peer_proc_set)(sock,peer_proc);
457 }
458
459 /** \brief Check if the provided socket is a measurement one (or a regular one) */
460 int gras_socket_is_meas(gras_socket_t sock)
461 {
462   return sock->meas;
463 }
464
465 /** \brief Send a chunk of (random) data over a measurement socket
466  *
467  * @param peer measurement socket to use for the experiment
468  * @param timeout timeout (in seconds)
469  * @param msg_size size of each chunk sent over the socket (in bytes).
470  * @param msg_amount how many of these packets you want to send.
471  *
472  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
473  * each side of the socket should be paired.
474  *
475  * The exchanged data is zeroed to make sure it's initialized, but
476  * there is no way to control what is sent (ie, you cannot use these
477  * functions to exchange data out of band).
478  *
479  * @warning: in SimGrid version 3.1 and previous, the numerical arguments
480  *           were the total amount of data to send and the msg_size. This
481  *           was changed for the fool wanting to send more than MAXINT
482  *           bytes in a fat pipe.
483  */
484 void gras_socket_meas_send(gras_socket_t peer,
485                            unsigned int timeout,
486                            unsigned long int msg_size,
487                            unsigned long int msg_amount)
488 {
489   char *chunk = NULL;
490   unsigned long int sent_sofar;
491
492   XBT_IN;
493   THROW0(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
494       "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
495   if (gras_if_RL())
496     chunk = xbt_malloc0(msg_size);
497
498   xbt_assert0(peer->meas,
499               "Asked to send measurement data on a regular socket");
500   xbt_assert0(peer->outgoing,
501               "Socket not suited for data send (was created with gras_socket_server(), not gras_socket_client())");
502
503   for (sent_sofar = 0; sent_sofar < msg_amount; sent_sofar++) {
504     XBT_CDEBUG(gras_trp_meas,
505             "Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
506             sent_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
507             gras_socket_peer_port(peer));
508     (*peer->plugin->raw_send) (peer, chunk, msg_size);
509   }
510   XBT_CDEBUG(gras_trp_meas,
511           "Sent %lu msgs of %lu (size of each: %ld) to %s:%d", sent_sofar,
512           msg_amount, msg_size, gras_socket_peer_name(peer),
513           gras_socket_peer_port(peer));
514
515   if (gras_if_RL())
516     free(chunk);
517
518   XBT_OUT;
519 }
520
521 /** \brief Receive a chunk of data over a measurement socket
522  *
523  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
524  * each side of the socket should be paired.
525  *
526  * @warning: in SimGrid version 3.1 and previous, the numerical arguments
527  *           were the total amount of data to send and the msg_size. This
528  *           was changed for the fool wanting to send more than MAXINT
529  *           bytes in a fat pipe.
530  */
531 void gras_socket_meas_recv(gras_socket_t peer,
532                            unsigned int timeout,
533                            unsigned long int msg_size,
534                            unsigned long int msg_amount)
535 {
536
537   char *chunk = NULL;
538   unsigned long int got_sofar;
539
540   XBT_IN;
541   THROW0(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
542       "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
543
544   if (gras_if_RL())
545     chunk = xbt_malloc(msg_size);
546
547   xbt_assert0(peer->meas,
548               "Asked to receive measurement data on a regular socket");
549   xbt_assert0(peer->incoming, "Socket not suited for data receive");
550
551   for (got_sofar = 0; got_sofar < msg_amount; got_sofar++) {
552     XBT_CDEBUG(gras_trp_meas,
553             "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
554             got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
555             gras_socket_peer_port(peer));
556     (peer->plugin->raw_recv) (peer, chunk, msg_size);
557   }
558   XBT_CDEBUG(gras_trp_meas,
559           "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
560           got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
561           gras_socket_peer_port(peer));
562
563   if (gras_if_RL())
564     free(chunk);
565   XBT_OUT;
566 }
567
568 /**
569  * \brief Something similar to the good old accept system call.
570  *
571  * Make sure that there is someone speaking to the provided server socket.
572  * In RL, it does an accept(2) and return the result as last argument.
573  * In SG, as accepts are useless, it returns the provided argument as result.
574  * You should thus test whether (peer != accepted) before closing both of them.
575  *
576  * You should only call this on measurement sockets. It is automatically
577  * done for regular sockets, but you usually want more control about
578  * what's going on with measurement sockets.
579  */
580 gras_socket_t gras_socket_meas_accept(gras_socket_t peer)
581 {
582   gras_socket_t res;
583   THROW0(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
584       "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
585
586   xbt_assert0(peer->meas,
587               "No need to accept on non-measurement sockets (it's automatic)");
588
589   if (!peer->accepting) {
590     /* nothing to accept here (must be in SG) */
591     /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */
592     return peer;
593   }
594
595   res = (peer->plugin->socket_accept) (peer);
596   res->meas = peer->meas;
597   XBT_CDEBUG(gras_trp_meas, "meas_accepted onto %d", res->sd);
598
599   return res;
600 }
601
602
603 /*
604  * Creating procdata for this module
605  */
606 static void *gras_trp_procdata_new(void)
607 {
608   gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t, 1);
609
610   res->name = xbt_strdup("gras_trp");
611   res->name_len = 0;
612   res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL);
613   res->myport = 0;
614
615   return (void *) res;
616 }
617
618 /*
619  * Freeing procdata for this module
620  */
621 static void gras_trp_procdata_free(void *data)
622 {
623   gras_trp_procdata_t res = (gras_trp_procdata_t) data;
624
625   xbt_dynar_free(&(res->sockets));
626   free(res->name);
627   free(res);
628 }
629
630 void gras_trp_socketset_dump(const char *name)
631 {
632   gras_trp_procdata_t procdata =
633       (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
634
635   unsigned int it;
636   gras_socket_t s;
637
638   XBT_INFO("** Dump the socket set %s", name);
639   xbt_dynar_foreach(procdata->sockets, it, s) {
640     XBT_INFO("  %p -> %s:%d %s",
641           s, gras_socket_peer_name(s), gras_socket_peer_port(s),
642           s->valid ? "(valid)" : "(peer dead)");
643   }
644   XBT_INFO("** End of socket set %s", name);
645 }
646
647 /*
648  * Module registration
649  */
650 int gras_trp_libdata_id;
651 void gras_trp_register()
652 {
653   gras_trp_libdata_id =
654       gras_procdata_add("gras_trp", gras_trp_procdata_new,
655                         gras_trp_procdata_free);
656 }
657
658 int gras_os_myport(void)
659 {
660   return ((gras_trp_procdata_t)
661           gras_libdata_by_id(gras_trp_libdata_id))->myport;
662 }