Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Revert "try to port the gras simulation side to the new smx_network infrastructure...
[simgrid.git] / src / gras / Transport / transport_plugin_tcp.c
1 /* $Id$ */
2
3 /* buf trp (transport) - buffered transport using the TCP one               */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include <stdlib.h>
11 #include <string.h>             /* memset */
12
13 #include "portable.h"
14 #include "xbt/misc.h"
15 #include "xbt/sysdep.h"
16 #include "xbt/ex.h"
17 #include "gras/Transport/transport_private.h"
18 #include "gras/Msg/msg_interface.h"     /* listener_close_socket */
19
20 /* FIXME maybe READV is sometime a good thing? */
21 #undef HAVE_READV
22
23 #ifdef HAVE_READV
24 #include <sys/uio.h>
25 #endif
26
27 #ifndef MIN
28 #define MIN(a,b) ((a)<(b)?(a):(b))
29 #endif
30
31 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_tcp, gras_trp,
32                                 "TCP buffered transport");
33
34 /***
35  *** Specific socket part
36  ***/
37
38 typedef enum { buffering_buf, buffering_iov } buffering_kind;
39
40 typedef struct {
41   int size;
42   char *data;
43   int pos;                      /* for receive; not exchanged over the net */
44 } gras_trp_buf_t;
45
46
47 struct gras_trp_bufdata_ {
48   int buffsize;
49   gras_trp_buf_t in_buf;
50   gras_trp_buf_t out_buf;
51
52 #ifdef HAVE_READV
53   xbt_dynar_t in_buf_v;
54   xbt_dynar_t out_buf_v;
55 #endif
56
57   buffering_kind in;
58   buffering_kind out;
59 };
60
61
62 /*****************************/
63 /****[ SOCKET MANAGEMENT ]****/
64 /*****************************/
65 /* we exchange port number on client side on socket creation,
66    so we need to be able to talk right now. */
67 static XBT_INLINE void gras_trp_tcp_send(gras_socket_t sock, const char *data,
68                                          unsigned long int size);
69 static int gras_trp_tcp_recv(gras_socket_t sock, char *data,
70                              unsigned long int size);
71
72
73 static int _gras_tcp_proto_number(void);
74
75 static XBT_INLINE void gras_trp_sock_socket_client(gras_trp_plugin_t ignored,
76                                                    gras_socket_t sock)
77 {
78
79   struct sockaddr_in addr;
80   struct hostent *he;
81   struct in_addr *haddr;
82   int size = sock->buf_size;
83   uint32_t myport = htonl(((gras_trp_procdata_t)
84                            gras_libdata_by_id(gras_trp_libdata_id))->myport);
85
86   sock->incoming = 1;           /* TCP sockets are duplex'ed */
87
88   sock->sd = socket(AF_INET, SOCK_STREAM, 0);
89
90   if (sock->sd < 0) {
91     THROW1(system_error, 0, "Failed to create socket: %s",
92            sock_errstr(sock_errno));
93   }
94
95   if (setsockopt
96       (sock->sd, SOL_SOCKET, SO_RCVBUF, (char *) &size, sizeof(size))
97       || setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF, (char *) &size,
98                     sizeof(size))) {
99     VERB1("setsockopt failed, cannot set buffer size: %s",
100           sock_errstr(sock_errno));
101   }
102
103   he = gethostbyname(sock->peer_name);
104   if (he == NULL) {
105     THROW2(system_error, 0, "Failed to lookup hostname %s: %s",
106            sock->peer_name, sock_errstr(sock_errno));
107   }
108
109   haddr = ((struct in_addr *) (he->h_addr_list)[0]);
110
111   memset(&addr, 0, sizeof(struct sockaddr_in));
112   memcpy(&addr.sin_addr, haddr, sizeof(struct in_addr));
113   addr.sin_family = AF_INET;
114   addr.sin_port = htons(sock->peer_port);
115
116   if (connect(sock->sd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
117     tcp_close(sock->sd);
118     THROW3(system_error, 0,
119            "Failed to connect socket to %s:%d (%s)",
120            sock->peer_name, sock->peer_port, sock_errstr(sock_errno));
121   }
122
123   gras_trp_tcp_send(sock, (char *) &myport, sizeof(uint32_t));
124   DEBUG1("peerport sent to %d", sock->peer_port);
125
126   VERB4("Connect to %s:%d (sd=%d, port %d here)",
127         sock->peer_name, sock->peer_port, sock->sd, sock->port);
128 }
129
130 /**
131  * gras_trp_sock_socket_server:
132  *
133  * Open a socket used to receive messages.
134  */
135 static XBT_INLINE void gras_trp_sock_socket_server(gras_trp_plugin_t ignored,
136                                                    gras_socket_t sock)
137 {
138   int size = sock->buf_size;
139   int on = 1;
140   struct sockaddr_in server;
141
142   sock->outgoing = 1;           /* TCP => duplex mode */
143
144   server.sin_port = htons((u_short) sock->port);
145   server.sin_addr.s_addr = INADDR_ANY;
146   server.sin_family = AF_INET;
147   if ((sock->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
148     THROW1(system_error, 0, "Socket allocation failed: %s",
149            sock_errstr(sock_errno));
150
151   if (setsockopt
152       (sock->sd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)))
153     THROW1(system_error, 0,
154            "setsockopt failed, cannot condition the socket: %s",
155            sock_errstr(sock_errno));
156
157   if (setsockopt(sock->sd, SOL_SOCKET, SO_RCVBUF,
158                  (char *) &size, sizeof(size))
159       || setsockopt(sock->sd, SOL_SOCKET, SO_SNDBUF,
160                     (char *) &size, sizeof(size))) {
161     VERB1("setsockopt failed, cannot set buffer size: %s",
162           sock_errstr(sock_errno));
163   }
164
165   if (bind(sock->sd, (struct sockaddr *) &server, sizeof(server)) == -1) {
166     tcp_close(sock->sd);
167     THROW2(system_error, 0,
168            "Cannot bind to port %d: %s", sock->port, sock_errstr(sock_errno));
169   }
170
171   DEBUG2("Listen on port %d (sd=%d)", sock->port, sock->sd);
172   if (listen(sock->sd, 5) < 0) {
173     tcp_close(sock->sd);
174     THROW2(system_error, 0,
175            "Cannot listen on port %d: %s",
176            sock->port, sock_errstr(sock_errno));
177   }
178
179   VERB2("Openned a server socket on port %d (sd=%d)", sock->port, sock->sd);
180 }
181
182 static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock)
183 {
184   gras_socket_t res;
185
186   struct sockaddr_in peer_in;
187   socklen_t peer_in_len = sizeof(peer_in);
188
189   int sd;
190   int tmp_errno;
191   int size;
192
193   int i = 1;
194   socklen_t s = sizeof(int);
195
196   uint32_t hisport;
197
198   XBT_IN;
199   gras_trp_socket_new(1, &res);
200
201   sd = accept(sock->sd, (struct sockaddr *) &peer_in, &peer_in_len);
202   tmp_errno = sock_errno;
203
204   if (sd == -1) {
205     gras_socket_close(sock);
206     THROW1(system_error, 0,
207            "Accept failed (%s). Droping server socket.",
208            sock_errstr(tmp_errno));
209   }
210
211   if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char *) &i, s)
212       || setsockopt(sd, _gras_tcp_proto_number(), TCP_NODELAY, (char *) &i,
213                     s))
214     THROW1(system_error, 0,
215            "setsockopt failed, cannot condition the socket: %s",
216            sock_errstr(tmp_errno));
217
218   res->buf_size = sock->buf_size;
219   size = sock->buf_size;
220   if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *) &size, sizeof(size))
221       || setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *) &size, sizeof(size)))
222     VERB1("setsockopt failed, cannot set buffer size: %s",
223           sock_errstr(tmp_errno));
224
225   res->plugin = sock->plugin;
226   res->incoming = sock->incoming;
227   res->outgoing = sock->outgoing;
228   res->accepting = 0;
229   res->sd = sd;
230   res->port = -1;
231
232   gras_trp_tcp_recv(res, (char *) &hisport, sizeof(hisport));
233   res->peer_port = ntohl(hisport);
234   DEBUG1("peerport %d received", res->peer_port);
235
236   /* FIXME: Lock to protect inet_ntoa */
237   if (((struct sockaddr *) &peer_in)->sa_family != AF_INET) {
238     res->peer_name = (char *) strdup("unknown");
239   } else {
240     struct in_addr addrAsInAddr;
241     char *tmp;
242
243     addrAsInAddr.s_addr = peer_in.sin_addr.s_addr;
244
245     tmp = inet_ntoa(addrAsInAddr);
246     if (tmp != NULL) {
247       res->peer_name = (char *) strdup(tmp);
248     } else {
249       res->peer_name = (char *) strdup("unknown");
250     }
251   }
252
253   VERB3("Accepted from %s:%d (sd=%d)", res->peer_name, res->peer_port, sd);
254   xbt_dynar_push(((gras_trp_procdata_t)
255                   gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res);
256
257   XBT_OUT;
258   return res;
259 }
260
261 static void gras_trp_sock_socket_close(gras_socket_t sock)
262 {
263
264   if (!sock)
265     return;                     /* close only once */
266
267   VERB1("close tcp connection %d", sock->sd);
268
269   /* ask the listener to close the socket */
270   gras_msg_listener_close_socket(sock->sd);
271 }
272
273 /************************************/
274 /****[ end of SOCKET MANAGEMENT ]****/
275 /************************************/
276
277
278 /************************************/
279 /****[ UNBUFFERED DATA EXCHANGE ]****/
280 /************************************/
281 /* Temptation to merge this with file data exchange is great, 
282    but doesn't work on BillWare (see tcp_write() in portable.h) */
283 static XBT_INLINE void gras_trp_tcp_send(gras_socket_t sock,
284                                          const char *data,
285                                          unsigned long int size)
286 {
287
288   while (size) {
289     int status = 0;
290
291     status = tcp_write(sock->sd, data, (size_t) size);
292     DEBUG3("write(%d, %p, %ld);", sock->sd, data, size);
293
294     if (status < 0) {
295 #ifdef EWOULDBLOCK
296       if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
297 #else
298       if (errno == EINTR || errno == EAGAIN)
299 #endif
300         continue;
301
302       THROW4(system_error, 0, "write(%d,%p,%ld) failed: %s",
303              sock->sd, data, size, sock_errstr(sock_errno));
304     }
305
306     if (status) {
307       size -= status;
308       data += status;
309     } else {
310       THROW1(system_error, 0, "file descriptor closed (%s)",
311              sock_errstr(sock_errno));
312     }
313   }
314 }
315
316 static XBT_INLINE int
317 gras_trp_tcp_recv_withbuffer(gras_socket_t sock,
318                              char *data,
319                              unsigned long int size,
320                              unsigned long int bufsize)
321 {
322
323   int got = 0;
324
325   if (sock->recvd) {
326     data[0] = sock->recvd_val;
327     sock->recvd = 0;
328     got++;
329     bufsize--;
330   }
331
332   while (size > got) {
333     int status = 0;
334
335     DEBUG5("read(%d, %p, %ld) got %d so far (%s)",
336            sock->sd, data + got, bufsize, got,
337            hexa_str((unsigned char *) data, got, 0));
338     status = tcp_read(sock->sd, data + got, (size_t) bufsize);
339
340     if (status < 0) {
341       THROW7(system_error, 0,
342              "read(%d,%p,%d) from %s:%d failed: %s; got %d so far", sock->sd,
343              data + got, (int) size, gras_socket_peer_name(sock),
344              gras_socket_peer_port(sock), sock_errstr(sock_errno), got);
345     }
346     DEBUG2("Got %d more bytes (%s)", status,
347            hexa_str((unsigned char *) data + got, status, 0));
348
349     if (status) {
350       bufsize -= status;
351       got += status;
352     } else {
353       THROW1(system_error, errno,
354              "Socket closed by remote side (got %d bytes before this)", got);
355     }
356   }
357
358   return got;
359 }
360
361 static int gras_trp_tcp_recv(gras_socket_t sock,
362                              char *data, unsigned long int size)
363 {
364   return gras_trp_tcp_recv_withbuffer(sock, data, size, size);
365
366 }
367
368 /*******************************************/
369 /****[ end of UNBUFFERED DATA EXCHANGE ]****/
370 /*******************************************/
371
372 /**********************************/
373 /****[ BUFFERED DATA EXCHANGE ]****/
374 /**********************************/
375
376 /* Make sure the data is sent */
377 static void gras_trp_bufiov_flush(gras_socket_t sock)
378 {
379 #ifdef HAVE_READV
380   xbt_dynar_t vect;
381   int size;
382 #endif
383   gras_trp_bufdata_t *data = sock->bufdata;
384   XBT_IN;
385
386   DEBUG0("Flush");
387   if (data->out == buffering_buf) {
388     if (XBT_LOG_ISENABLED(gras_trp_tcp, xbt_log_priority_debug))
389       hexa_print("chunk to send ",
390                  (unsigned char *) data->out_buf.data, data->out_buf.size);
391     if ((data->out_buf.size - data->out_buf.pos) != 0) {
392       DEBUG3("Send the chunk (size=%d) to %s:%d", data->out_buf.size,
393              gras_socket_peer_name(sock), gras_socket_peer_port(sock));
394       gras_trp_tcp_send(sock, data->out_buf.data, data->out_buf.size);
395       VERB1("Chunk sent (size=%d)", data->out_buf.size);
396       data->out_buf.size = 0;
397     }
398   }
399 #ifdef HAVE_READV
400   if (data->out == buffering_iov) {
401     DEBUG0("Flush out iov");
402     vect = sock->bufdata->out_buf_v;
403     if ((size = xbt_dynar_length(vect))) {
404       DEBUG1("Flush %d chunks out of this socket", size);
405       writev(sock->sd, xbt_dynar_get_ptr(vect, 0), size);
406       xbt_dynar_reset(vect);
407     }
408     data->out_buf.size = 0;     /* reset the buffer containing non-stable data */
409   }
410
411   if (data->in == buffering_iov) {
412     DEBUG0("Flush in iov");
413     vect = sock->bufdata->in_buf_v;
414     if ((size = xbt_dynar_length(vect))) {
415       DEBUG1("Get %d chunks from of this socket", size);
416       readv(sock->sd, xbt_dynar_get_ptr(vect, 0), size);
417       xbt_dynar_reset(vect);
418     }
419   }
420 #endif
421 }
422
423 static void
424 gras_trp_buf_send(gras_socket_t sock,
425                   const char *chunk,
426                   unsigned long int size, int stable_ignored)
427 {
428
429   gras_trp_bufdata_t *data = (gras_trp_bufdata_t *) sock->bufdata;
430   int chunk_pos = 0;
431
432   XBT_IN;
433
434   while (chunk_pos < size) {
435     /* size of the chunk to receive in that shot */
436     long int thissize =
437       min(size - chunk_pos, data->buffsize - data->out_buf.size);
438     DEBUG4("Set the chars %d..%ld into the buffer; size=%ld, ctn=(%s)",
439            (int) data->out_buf.size,
440            ((int) data->out_buf.size) + thissize - 1, size,
441            hexa_str((unsigned char *) chunk, thissize, 0));
442
443     memcpy(data->out_buf.data + data->out_buf.size, chunk + chunk_pos,
444            thissize);
445
446     data->out_buf.size += thissize;
447     chunk_pos += thissize;
448     DEBUG4("New pos = %d; Still to send = %ld of %ld; ctn sofar=(%s)",
449            data->out_buf.size, size - chunk_pos, size,
450            hexa_str((unsigned char *) chunk, chunk_pos, 0));
451
452     if (data->out_buf.size == data->buffsize)   /* out of space. Flush it */
453       gras_trp_bufiov_flush(sock);
454   }
455
456   XBT_OUT;
457 }
458
459 static int
460 gras_trp_buf_recv(gras_socket_t sock, char *chunk, unsigned long int size)
461 {
462
463   gras_trp_bufdata_t *data = sock->bufdata;
464   long int chunk_pos = 0;
465
466   XBT_IN;
467
468   while (chunk_pos < size) {
469     /* size of the chunk to receive in that shot */
470     long int thissize;
471
472     if (data->in_buf.size == data->in_buf.pos) {        /* out of data. Get more */
473
474       DEBUG2("Get more data (size=%d,bufsize=%d)",
475              (int) MIN(size - chunk_pos, data->buffsize),
476              (int) data->buffsize);
477
478
479       data->in_buf.size =
480         gras_trp_tcp_recv_withbuffer(sock, data->in_buf.data,
481                                      MIN(size - chunk_pos, data->buffsize),
482                                      data->buffsize);
483
484       data->in_buf.pos = 0;
485     }
486
487     thissize = min(size - chunk_pos, data->in_buf.size - data->in_buf.pos);
488     memcpy(chunk + chunk_pos, data->in_buf.data + data->in_buf.pos, thissize);
489
490     data->in_buf.pos += thissize;
491     chunk_pos += thissize;
492     DEBUG4("New pos = %d; Still to receive = %ld of %ld. Ctn so far=(%s)",
493            data->in_buf.pos, size - chunk_pos, size,
494            hexa_str((unsigned char *) chunk, chunk_pos, 0));
495   }
496   /* indicate on need to the gras_select function that there is more to read on this socket so that it does not actually select */
497   sock->moredata = (data->in_buf.size > data->in_buf.pos);
498   DEBUG1("There is %smore data", (sock->moredata ? "" : "no "));
499
500   XBT_OUT;
501   return chunk_pos;
502 }
503
504 /*****************************************/
505 /****[ end of BUFFERED DATA EXCHANGE ]****/
506 /*****************************************/
507
508 /********************************/
509 /****[ VECTOR DATA EXCHANGE ]****/
510 /********************************/
511 #ifdef HAVE_READV
512 static void
513 gras_trp_iov_send(gras_socket_t sock,
514                   const char *chunk, unsigned long int size, int stable)
515 {
516   struct iovec elm;
517   gras_trp_bufdata_t *data = (gras_trp_bufdata_t *) sock->bufdata;
518
519
520   DEBUG1("Buffer one chunk to be sent later (%s)",
521          hexa_str((char *) chunk, size, 0));
522
523   elm.iov_len = (size_t) size;
524
525   if (!stable) {
526     /* data storage won't last until flush. Save it in a buffer if we can */
527
528     if (size > data->buffsize - data->out_buf.size) {
529       /* buffer too small: 
530          flush the socket, using data in its actual storage */
531       elm.iov_base = (void *) chunk;
532       xbt_dynar_push(data->out_buf_v, &elm);
533
534       gras_trp_bufiov_flush(sock);
535       return;
536     } else {
537       /* buffer big enough: 
538          copy data into it, and chain it for upcoming writev */
539       memcpy(data->out_buf.data + data->out_buf.size, chunk, size);
540       elm.iov_base = (void *) (data->out_buf.data + data->out_buf.size);
541       data->out_buf.size += size;
542
543       xbt_dynar_push(data->out_buf_v, &elm);
544     }
545
546   } else {
547     /* data storage stable. Chain it */
548
549     elm.iov_base = (void *) chunk;
550     xbt_dynar_push(data->out_buf_v, &elm);
551   }
552 }
553
554 static int
555 gras_trp_iov_recv(gras_socket_t sock, char *chunk, unsigned long int size)
556 {
557   struct iovec elm;
558
559   DEBUG0("Buffer one chunk to be received later");
560   elm.iov_base = (void *) chunk;
561   elm.iov_len = (size_t) size;
562   xbt_dynar_push(sock->bufdata->in_buf_v, &elm);
563
564   return size;
565 }
566
567 #endif
568 /***************************************/
569 /****[ end of VECTOR DATA EXCHANGE ]****/
570 /***************************************/
571
572
573 /***
574  *** Prototypes of BUFFERED
575  ***/
576
577 void gras_trp_buf_socket_client(gras_trp_plugin_t self, gras_socket_t sock);
578 void gras_trp_buf_socket_server(gras_trp_plugin_t self, gras_socket_t sock);
579 gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock);
580
581 void gras_trp_buf_socket_close(gras_socket_t sd);
582
583
584 gras_socket_t gras_trp_buf_init_sock(gras_socket_t sock)
585 {
586   gras_trp_bufdata_t *data = xbt_new(gras_trp_bufdata_t, 1);
587
588   data->buffsize = 100 * 1024;  /* 100k */
589
590   data->in_buf.size = 0;
591   data->in_buf.data = xbt_malloc(data->buffsize);
592   data->in_buf.pos = 0;         /* useless, indeed, since size==pos */
593
594   data->out_buf.size = 0;
595   data->out_buf.data = xbt_malloc(data->buffsize);
596   data->out_buf.pos = data->out_buf.size;
597
598 #ifdef HAVE_READV
599   data->in_buf_v = data->out_buf_v = NULL;
600   data->in_buf_v = xbt_dynar_new(sizeof(struct iovec), NULL);
601   data->out_buf_v = xbt_dynar_new(sizeof(struct iovec), NULL);
602   data->out = buffering_iov;
603 #else
604   data->out = buffering_buf;
605 #endif
606
607   data->in = buffering_buf;
608
609   sock->bufdata = data;
610   return sock;
611 }
612
613 /***
614  *** Code
615  ***/
616 void gras_trp_tcp_setup(gras_trp_plugin_t plug)
617 {
618
619   plug->socket_client = gras_trp_buf_socket_client;
620   plug->socket_server = gras_trp_buf_socket_server;
621   plug->socket_accept = gras_trp_buf_socket_accept;
622   plug->socket_close = gras_trp_buf_socket_close;
623
624 #ifdef HAVE_READV
625   plug->send = gras_trp_iov_send;
626 #else
627   plug->send = gras_trp_buf_send;
628 #endif
629   plug->recv = gras_trp_buf_recv;
630
631   plug->raw_send = gras_trp_tcp_send;
632   plug->raw_recv = gras_trp_tcp_recv;
633
634   plug->flush = gras_trp_bufiov_flush;
635
636   plug->data = NULL;
637   plug->exit = NULL;
638 }
639
640 void gras_trp_buf_socket_client(gras_trp_plugin_t self,
641                                 /* OUT */ gras_socket_t sock)
642 {
643
644   gras_trp_sock_socket_client(NULL, sock);
645   gras_trp_buf_init_sock(sock);
646 }
647
648 /**
649  * gras_trp_buf_socket_server:
650  *
651  * Open a socket used to receive messages.
652  */
653 void gras_trp_buf_socket_server(gras_trp_plugin_t self,
654                                 /* OUT */ gras_socket_t sock)
655 {
656
657   gras_trp_sock_socket_server(NULL, sock);
658   gras_trp_buf_init_sock(sock);
659 }
660
661 gras_socket_t gras_trp_buf_socket_accept(gras_socket_t sock)
662 {
663   return gras_trp_buf_init_sock(gras_trp_sock_socket_accept(sock));
664 }
665
666 void gras_trp_buf_socket_close(gras_socket_t sock)
667 {
668   gras_trp_bufdata_t *data = sock->bufdata;
669
670   if (data->in_buf.size != data->in_buf.pos) {
671     WARN3("Socket closed, but %d bytes were unread (size=%d,pos=%d)",
672           data->in_buf.size - data->in_buf.pos,
673           data->in_buf.size, data->in_buf.pos);
674   }
675   if (data->in_buf.data)
676     free(data->in_buf.data);
677
678   if (data->out_buf.size != data->out_buf.pos) {
679     DEBUG2("Flush the socket before closing (in=%d,out=%d)",
680            data->in_buf.size, data->out_buf.size);
681     gras_trp_bufiov_flush(sock);
682   }
683   if (data->out_buf.data)
684     free(data->out_buf.data);
685
686 #ifdef HAVE_READV
687   if (data->in_buf_v) {
688     if (xbt_dynar_length(data->in_buf_v))
689       WARN0("Socket closed, but some bytes were unread");
690     xbt_dynar_free(&data->in_buf_v);
691   }
692   if (data->out_buf_v) {
693     if (xbt_dynar_length(data->out_buf_v)) {
694       DEBUG0("Flush the socket before closing");
695       gras_trp_bufiov_flush(sock);
696     }
697     xbt_dynar_free(&data->out_buf_v);
698   }
699 #endif
700
701   free(data);
702   gras_trp_sock_socket_close(sock);
703 }
704
705 /****************************/
706 /****[ HELPER FUNCTIONS ]****/
707 /****************************/
708
709 /*
710  * Returns the tcp protocol number from the network protocol data base.
711  *
712  * getprotobyname() is not thread safe. We need to lock it.
713  */
714 static int _gras_tcp_proto_number(void)
715 {
716   struct protoent *fetchedEntry;
717   static int returnValue = 0;
718
719   if (returnValue == 0) {
720     fetchedEntry = getprotobyname("tcp");
721     xbt_assert0(fetchedEntry, "getprotobyname(tcp) gave NULL");
722     returnValue = fetchedEntry->p_proto;
723   }
724
725   return returnValue;
726 }
727
728 #ifdef HAVE_WINSOCK_H
729 #define RETSTR( x ) case x: return #x
730
731 const char *gras_wsa_err2string(int err)
732 {
733   switch (err) {
734     RETSTR(WSAEINTR);
735     RETSTR(WSAEBADF);
736     RETSTR(WSAEACCES);
737     RETSTR(WSAEFAULT);
738     RETSTR(WSAEINVAL);
739     RETSTR(WSAEMFILE);
740     RETSTR(WSAEWOULDBLOCK);
741     RETSTR(WSAEINPROGRESS);
742     RETSTR(WSAEALREADY);
743     RETSTR(WSAENOTSOCK);
744     RETSTR(WSAEDESTADDRREQ);
745     RETSTR(WSAEMSGSIZE);
746     RETSTR(WSAEPROTOTYPE);
747     RETSTR(WSAENOPROTOOPT);
748     RETSTR(WSAEPROTONOSUPPORT);
749     RETSTR(WSAESOCKTNOSUPPORT);
750     RETSTR(WSAEOPNOTSUPP);
751     RETSTR(WSAEPFNOSUPPORT);
752     RETSTR(WSAEAFNOSUPPORT);
753     RETSTR(WSAEADDRINUSE);
754     RETSTR(WSAEADDRNOTAVAIL);
755     RETSTR(WSAENETDOWN);
756     RETSTR(WSAENETUNREACH);
757     RETSTR(WSAENETRESET);
758     RETSTR(WSAECONNABORTED);
759     RETSTR(WSAECONNRESET);
760     RETSTR(WSAENOBUFS);
761     RETSTR(WSAEISCONN);
762     RETSTR(WSAENOTCONN);
763     RETSTR(WSAESHUTDOWN);
764     RETSTR(WSAETOOMANYREFS);
765     RETSTR(WSAETIMEDOUT);
766     RETSTR(WSAECONNREFUSED);
767     RETSTR(WSAELOOP);
768     RETSTR(WSAENAMETOOLONG);
769     RETSTR(WSAEHOSTDOWN);
770     RETSTR(WSAEHOSTUNREACH);
771     RETSTR(WSAENOTEMPTY);
772     RETSTR(WSAEPROCLIM);
773     RETSTR(WSAEUSERS);
774     RETSTR(WSAEDQUOT);
775     RETSTR(WSAESTALE);
776     RETSTR(WSAEREMOTE);
777     RETSTR(WSASYSNOTREADY);
778     RETSTR(WSAVERNOTSUPPORTED);
779     RETSTR(WSANOTINITIALISED);
780     RETSTR(WSAEDISCON);
781
782 #ifdef HAVE_WINSOCK2
783     RETSTR(WSAENOMORE);
784     RETSTR(WSAECANCELLED);
785     RETSTR(WSAEINVALIDPROCTABLE);
786     RETSTR(WSAEINVALIDPROVIDER);
787     RETSTR(WSASYSCALLFAILURE);
788     RETSTR(WSASERVICE_NOT_FOUND);
789     RETSTR(WSATYPE_NOT_FOUND);
790     RETSTR(WSA_E_NO_MORE);
791     RETSTR(WSA_E_CANCELLED);
792     RETSTR(WSAEREFUSED);
793 #endif /* HAVE_WINSOCK2 */
794
795     RETSTR(WSAHOST_NOT_FOUND);
796     RETSTR(WSATRY_AGAIN);
797     RETSTR(WSANO_RECOVERY);
798     RETSTR(WSANO_DATA);
799   }
800   return "unknown WSA error";
801 }
802 #endif /* HAVE_WINSOCK_H */
803
804 /***********************************/
805 /****[ end of HELPER FUNCTIONS ]****/
806 /***********************************/