Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use CATCH_ANONYMOUS whenever possible, and remove unused variables.
[simgrid.git] / src / gras / Transport / transport.c
1 /* transport - low level communication                                      */
2
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 /***
10  *** Options
11  ***/
12 int gras_opt_trp_nomoredata_on_close = 0;
13
14 #include "xbt/ex.h"
15 #include "xbt/peer.h"
16 #include "portable.h"
17 #include "gras/Transport/transport_private.h"
18 #include "gras/Msg/msg_interface.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp, gras,
21                                 "Conveying bytes over the network");
22 XBT_LOG_NEW_SUBCATEGORY(gras_trp_meas, gras_trp,
23                         "Conveying bytes over the network without formating for perf measurements");
24 static short int _gras_trp_started = 0;
25
26 static xbt_dict_t _gras_trp_plugins;    /* All registered plugins */
27 static void gras_trp_plugin_free(void *p);      /* free one of the plugins */
28
29 static void gras_trp_plugin_new(const char *name, gras_trp_setup_t setup)
30 {
31   xbt_ex_t e;
32
33   gras_trp_plugin_t plug = xbt_new0(s_gras_trp_plugin_t, 1);
34
35   XBT_DEBUG("Create plugin %s", name);
36
37   plug->name = xbt_strdup(name);
38
39   TRY {
40     setup(plug);
41   } CATCH(e) {
42     if (e.category == mismatch_error) {
43       /* SG plugin raise mismatch when in RL mode (and vice versa) */
44       free(plug->name);
45       free(plug);
46       plug = NULL;
47       xbt_ex_free(e);
48     } else {
49       RETHROW;
50     }
51   }
52
53   if (plug)
54     xbt_dict_set(_gras_trp_plugins, name, plug, gras_trp_plugin_free);
55 }
56
57 void gras_trp_init(void)
58 {
59   if (!_gras_trp_started) {
60     /* make room for all plugins */
61     _gras_trp_plugins = xbt_dict_new();
62
63 #ifdef HAVE_WINSOCK2_H
64     /* initialize the windows mechanism */
65     {
66       WORD wVersionRequested;
67       WSADATA wsaData;
68
69       wVersionRequested = MAKEWORD(2, 0);
70       int res;
71       res = WSAStartup(wVersionRequested, &wsaData);
72       xbt_assert(res == 0, "Cannot find a usable WinSock DLL");
73
74       /* Confirm that the WinSock DLL supports 2.0. */
75       /* Note that if the DLL supports versions greater    */
76       /* than 2.0 in addition to 2.0, it will still return */
77       /* 2.0 in wVersion since that is the version we      */
78       /* requested.                                        */
79
80       xbt_assert(LOBYTE(wsaData.wVersion) == 2 &&
81                   HIBYTE(wsaData.wVersion) == 0,
82                   "Cannot find a usable WinSock DLL");
83       XBT_INFO("Found and initialized winsock2");
84     }                           /* The WinSock DLL is acceptable. Proceed. */
85 #elif HAVE_WINSOCK_H
86     {
87       WSADATA wsaData;
88       int res;
89       res = WSAStartup(0x0101, &wsaData);
90       xbt_assert(res == 0, "Cannot find a usable WinSock DLL");
91       XBT_INFO("Found and initialized winsock");
92     }
93 #endif
94
95     /* Add plugins */
96     gras_trp_plugin_new("file", gras_trp_file_setup);
97     gras_trp_plugin_new("sg", gras_trp_sg_setup);
98     gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
99   }
100
101   _gras_trp_started++;
102 }
103
104 void gras_trp_exit(void)
105 {
106   XBT_DEBUG("gras_trp value %d", _gras_trp_started);
107   if (_gras_trp_started == 0) {
108     return;
109   }
110
111   if (--_gras_trp_started == 0) {
112 #ifdef HAVE_WINSOCK_H
113     if (WSACleanup() == SOCKET_ERROR) {
114       if (WSAGetLastError() == WSAEINPROGRESS) {
115         WSACancelBlockingCall();
116         WSACleanup();
117       }
118     }
119 #endif
120
121     /* Delete the plugins */
122     xbt_dict_free(&_gras_trp_plugins);
123   }
124 }
125
126
127 void gras_trp_plugin_free(void *p)
128 {
129   gras_trp_plugin_t plug = p;
130
131   if (plug) {
132     if (plug->exit) {
133       plug->exit(plug);
134     } else if (plug->data) {
135       XBT_DEBUG("Plugin %s lacks exit(). Free data anyway.", plug->name);
136       free(plug->data);
137     }
138
139     free(plug->name);
140     free(plug);
141   }
142 }
143
144
145 /**
146  * gras_trp_socket_new:
147  *
148  * Malloc a new socket, and initialize it with defaults
149  */
150 void gras_trp_socket_new(int incoming, gras_socket_t * dst)
151 {
152
153   gras_socket_t sock = xbt_new0(s_gras_socket_t, 1);
154
155   XBT_VERB("Create a new socket (%p)", (void *) sock);
156
157   sock->plugin = NULL;
158
159   sock->incoming = incoming ? 1 : 0;
160   sock->outgoing = incoming ? 0 : 1;
161   sock->accepting = incoming ? 1 : 0;
162   sock->meas = 0;
163   sock->recvd = 0;
164   sock->valid = 1;
165   sock->moredata = 0;
166
167   sock->refcount = 1;
168   sock->sd = -1;
169
170   sock->data = NULL;
171   sock->bufdata = NULL;
172
173   *dst = sock;
174
175   XBT_OUT();
176 }
177
178 /**
179  * @brief Opens a server socket and makes it ready to be listened to.
180  * @param port: port on which you want to listen
181  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
182  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
183  *
184  * In real life, you'll get a TCP socket.
185  */
186 gras_socket_t
187 gras_socket_server_ext(unsigned short port,
188                        unsigned long int buf_size, int measurement)
189 {
190   gras_trp_plugin_t trp;
191   gras_socket_t sock;
192
193   XBT_DEBUG("Create a server socket from plugin %s on port %d",
194          gras_if_RL()? "tcp" : "sg", port);
195   trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
196
197   /* defaults settings */
198   gras_trp_socket_new(1, &sock);
199   sock->plugin = trp;
200   sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
201   sock->meas = measurement;
202
203   /* Call plugin socket creation function */
204   XBT_DEBUG("Prepare socket with plugin (fct=%p)", trp->socket_server);
205   TRY {
206     trp->socket_server(trp, port, sock);
207     XBT_DEBUG("in=%c out=%c accept=%c",
208            sock->incoming ? 'y' : 'n',
209            sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
210   }
211   CATCH_ANONYMOUS {
212     free(sock);
213     RETHROW;
214   }
215
216   if (!measurement)
217     ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport
218         = port;
219   xbt_dynar_push(((gras_trp_procdata_t)
220                   gras_libdata_by_id(gras_trp_libdata_id))->sockets,
221                  &sock);
222
223   gras_msg_listener_awake();
224   return sock;
225 }
226
227 /**
228  * @brief Opens a server socket on any port in the given range
229  *
230  * @param minport: first port we will try
231  * @param maxport: last port we will try
232  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
233  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
234  *
235  * If none of the provided ports works, raises the exception got when trying the last possibility
236  */
237 gras_socket_t
238 gras_socket_server_range(unsigned short minport, unsigned short maxport,
239                          unsigned long int buf_size, int measurement)
240 {
241
242   int port;
243   gras_socket_t res = NULL;
244   xbt_ex_t e;
245
246   for (port = minport; port < maxport; port++) {
247     TRY {
248       res = gras_socket_server_ext(port, buf_size, measurement);
249     }
250     CATCH(e) {
251       if (port == maxport)
252         RETHROW;
253       xbt_ex_free(e);
254     }
255     if (res)
256       return res;
257   }
258   THROW_IMPOSSIBLE;
259 }
260
261 /**
262  * @brief Opens a client socket to a remote host.
263  * @param host: who you want to connect to
264  * @param port: where you want to connect to on this host
265  * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
266  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
267  *
268  * In real life, you'll get a TCP socket.
269  */
270 gras_socket_t
271 gras_socket_client_ext(const char *host,
272                        unsigned short port,
273                        unsigned long int buf_size, int measurement)
274 {
275   gras_trp_plugin_t trp;
276   gras_socket_t sock;
277
278   trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
279
280   XBT_DEBUG("Create a client socket from plugin %s",
281          gras_if_RL()? "tcp" : "sg");
282   /* defaults settings */
283   gras_trp_socket_new(0, &sock);
284   sock->plugin = trp;
285   sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
286   sock->meas = measurement;
287
288   /* plugin-specific */
289   TRY {
290     (*trp->socket_client) (trp,host,port,sock);
291     XBT_DEBUG("in=%c out=%c accept=%c",
292            sock->incoming ? 'y' : 'n',
293            sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
294   }
295   CATCH_ANONYMOUS {
296     free(sock);
297     RETHROW;
298   }
299   xbt_dynar_push(((gras_trp_procdata_t)
300                   gras_libdata_by_id(gras_trp_libdata_id))->sockets,
301                  &sock);
302   gras_msg_listener_awake();
303   return sock;
304 }
305
306 /**
307  * @brief Opens a server socket and make it ready to be listened to.
308  *
309  * In real life, you'll get a TCP socket.
310  */
311 gras_socket_t gras_socket_server(unsigned short port)
312 {
313   return gras_socket_server_ext(port, 32 * 1024, 0);
314 }
315
316 /** @brief Opens a client socket to a remote host */
317 gras_socket_t gras_socket_client(const char *host, unsigned short port)
318 {
319   return gras_socket_client_ext(host, port, 0, 0);
320 }
321
322 /** @brief Opens a client socket to a remote host specified as '\a host:\a port' */
323 gras_socket_t gras_socket_client_from_string(const char *host)
324 {
325   xbt_peer_t p = xbt_peer_from_string(host);
326   gras_socket_t res = gras_socket_client_ext(p->name, p->port, 0, 0);
327   xbt_peer_free(p);
328   return res;
329 }
330
331 void gras_socket_close_voidp(void *sock)
332 {
333   gras_socket_close((gras_socket_t) sock);
334 }
335
336 /** \brief Close socket */
337 void gras_socket_close(gras_socket_t sock)
338 {
339   if (--sock->refcount)
340     return;
341
342   xbt_dynar_t sockets =
343       ((gras_trp_procdata_t)
344        gras_libdata_by_id(gras_trp_libdata_id))->sockets;
345   gras_socket_t sock_iter = NULL;
346   unsigned int cursor;
347
348   XBT_IN("");
349   XBT_VERB("Close %p", sock);
350   if (sock == _gras_lastly_selected_socket) {
351     xbt_assert(!gras_opt_trp_nomoredata_on_close || !sock->moredata,
352                 "Closing a socket having more data in buffer while the nomoredata_on_close option is activated");
353
354     if (sock->moredata)
355       XBT_CRITICAL
356           ("Closing a socket having more data in buffer. Option nomoredata_on_close disabled, so continuing.");
357     _gras_lastly_selected_socket = NULL;
358   }
359
360   /* FIXME: Issue an event when the socket is closed */
361   XBT_DEBUG("sockets pointer before %p", sockets);
362   if (sock) {
363     /* FIXME: Cannot get the dynar mutex, because it can be already locked */
364 //              _xbt_dynar_foreach(sockets,cursor,sock_iter) {
365     for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) {
366       _xbt_dynar_cursor_get(sockets, cursor, &sock_iter);
367       if (sock == sock_iter) {
368         XBT_DEBUG("remove sock cursor %d dize %lu\n", cursor,
369                xbt_dynar_length(sockets));
370         xbt_dynar_cursor_rm(sockets, &cursor);
371         if (sock->plugin->socket_close)
372           (*sock->plugin->socket_close) (sock);
373
374         /* free the memory */
375         free(sock);
376         XBT_OUT();
377         return;
378       }
379     }
380     XBT_WARN
381         ("Ignoring request to free an unknown socket (%p). Execution stack:",
382          sock);
383     xbt_backtrace_display_current();
384   }
385   XBT_OUT();
386 }
387
388 /**
389  * gras_trp_send:
390  *
391  * Send a bunch of bytes from on socket
392  * (stable if we know the storage will keep as is until the next trp_flush)
393  */
394 void gras_trp_send(gras_socket_t sd, char *data, long int size, int stable)
395 {
396   xbt_assert(sd->outgoing, "Socket not suited for data send");
397   (*sd->plugin->send) (sd, data, size, stable);
398 }
399
400 /**
401  * gras_trp_chunk_recv:
402  *
403  * Receive a bunch of bytes from a socket
404  */
405 void gras_trp_recv(gras_socket_t sd, char *data, long int size)
406 {
407   xbt_assert(sd->incoming, "Socket not suited for data receive");
408   (sd->plugin->recv) (sd, data, size);
409 }
410
411 /**
412  * gras_trp_flush:
413  *
414  * Make sure all pending communications are done
415  */
416 void gras_trp_flush(gras_socket_t sd)
417 {
418   if (sd->plugin->flush)
419     (sd->plugin->flush) (sd);
420 }
421
422 gras_trp_plugin_t gras_trp_plugin_get_by_name(const char *name)
423 {
424   return xbt_dict_get(_gras_trp_plugins, name);
425 }
426
427 int gras_socket_my_port(gras_socket_t sock)
428 {
429   if (!sock->plugin->my_port)
430     THROWF(unknown_error,0,"Function my_port unimplemented in plugin %s",sock->plugin->name);
431   return (*sock->plugin->my_port)(sock);
432
433 }
434
435 int gras_socket_peer_port(gras_socket_t sock)
436 {
437   if (!sock->plugin->peer_port)
438     THROWF(unknown_error,0,"Function peer_port unimplemented in plugin %s",sock->plugin->name);
439   return (*sock->plugin->peer_port)(sock);
440 }
441
442 const char *gras_socket_peer_name(gras_socket_t sock)
443 {
444   xbt_assert(sock->plugin);
445   return (*sock->plugin->peer_name)(sock);
446 }
447
448 const char *gras_socket_peer_proc(gras_socket_t sock)
449 {
450   return (*sock->plugin->peer_proc)(sock);
451 }
452
453 void gras_socket_peer_proc_set(gras_socket_t sock, char *peer_proc)
454 {
455   return (*sock->plugin->peer_proc_set)(sock,peer_proc);
456 }
457
458 /** \brief Check if the provided socket is a measurement one (or a regular one) */
459 int gras_socket_is_meas(gras_socket_t sock)
460 {
461   return sock->meas;
462 }
463
464 /** \brief Send a chunk of (random) data over a measurement socket
465  *
466  * @param peer measurement socket to use for the experiment
467  * @param timeout timeout (in seconds)
468  * @param msg_size size of each chunk sent over the socket (in bytes).
469  * @param msg_amount how many of these packets you want to send.
470  *
471  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
472  * each side of the socket should be paired.
473  *
474  * The exchanged data is zeroed to make sure it's initialized, but
475  * there is no way to control what is sent (ie, you cannot use these
476  * functions to exchange data out of band).
477  *
478  * @warning: in SimGrid version 3.1 and previous, the numerical arguments
479  *           were the total amount of data to send and the msg_size. This
480  *           was changed for the fool wanting to send more than MAXINT
481  *           bytes in a fat pipe.
482  */
483 void gras_socket_meas_send(gras_socket_t peer,
484                            unsigned int timeout,
485                            unsigned long int msg_size,
486                            unsigned long int msg_amount)
487 {
488   char *chunk = NULL;
489   unsigned long int sent_sofar;
490
491   XBT_IN("");
492   THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
493       "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
494   if (gras_if_RL())
495     chunk = xbt_malloc0(msg_size);
496
497   xbt_assert(peer->meas,
498               "Asked to send measurement data on a regular socket");
499   xbt_assert(peer->outgoing,
500               "Socket not suited for data send (was created with gras_socket_server(), not gras_socket_client())");
501
502   for (sent_sofar = 0; sent_sofar < msg_amount; sent_sofar++) {
503     XBT_CDEBUG(gras_trp_meas,
504             "Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
505             sent_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
506             gras_socket_peer_port(peer));
507     (*peer->plugin->raw_send) (peer, chunk, msg_size);
508   }
509   XBT_CDEBUG(gras_trp_meas,
510           "Sent %lu msgs of %lu (size of each: %ld) to %s:%d", sent_sofar,
511           msg_amount, msg_size, gras_socket_peer_name(peer),
512           gras_socket_peer_port(peer));
513
514   if (gras_if_RL())
515     free(chunk);
516
517   XBT_OUT();
518 }
519
520 /** \brief Receive a chunk of data over a measurement socket
521  *
522  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
523  * each side of the socket should be paired.
524  *
525  * @warning: in SimGrid version 3.1 and previous, the numerical arguments
526  *           were the total amount of data to send and the msg_size. This
527  *           was changed for the fool wanting to send more than MAXINT
528  *           bytes in a fat pipe.
529  */
530 void gras_socket_meas_recv(gras_socket_t peer,
531                            unsigned int timeout,
532                            unsigned long int msg_size,
533                            unsigned long int msg_amount)
534 {
535
536   char *chunk = NULL;
537   unsigned long int got_sofar;
538
539   XBT_IN("");
540   THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
541       "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
542
543   if (gras_if_RL())
544     chunk = xbt_malloc(msg_size);
545
546   xbt_assert(peer->meas,
547               "Asked to receive measurement data on a regular socket");
548   xbt_assert(peer->incoming, "Socket not suited for data receive");
549
550   for (got_sofar = 0; got_sofar < msg_amount; got_sofar++) {
551     XBT_CDEBUG(gras_trp_meas,
552             "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
553             got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
554             gras_socket_peer_port(peer));
555     (peer->plugin->raw_recv) (peer, chunk, msg_size);
556   }
557   XBT_CDEBUG(gras_trp_meas,
558           "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
559           got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
560           gras_socket_peer_port(peer));
561
562   if (gras_if_RL())
563     free(chunk);
564   XBT_OUT();
565 }
566
567 /**
568  * \brief Something similar to the good old accept system call.
569  *
570  * Make sure that there is someone speaking to the provided server socket.
571  * In RL, it does an accept(2) and return the result as last argument.
572  * In SG, as accepts are useless, it returns the provided argument as result.
573  * You should thus test whether (peer != accepted) before closing both of them.
574  *
575  * You should only call this on measurement sockets. It is automatically
576  * done for regular sockets, but you usually want more control about
577  * what's going on with measurement sockets.
578  */
579 gras_socket_t gras_socket_meas_accept(gras_socket_t peer)
580 {
581   gras_socket_t res;
582   THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
583       "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
584
585   xbt_assert(peer->meas,
586               "No need to accept on non-measurement sockets (it's automatic)");
587
588   if (!peer->accepting) {
589     /* nothing to accept here (must be in SG) */
590     /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */
591     return peer;
592   }
593
594   res = (peer->plugin->socket_accept) (peer);
595   res->meas = peer->meas;
596   XBT_CDEBUG(gras_trp_meas, "meas_accepted onto %d", res->sd);
597
598   return res;
599 }
600
601
602 /*
603  * Creating procdata for this module
604  */
605 static void *gras_trp_procdata_new(void)
606 {
607   gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t, 1);
608
609   res->name = xbt_strdup("gras_trp");
610   res->name_len = 0;
611   res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL);
612   res->myport = 0;
613
614   return (void *) res;
615 }
616
617 /*
618  * Freeing procdata for this module
619  */
620 static void gras_trp_procdata_free(void *data)
621 {
622   gras_trp_procdata_t res = (gras_trp_procdata_t) data;
623
624   xbt_dynar_free(&(res->sockets));
625   free(res->name);
626   free(res);
627 }
628
629 void gras_trp_socketset_dump(const char *name)
630 {
631   gras_trp_procdata_t procdata =
632       (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
633
634   unsigned int it;
635   gras_socket_t s;
636
637   XBT_INFO("** Dump the socket set %s", name);
638   xbt_dynar_foreach(procdata->sockets, it, s) {
639     XBT_INFO("  %p -> %s:%d %s",
640           s, gras_socket_peer_name(s), gras_socket_peer_port(s),
641           s->valid ? "(valid)" : "(peer dead)");
642   }
643   XBT_INFO("** End of socket set %s", name);
644 }
645
646 /*
647  * Module registration
648  */
649 int gras_trp_libdata_id;
650 void gras_trp_register()
651 {
652   gras_trp_libdata_id =
653       gras_procdata_add("gras_trp", gras_trp_procdata_new,
654                         gras_trp_procdata_free);
655 }
656
657 int gras_os_myport(void)
658 {
659   return ((gras_trp_procdata_t)
660           gras_libdata_by_id(gras_trp_libdata_id))->myport;
661 }