Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix a vicious bug: TCP socket use a buffer and read operation get as much data as...
[simgrid.git] / src / gras / Transport / rl_transport.c
1 /* $Id$ */
2
3 /* rl_transport - RL specific functions for transport                       */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11 #include "portable.h"
12 #include "gras/Transport/transport_private.h"
13 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_trp);
14
15 /**
16  * gras_trp_select:
17  *
18  * Returns the next socket to service because it receives a message.
19  *
20  * if timeout<0, we ought to implement the adaptative timeout (FIXME)
21  *
22  * if timeout=0, do not wait for new message, only handle the ones already there.
23  *
24  * if timeout>0 and no message there, wait at most that amount of time before giving up.
25  */
26 gras_socket_t gras_trp_select(double timeout) {
27   xbt_dynar_t sockets= ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
28   int done = -1;
29   double wakeup = gras_os_time() + timeout;
30   double now = 0;
31   /* nextToService used to make sure socket with high number do not starve */
32   /*  static int nextToService = 0; */
33   struct timeval tout, *p_tout;
34
35   int max_fds=0; /* first arg of select: number of existing sockets */
36   /* but accept() of winsock returns sockets bigger than the limit, so don't bother 
37      with this tiny optimisation on BillWare */
38   fd_set FDS;
39   int ready; /* return of select: number of socket ready to be serviced */
40   int fd_setsize; /* FD_SETSIZE not always defined. Get this portably */
41
42   gras_socket_t sock_iter; /* iterating over all sockets */
43   int cursor;              /* iterating over all sockets */
44
45   /* Check whether there is more data to read from the socket we selected last time.
46      This can happen with tcp buffered sockets since we try to get as much data as we can for them */
47   static gras_socket_t _lastly_selected_socket = NULL;
48   if (_lastly_selected_socket && _lastly_selected_socket->moredata) 
49      return _lastly_selected_socket;
50   
51   /* Compute FD_SETSIZE */
52 #ifdef HAVE_SYSCONF
53    fd_setsize = sysconf( _SC_OPEN_MAX );
54 #else
55 #  ifdef HAVE_GETDTABLESIZE 
56    fd_setsize = getdtablesize();
57 #  else
58    fd_setsize = FD_SETSIZE;
59 #  endif /* !USE_SYSCONF */
60 #endif
61
62   while (done == -1) {
63     if (timeout > 0) { /* did we timeout already? */
64       now = gras_os_time();
65       DEBUG2("wakeup=%f now=%f",wakeup, now);
66       if (now == -1 || now >= wakeup) {
67         /* didn't find anything; no need to update _lastly_selected_socket since its moredata is 0 (or we would have returned it directly) */
68         THROW1(timeout_error,0,
69                "Timeout (%f) elapsed with selecting for incomming connexions",
70                timeout);
71       }
72     }
73
74     /* construct the set of socket to ear from */
75     FD_ZERO(&FDS);
76     max_fds = -1;
77     xbt_dynar_foreach(sockets,cursor,sock_iter) {
78       if (!sock_iter->valid)
79          continue;
80        
81       if (sock_iter->incoming) {
82         DEBUG1("Considering socket %d for select",sock_iter->sd);
83 #ifndef HAVE_WINSOCK_H
84         if (max_fds < sock_iter->sd)
85           max_fds = sock_iter->sd;
86 #endif
87         FD_SET(sock_iter->sd, &FDS);
88       } else {
89         DEBUG1("Not considering socket %d for select",sock_iter->sd);
90       }
91     }
92
93     if (max_fds == -1) {
94        if (timeout > 0) {
95           DEBUG1("No socket to select onto. Sleep %f sec instead.",timeout);
96           gras_os_sleep(timeout);
97           THROW1(timeout_error,0,"No socket to select onto. Sleep %f sec instead",timeout);
98        } else {
99           DEBUG0("No socket to select onto. Return directly.");
100           THROW0(timeout_error,0, "No socket to select onto. Return directly.");
101        }
102     }
103
104 #ifndef HAVE_WINSOCK_H
105     /* we cannot have more than FD_SETSIZE sockets 
106        ... but with WINSOCK which returns sockets higher than the limit (killing this optim) */
107     if (++max_fds > fd_setsize && fd_setsize > 0) {
108       WARN1("too many open sockets (%d).",max_fds);
109       done = 0;
110       break;
111     }
112 #else
113     max_fds = fd_setsize;
114 #endif
115
116     if (timeout > 0) { 
117       /* set the timeout */
118       tout.tv_sec = (unsigned long)(wakeup - now);
119       tout.tv_usec = ((wakeup -now) - ((unsigned long)(wakeup - now))) * 1000000;
120       p_tout = &tout;
121     } else if (timeout == 0) {
122       /* polling only */
123       tout.tv_sec = 0;
124       tout.tv_usec = 0;
125       p_tout = &tout;
126       /* we just do one loop around */
127       done = 0;
128     } else { 
129       /* no timeout: good luck! */
130       p_tout = NULL;
131     }
132      
133     DEBUG2("Selecting over %d socket(s); timeout=%f", max_fds-1,timeout);
134     ready = select(max_fds, &FDS, NULL, NULL, p_tout);
135     DEBUG1("select returned %d", ready);
136     if (ready == -1) {
137       switch (errno) {
138       case  EINTR: /* a signal we don't care about occured. we don't care */
139         /* if we cared, we would have set an handler */
140         continue;
141       case EINVAL: /* invalid value */
142         THROW3(system_error,EINVAL,"invalid select: nb fds: %d, timeout: %d.%d",
143                max_fds, (int)tout.tv_sec,(int) tout.tv_usec);
144       case ENOMEM: 
145         xbt_assert0(0,"Malloc error during the select");
146       default:
147         THROW2(system_error,errno,"Error during select: %s (%d)",
148                strerror(errno),errno);
149       }
150       THROW_IMPOSSIBLE;
151     } else if (ready == 0) {
152       continue;  /* this was a timeout */
153     }
154
155     xbt_dynar_foreach(sockets,cursor,sock_iter) {
156        if(!FD_ISSET(sock_iter->sd, &FDS)) { /* this socket is not ready */
157         continue;
158        }
159        
160        /* Got a socket to serve */
161        ready--;
162
163        if (   sock_iter->accepting
164            && sock_iter->plugin->socket_accept) { 
165          /* not a socket but an ear. accept on it and serve next socket */
166          gras_socket_t accepted=NULL;
167          
168          accepted = (sock_iter->plugin->socket_accept)(sock_iter);
169          DEBUG2("accepted=%p,&accepted=%p",accepted,&accepted);
170          accepted->meas = sock_iter->meas;
171
172        } else if (sock_iter->recv_ok) {
173          /* Make sure the socket is still alive by reading the first byte */
174          char lookahead;
175          int recvd;
176
177          recvd = recv(sock_iter->sd, &lookahead, 1, MSG_PEEK);
178          if (recvd < 0) {
179            WARN2("socket %d failed: %s", sock_iter->sd, strerror(errno));
180            /* done with this socket */
181            gras_socket_close(sock_iter);
182            cursor--;
183          } else if (recvd == 0) {
184            /* Connection reset (=closed) by peer. */
185            DEBUG1("Connection %d reset by peer", sock_iter->sd);
186            sock_iter->valid=0; /* don't close it. User may keep references to it */
187          } else { 
188            /* Got a suited socket ! */
189            XBT_OUT;
190            _lastly_selected_socket = sock_iter;
191            return sock_iter;
192          }
193
194        } else {
195          /* This is a file socket. Cannot recv() on it, but it must be alive */
196            XBT_OUT;
197            _lastly_selected_socket = sock_iter;
198            return sock_iter;
199        }
200
201        
202        /* if we're here, the socket we found wasn't really ready to be served */
203        if (ready == 0) /* exausted all sockets given by select. Request new ones */
204          break; 
205     }
206
207   }
208
209   /* No socket found. Maybe we had timeout=0 and nothing to do */
210   DEBUG0("TIMEOUT");
211   THROW0(timeout_error,0,"Timeout");
212 }
213
214 void gras_trp_sg_setup(gras_trp_plugin_t plug) {
215   THROW0(mismatch_error,0,"No SG transport on live platforms");
216 }
217