Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
model-checker : hash of regions in snapshot stored instead of all the data
[simgrid.git] / src / gras / Transport / transport.c
1 /* transport - low level communication                                      */
2
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 /***
10  *** Options
11  ***/
12 int gras_opt_trp_nomoredata_on_close = 0;
13
14 #include "xbt/ex.h"
15 #include "xbt/peer.h"
16 #include "portable.h"
17 #include "gras/Transport/transport_private.h"
18 #include "gras/Msg/msg_interface.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp, gras,
21                                 "Conveying bytes over the network");
22 XBT_LOG_NEW_SUBCATEGORY(gras_trp_meas, gras_trp,
23                         "Conveying bytes over the network without formating for perf measurements");
24 static short int _gras_trp_started = 0;
25
26 static xbt_dict_t _gras_trp_plugins;    /* All registered plugins */
27 static void gras_trp_plugin_free(void *p);      /* free one of the plugins */
28
29 static void gras_trp_plugin_new(const char *name, gras_trp_setup_t setup)
30 {
31   xbt_ex_t e;
32
33   gras_trp_plugin_t plug = xbt_new0(s_gras_trp_plugin_t, 1);
34
35   XBT_DEBUG("Create plugin %s", name);
36
37   plug->name = xbt_strdup(name);
38
39   TRY {
40     setup(plug);
41   }
42   CATCH(e) {
43     if (e.category == mismatch_error) {
44       /* SG plugin raise mismatch when in RL mode (and vice versa) */
45       free(plug->name);
46       free(plug);
47       plug = NULL;
48       xbt_ex_free(e);
49     } else {
50       RETHROW;
51     }
52   }
53
54   if (plug)
55     xbt_dict_set(_gras_trp_plugins, name, plug, gras_trp_plugin_free);
56 }
57
58 void gras_trp_init(void)
59 {
60   if (!_gras_trp_started) {
61     /* make room for all plugins */
62     _gras_trp_plugins = xbt_dict_new();
63
64 #ifdef HAVE_WINSOCK2_H
65     /* initialize the windows mechanism */
66     {
67       WORD wVersionRequested;
68       WSADATA wsaData;
69
70       wVersionRequested = MAKEWORD(2, 0);
71       int res;
72       res = WSAStartup(wVersionRequested, &wsaData);
73       xbt_assert(res == 0, "Cannot find a usable WinSock DLL");
74
75       /* Confirm that the WinSock DLL supports 2.0. */
76       /* Note that if the DLL supports versions greater    */
77       /* than 2.0 in addition to 2.0, it will still return */
78       /* 2.0 in wVersion since that is the version we      */
79       /* requested.                                        */
80
81       xbt_assert(LOBYTE(wsaData.wVersion) == 2 &&
82                   HIBYTE(wsaData.wVersion) == 0,
83                   "Cannot find a usable WinSock DLL");
84       XBT_INFO("Found and initialized winsock2");
85     }                           /* The WinSock DLL is acceptable. Proceed. */
86 #elif HAVE_WINSOCK_H
87     {
88       WSADATA wsaData;
89       int res;
90       res = WSAStartup(0x0101, &wsaData);
91       xbt_assert(res == 0, "Cannot find a usable WinSock DLL");
92       XBT_INFO("Found and initialized winsock");
93     }
94 #endif
95
96     /* Add plugins */
97     gras_trp_plugin_new("file", gras_trp_file_setup);
98     gras_trp_plugin_new("sg", gras_trp_sg_setup);
99     gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
100   }
101
102   _gras_trp_started++;
103 }
104
105 void gras_trp_exit(void)
106 {
107   XBT_DEBUG("gras_trp value %d", _gras_trp_started);
108   if (_gras_trp_started == 0) {
109     return;
110   }
111
112   if (--_gras_trp_started == 0) {
113 #ifdef HAVE_WINSOCK_H
114     if (WSACleanup() == SOCKET_ERROR) {
115       if (WSAGetLastError() == WSAEINPROGRESS) {
116         WSACancelBlockingCall();
117         WSACleanup();
118       }
119     }
120 #endif
121
122     /* Delete the plugins */
123     xbt_dict_free(&_gras_trp_plugins);
124   }
125 }
126
127
128 void gras_trp_plugin_free(void *p)
129 {
130   gras_trp_plugin_t plug = p;
131
132   if (plug) {
133     if (plug->exit) {
134       plug->exit(plug);
135     } else if (plug->data) {
136       XBT_DEBUG("Plugin %s lacks exit(). Free data anyway.", plug->name);
137       free(plug->data);
138     }
139
140     free(plug->name);
141     free(plug);
142   }
143 }
144
145
146 /**
147  * gras_trp_socket_new:
148  *
149  * Malloc a new socket, and initialize it with defaults
150  */
151 void gras_trp_socket_new(int incoming, gras_socket_t * dst)
152 {
153
154   gras_socket_t sock = xbt_new0(s_gras_socket_t, 1);
155
156   XBT_VERB("Create a new socket (%p)", (void *) sock);
157
158   sock->plugin = NULL;
159
160   sock->incoming = incoming ? 1 : 0;
161   sock->outgoing = incoming ? 0 : 1;
162   sock->accepting = incoming ? 1 : 0;
163   sock->meas = 0;
164   sock->recvd = 0;
165   sock->valid = 1;
166   sock->moredata = 0;
167
168   sock->refcount = 1;
169   sock->sd = -1;
170
171   sock->data = NULL;
172   sock->bufdata = NULL;
173
174   *dst = sock;
175
176   XBT_OUT();
177 }
178
179 /**
180  * @brief Opens a server socket and makes it ready to be listened to.
181  * @param port: port on which you want to listen
182  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
183  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
184  *
185  * In real life, you'll get a TCP socket.
186  */
187 gras_socket_t
188 gras_socket_server_ext(unsigned short port,
189                        unsigned long int buf_size, int measurement)
190 {
191   gras_trp_plugin_t trp;
192   gras_socket_t sock;
193
194   XBT_DEBUG("Create a server socket from plugin %s on port %d",
195          gras_if_RL()? "tcp" : "sg", port);
196   trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
197
198   /* defaults settings */
199   gras_trp_socket_new(1, &sock);
200   sock->plugin = trp;
201   sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
202   sock->meas = measurement;
203
204   /* Call plugin socket creation function */
205   XBT_DEBUG("Prepare socket with plugin (fct=%p)", trp->socket_server);
206   TRY {
207     trp->socket_server(trp, port, sock);
208     XBT_DEBUG("in=%c out=%c accept=%c",
209            sock->incoming ? 'y' : 'n',
210            sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
211   }
212   CATCH_ANONYMOUS {
213     free(sock);
214     RETHROW;
215   }
216
217   if (!measurement)
218     ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport
219         = port;
220   xbt_dynar_push(((gras_trp_procdata_t)
221                   gras_libdata_by_id(gras_trp_libdata_id))->sockets,
222                  &sock);
223
224   gras_msg_listener_awake();
225   return sock;
226 }
227
228 /**
229  * @brief Opens a server socket on any port in the given range
230  *
231  * @param minport: first port we will try
232  * @param maxport: last port we will try
233  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
234  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
235  *
236  * If none of the provided ports works, raises the exception got when trying the last possibility
237  */
238 gras_socket_t
239 gras_socket_server_range(unsigned short minport, unsigned short maxport,
240                          unsigned long int buf_size, int measurement)
241 {
242
243   int port;
244   gras_socket_t res = NULL;
245   xbt_ex_t e;
246
247   for (port = minport; port < maxport; port++) {
248     TRY {
249       res = gras_socket_server_ext(port, buf_size, measurement);
250     }
251     CATCH(e) {
252       if (port == maxport)
253         RETHROW;
254       xbt_ex_free(e);
255     }
256     if (res)
257       return res;
258   }
259   THROW_IMPOSSIBLE;
260 }
261
262 /**
263  * @brief Opens a client socket to a remote host.
264  * @param host: who you want to connect to
265  * @param port: where you want to connect to on this host
266  * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
267  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
268  *
269  * In real life, you'll get a TCP socket.
270  */
271 gras_socket_t
272 gras_socket_client_ext(const char *host,
273                        unsigned short port,
274                        unsigned long int buf_size, int measurement)
275 {
276   gras_trp_plugin_t trp;
277   gras_socket_t sock;
278
279   trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
280
281   XBT_DEBUG("Create a client socket from plugin %s",
282          gras_if_RL()? "tcp" : "sg");
283   /* defaults settings */
284   gras_trp_socket_new(0, &sock);
285   sock->plugin = trp;
286   sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
287   sock->meas = measurement;
288
289   /* plugin-specific */
290   TRY {
291     (*trp->socket_client) (trp,host,port,sock);
292     XBT_DEBUG("in=%c out=%c accept=%c",
293            sock->incoming ? 'y' : 'n',
294            sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
295   }
296   CATCH_ANONYMOUS {
297     free(sock);
298     RETHROW;
299   }
300   xbt_dynar_push(((gras_trp_procdata_t)
301                   gras_libdata_by_id(gras_trp_libdata_id))->sockets,
302                  &sock);
303   gras_msg_listener_awake();
304   return sock;
305 }
306
307 /**
308  * @brief Opens a server socket and make it ready to be listened to.
309  *
310  * In real life, you'll get a TCP socket.
311  */
312 gras_socket_t gras_socket_server(unsigned short port)
313 {
314   return gras_socket_server_ext(port, 32 * 1024, 0);
315 }
316
317 /** @brief Opens a client socket to a remote host */
318 gras_socket_t gras_socket_client(const char *host, unsigned short port)
319 {
320   return gras_socket_client_ext(host, port, 0, 0);
321 }
322
323 /** @brief Opens a client socket to a remote host specified as '\a host:\a port' */
324 gras_socket_t gras_socket_client_from_string(const char *host)
325 {
326   xbt_peer_t p = xbt_peer_from_string(host);
327   gras_socket_t res = gras_socket_client_ext(p->name, p->port, 0, 0);
328   xbt_peer_free(p);
329   return res;
330 }
331
332 void gras_socket_close_voidp(void *sock)
333 {
334   gras_socket_close((gras_socket_t) sock);
335 }
336
337 /** \brief Close socket */
338 void gras_socket_close(gras_socket_t sock)
339 {
340   if (--sock->refcount)
341     return;
342
343   xbt_dynar_t sockets =
344       ((gras_trp_procdata_t)
345        gras_libdata_by_id(gras_trp_libdata_id))->sockets;
346   gras_socket_t sock_iter = NULL;
347   unsigned int cursor;
348
349   XBT_IN("");
350   XBT_VERB("Close %p", sock);
351   if (sock == _gras_lastly_selected_socket) {
352     xbt_assert(!gras_opt_trp_nomoredata_on_close || !sock->moredata,
353                 "Closing a socket having more data in buffer while the nomoredata_on_close option is activated");
354
355     if (sock->moredata)
356       XBT_CRITICAL
357           ("Closing a socket having more data in buffer. Option nomoredata_on_close disabled, so continuing.");
358     _gras_lastly_selected_socket = NULL;
359   }
360
361   /* FIXME: Issue an event when the socket is closed */
362   XBT_DEBUG("sockets pointer before %p", sockets);
363   if (sock) {
364     /* FIXME: Cannot get the dynar mutex, because it can be already locked */
365 //              _xbt_dynar_foreach(sockets,cursor,sock_iter) {
366     for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) {
367       _xbt_dynar_cursor_get(sockets, cursor, &sock_iter);
368       if (sock == sock_iter) {
369         XBT_DEBUG("remove sock cursor %d dize %lu\n", cursor,
370                xbt_dynar_length(sockets));
371         xbt_dynar_cursor_rm(sockets, &cursor);
372         if (sock->plugin->socket_close)
373           (*sock->plugin->socket_close) (sock);
374
375         /* free the memory */
376         free(sock);
377         XBT_OUT();
378         return;
379       }
380     }
381     XBT_WARN
382         ("Ignoring request to free an unknown socket (%p). Execution stack:",
383          sock);
384     xbt_backtrace_display_current();
385   }
386   XBT_OUT();
387 }
388
389 /**
390  * gras_trp_send:
391  *
392  * Send a bunch of bytes from on socket
393  * (stable if we know the storage will keep as is until the next trp_flush)
394  */
395 void gras_trp_send(gras_socket_t sd, char *data, long int size, int stable)
396 {
397   xbt_assert(sd->outgoing, "Socket not suited for data send");
398   sd->plugin->send(sd, data, size, stable);
399 }
400
401 /**
402  * gras_trp_chunk_recv:
403  *
404  * Receive a bunch of bytes from a socket
405  */
406 void gras_trp_recv(gras_socket_t sd, char *data, long int size)
407 {
408   xbt_assert(sd->incoming, "Socket not suited for data receive");
409   (sd->plugin->recv) (sd, data, size);
410 }
411
412 /**
413  * gras_trp_flush:
414  *
415  * Make sure all pending communications are done
416  */
417 void gras_trp_flush(gras_socket_t sd)
418 {
419   if (sd->plugin->flush)
420     (sd->plugin->flush) (sd);
421 }
422
423 gras_trp_plugin_t gras_trp_plugin_get_by_name(const char *name)
424 {
425   return xbt_dict_get(_gras_trp_plugins, name);
426 }
427
428 int gras_socket_my_port(gras_socket_t sock)
429 {
430   if (!sock->plugin->my_port)
431     THROWF(unknown_error,0,"Function my_port unimplemented in plugin %s",sock->plugin->name);
432   return sock->plugin->my_port(sock);
433
434 }
435
436 int gras_socket_peer_port(gras_socket_t sock)
437 {
438   if (!sock->plugin->peer_port)
439     THROWF(unknown_error,0,"Function peer_port unimplemented in plugin %s",sock->plugin->name);
440   return sock->plugin->peer_port(sock);
441 }
442
443 const char *gras_socket_peer_name(gras_socket_t sock)
444 {
445   xbt_assert(sock->plugin);
446   return sock->plugin->peer_name(sock);
447 }
448
449 const char *gras_socket_peer_proc(gras_socket_t sock)
450 {
451   return sock->plugin->peer_proc(sock);
452 }
453
454 void gras_socket_peer_proc_set(gras_socket_t sock, char *peer_proc)
455 {
456   return sock->plugin->peer_proc_set(sock,peer_proc);
457 }
458
459 /** \brief Check if the provided socket is a measurement one (or a regular one) */
460 int gras_socket_is_meas(gras_socket_t sock)
461 {
462   return sock->meas;
463 }
464
465 /** \brief Send a chunk of (random) data over a measurement socket
466  *
467  * @param peer measurement socket to use for the experiment
468  * @param timeout timeout (in seconds)
469  * @param msg_size size of each chunk sent over the socket (in bytes).
470  * @param msg_amount how many of these packets you want to send.
471  *
472  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
473  * each side of the socket should be paired.
474  *
475  * The exchanged data is zeroed to make sure it's initialized, but
476  * there is no way to control what is sent (ie, you cannot use these
477  * functions to exchange data out of band).
478  *
479  * @warning: in SimGrid version 3.1 and previous, the numerical arguments
480  *           were the total amount of data to send and the msg_size. This
481  *           was changed for the fool wanting to send more than MAXINT
482  *           bytes in a fat pipe.
483  */
484 void gras_socket_meas_send(gras_socket_t peer,
485                            unsigned int timeout,
486                            unsigned long int msg_size,
487                            unsigned long int msg_amount)
488 {
489   char *chunk = NULL;
490   unsigned long int sent_sofar;
491
492   XBT_IN("");
493   THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
494       "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
495   if (gras_if_RL())
496     chunk = xbt_malloc0(msg_size);
497
498   xbt_assert(peer->meas,
499               "Asked to send measurement data on a regular socket");
500   xbt_assert(peer->outgoing,
501               "Socket not suited for data send (was created with gras_socket_server(), not gras_socket_client())");
502
503   for (sent_sofar = 0; sent_sofar < msg_amount; sent_sofar++) {
504     XBT_CDEBUG(gras_trp_meas,
505             "Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
506             sent_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
507             gras_socket_peer_port(peer));
508     peer->plugin->raw_send(peer, chunk, msg_size);
509   }
510   XBT_CDEBUG(gras_trp_meas,
511           "Sent %lu msgs of %lu (size of each: %ld) to %s:%d", sent_sofar,
512           msg_amount, msg_size, gras_socket_peer_name(peer),
513           gras_socket_peer_port(peer));
514
515   if (gras_if_RL())
516     free(chunk);
517
518   XBT_OUT();
519 }
520
521 /** \brief Receive a chunk of data over a measurement socket
522  *
523  * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
524  * each side of the socket should be paired.
525  *
526  * @warning: in SimGrid version 3.1 and previous, the numerical arguments
527  *           were the total amount of data to send and the msg_size. This
528  *           was changed for the fool wanting to send more than MAXINT
529  *           bytes in a fat pipe.
530  */
531 void gras_socket_meas_recv(gras_socket_t peer,
532                            unsigned int timeout,
533                            unsigned long int msg_size,
534                            unsigned long int msg_amount)
535 {
536
537   char *chunk = NULL;
538   unsigned long int got_sofar;
539
540   XBT_IN("");
541   THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
542       "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
543
544   if (gras_if_RL())
545     chunk = xbt_malloc(msg_size);
546
547   xbt_assert(peer->meas,
548               "Asked to receive measurement data on a regular socket");
549   xbt_assert(peer->incoming, "Socket not suited for data receive");
550
551   for (got_sofar = 0; got_sofar < msg_amount; got_sofar++) {
552     XBT_CDEBUG(gras_trp_meas,
553             "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
554             got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
555             gras_socket_peer_port(peer));
556     (peer->plugin->raw_recv) (peer, chunk, msg_size);
557   }
558   XBT_CDEBUG(gras_trp_meas,
559           "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
560           got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
561           gras_socket_peer_port(peer));
562
563   if (gras_if_RL())
564     free(chunk);
565   XBT_OUT();
566 }
567
568 /**
569  * \brief Something similar to the good old accept system call.
570  *
571  * Make sure that there is someone speaking to the provided server socket.
572  * In RL, it does an accept(2) and return the result as last argument.
573  * In SG, as accepts are useless, it returns the provided argument as result.
574  * You should thus test whether (peer != accepted) before closing both of them.
575  *
576  * You should only call this on measurement sockets. It is automatically
577  * done for regular sockets, but you usually want more control about
578  * what's going on with measurement sockets.
579  */
580 gras_socket_t gras_socket_meas_accept(gras_socket_t peer)
581 {
582   gras_socket_t res;
583   THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
584       "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
585
586   xbt_assert(peer->meas,
587               "No need to accept on non-measurement sockets (it's automatic)");
588
589   if (!peer->accepting) {
590     /* nothing to accept here (must be in SG) */
591     /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */
592     return peer;
593   }
594
595   res = (peer->plugin->socket_accept) (peer);
596   res->meas = peer->meas;
597   XBT_CDEBUG(gras_trp_meas, "meas_accepted onto %d", res->sd);
598
599   return res;
600 }
601
602
603 /*
604  * Creating procdata for this module
605  */
606 static void *gras_trp_procdata_new(void)
607 {
608   gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t, 1);
609
610   res->name = xbt_strdup("gras_trp");
611   res->name_len = 0;
612   res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL);
613   res->myport = 0;
614
615   return (void *) res;
616 }
617
618 /*
619  * Freeing procdata for this module
620  */
621 static void gras_trp_procdata_free(void *data)
622 {
623   gras_trp_procdata_t res = (gras_trp_procdata_t) data;
624
625   xbt_dynar_free(&(res->sockets));
626   free(res->name);
627   free(res);
628 }
629
630 void gras_trp_socketset_dump(const char *name)
631 {
632   gras_trp_procdata_t procdata =
633       (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
634
635   unsigned int it;
636   gras_socket_t s;
637
638   XBT_INFO("** Dump the socket set %s", name);
639   xbt_dynar_foreach(procdata->sockets, it, s) {
640     XBT_INFO("  %p -> %s:%d %s",
641           s, gras_socket_peer_name(s), gras_socket_peer_port(s),
642           s->valid ? "(valid)" : "(peer dead)");
643   }
644   XBT_INFO("** End of socket set %s", name);
645 }
646
647 /*
648  * Module registration
649  */
650 int gras_trp_libdata_id;
651 void gras_trp_register()
652 {
653   gras_trp_libdata_id =
654       gras_procdata_add("gras_trp", gras_trp_procdata_new,
655                         gras_trp_procdata_free);
656 }
657
658 int gras_os_myport(void)
659 {
660   return ((gras_trp_procdata_t)
661           gras_libdata_by_id(gras_trp_libdata_id))->myport;
662 }