Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
098a2355f9e28c9c584f62fcd0b684eceda971b1
[simgrid.git] / src / gras / Transport / rl_transport.c
1 /* $Id$ */
2
3 /* rl_transport - RL specific functions for transport                       */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11 #include "portable.h"
12 #include "gras/Transport/transport_private.h"
13 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_trp);
14
15 /* check transport_private.h for an explanation of this variable */
16 gras_socket_t _gras_lastly_selected_socket = NULL;
17
18 /**
19  * gras_trp_select:
20  *
21  * Returns the next socket to service because it receives a message.
22  *
23  * if timeout<0, we ought to implement the adaptative timeout (FIXME)
24  *
25  * if timeout=0, do not wait for new message, only handle the ones already there.
26  *
27  * if timeout>0 and no message there, wait at most that amount of time before giving up.
28  */
29 gras_socket_t gras_trp_select(double timeout) {
30   xbt_dynar_t sockets= ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
31   int done = -1;
32   double wakeup = gras_os_time() + timeout;
33   double now = 0;
34   /* nextToService used to make sure socket with high number do not starve */
35   /*  static int nextToService = 0; */
36   struct timeval tout, *p_tout;
37
38   int max_fds=0; /* first arg of select: number of existing sockets */
39   /* but accept() of winsock returns sockets bigger than the limit, so don't bother 
40      with this tiny optimisation on BillWare */
41   fd_set FDS;
42   int ready; /* return of select: number of socket ready to be serviced */
43   static int fd_setsize=-1; /* FD_SETSIZE not always defined. Get this portably */
44
45   gras_socket_t sock_iter; /* iterating over all sockets */
46   int cursor;              /* iterating over all sockets */
47
48   /* Check whether there is more data to read from the socket we selected last time.
49      This can happen with tcp buffered sockets since we try to get as much data as we can for them */
50   if (_gras_lastly_selected_socket && _gras_lastly_selected_socket->moredata) {
51      VERB0("Returning _gras_lastly_selected_socket since there is more data on it");
52      return _gras_lastly_selected_socket;
53   }
54   
55   /* Compute FD_SETSIZE on need */
56   if (fd_setsize < 0) { 
57 #ifdef HAVE_SYSCONF
58      fd_setsize = sysconf( _SC_OPEN_MAX );
59 #else
60 #  ifdef HAVE_GETDTABLESIZE 
61      fd_setsize = getdtablesize();
62 #  else
63      fd_setsize = FD_SETSIZE;
64 #  endif /* !USE_SYSCONF */
65 #endif
66   }
67
68   while (done == -1) {
69     if (timeout > 0) { /* did we timeout already? */
70       now = gras_os_time();
71       DEBUG2("wakeup=%f now=%f",wakeup, now);
72       if (now == -1 || now >= wakeup) {
73         /* didn't find anything; no need to update _gras_lastly_selected_socket since its moredata is 0 (or we would have returned it directly) */
74         THROW1(timeout_error,0,
75                "Timeout (%f) elapsed with selecting for incomming connexions",
76                timeout);
77       }
78     }
79
80     /* construct the set of socket to ear from */
81     FD_ZERO(&FDS);
82     max_fds = -1;
83     xbt_dynar_foreach(sockets,cursor,sock_iter) {
84       if (!sock_iter->valid)
85          continue;
86        
87       if (sock_iter->incoming) {
88         DEBUG1("Considering socket %d for select",sock_iter->sd);
89 #ifndef HAVE_WINSOCK_H
90         if (max_fds < sock_iter->sd)
91           max_fds = sock_iter->sd;
92 #endif
93         FD_SET(sock_iter->sd, &FDS);
94       } else {
95         DEBUG1("Not considering socket %d for select",sock_iter->sd);
96       }
97     }
98
99     if (max_fds == -1) {
100        if (timeout > 0) {
101           DEBUG1("No socket to select onto. Sleep %f sec instead.",timeout);
102           gras_os_sleep(timeout);
103           THROW1(timeout_error,0,"No socket to select onto. Sleep %f sec instead",timeout);
104        } else {
105           DEBUG0("No socket to select onto. Return directly.");
106           THROW0(timeout_error,0, "No socket to select onto. Return directly.");
107        }
108     }
109
110 #ifndef HAVE_WINSOCK_H
111     /* we cannot have more than FD_SETSIZE sockets 
112        ... but with WINSOCK which returns sockets higher than the limit (killing this optim) */
113     if (++max_fds > fd_setsize && fd_setsize > 0) {
114       WARN1("too many open sockets (%d).",max_fds);
115       done = 0;
116       break;
117     }
118 #else
119     max_fds = fd_setsize;
120 #endif
121
122     if (timeout > 0) { 
123       /* set the timeout */
124       tout.tv_sec = (unsigned long)(wakeup - now);
125       tout.tv_usec = ((wakeup -now) - ((unsigned long)(wakeup - now))) * 1000000;
126       p_tout = &tout;
127     } else if (timeout == 0) {
128       /* polling only */
129       tout.tv_sec = 0;
130       tout.tv_usec = 0;
131       p_tout = &tout;
132       /* we just do one loop around */
133       done = 0;
134     } else { 
135       /* no timeout: good luck! */
136       p_tout = NULL;
137     }
138      
139     DEBUG2("Selecting over %d socket(s); timeout=%f", max_fds-1,timeout);
140     ready = select(max_fds, &FDS, NULL, NULL, p_tout);
141     DEBUG1("select returned %d", ready);
142     if (ready == -1) {
143       switch (errno) {
144       case  EINTR: /* a signal we don't care about occured. we don't care */
145         /* if we cared, we would have set an handler */
146         continue;
147       case EINVAL: /* invalid value */
148         THROW3(system_error,EINVAL,"invalid select: nb fds: %d, timeout: %d.%d",
149                max_fds, (int)tout.tv_sec,(int) tout.tv_usec);
150       case ENOMEM: 
151         xbt_assert0(0,"Malloc error during the select");
152       default:
153         THROW2(system_error,errno,"Error during select: %s (%d)",
154                strerror(errno),errno);
155       }
156       THROW_IMPOSSIBLE;
157     } else if (ready == 0) {
158       continue;  /* this was a timeout */
159     }
160
161     xbt_dynar_foreach(sockets,cursor,sock_iter) {
162        if(!FD_ISSET(sock_iter->sd, &FDS)) { /* this socket is not ready */
163         continue;
164        }
165        
166        /* Got a socket to serve */
167        ready--;
168
169        if (   sock_iter->accepting
170            && sock_iter->plugin->socket_accept) { 
171          /* not a socket but an ear. accept on it and serve next socket */
172          gras_socket_t accepted=NULL;
173          
174          accepted = (sock_iter->plugin->socket_accept)(sock_iter);
175          DEBUG2("accepted=%p,&accepted=%p",accepted,&accepted);
176          accepted->meas = sock_iter->meas;
177
178        } else if (sock_iter->recv_ok) {
179          /* Make sure the socket is still alive by reading the first byte */
180          char lookahead;
181          int recvd;
182
183          recvd = recv(sock_iter->sd, &lookahead, 1, MSG_PEEK);
184          if (recvd < 0) {
185            WARN2("socket %d failed: %s", sock_iter->sd, strerror(errno));
186            /* done with this socket */
187            gras_socket_close(sock_iter);
188            cursor--;
189          } else if (recvd == 0) {
190            /* Connection reset (=closed) by peer. */
191            DEBUG1("Connection %d reset by peer", sock_iter->sd);
192            sock_iter->valid=0; /* don't close it. User may keep references to it */
193          } else { 
194            /* Got a suited socket ! */
195            XBT_OUT;
196            _gras_lastly_selected_socket = sock_iter;
197            return sock_iter;
198          }
199
200        } else {
201          /* This is a file socket. Cannot recv() on it, but it must be alive */
202            XBT_OUT;
203            _gras_lastly_selected_socket = sock_iter;
204            return sock_iter;
205        }
206
207        
208        /* if we're here, the socket we found wasn't really ready to be served */
209        if (ready == 0) /* exausted all sockets given by select. Request new ones */
210          break; 
211     }
212
213   }
214
215   /* No socket found. Maybe we had timeout=0 and nothing to do */
216   DEBUG0("TIMEOUT");
217   THROW0(timeout_error,0,"Timeout");
218 }
219
220 void gras_trp_sg_setup(gras_trp_plugin_t plug) {
221   THROW0(mismatch_error,0,"No SG transport on live platforms");
222 }
223