Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
8b7c9d7151fa3fdb59364407d54e171538e3b1ca
[simgrid.git] / src / gras / Transport / rl_transport.c
1 /* $Id$ */
2
3 /* rl_transport - RL specific functions for transport                       */
4
5 /* Copyright (c) 2004 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11 #include "portable.h"
12 #include "gras/Transport/transport_private.h"
13 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_trp);
14
15 /* check transport_private.h for an explanation of this variable */
16 gras_socket_t _gras_lastly_selected_socket = NULL;
17
18 /**
19  * gras_trp_select:
20  *
21  * Returns the next socket to service because it receives a message.
22  *
23  * if timeout<0, we ought to implement the adaptative timeout (FIXME)
24  *
25  * if timeout=0, do not wait for new message, only handle the ones already there.
26  *
27  * if timeout>0 and no message there, wait at most that amount of time before giving up.
28  */
29 gras_socket_t gras_trp_select(double timeout) {
30   xbt_dynar_t sockets= ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
31   int done = -1;
32   double wakeup = gras_os_time() + timeout;
33   double now = 0;
34   /* nextToService used to make sure socket with high number do not starve */
35   /*  static int nextToService = 0; */
36   struct timeval tout, *p_tout;
37
38   int max_fds=0; /* first arg of select: number of existing sockets */
39   /* but accept() of winsock returns sockets bigger than the limit, so don't bother 
40      with this tiny optimisation on BillWare */
41   fd_set FDS;
42   int ready; /* return of select: number of socket ready to be serviced */
43   int fd_setsize; /* FD_SETSIZE not always defined. Get this portably */
44
45   gras_socket_t sock_iter; /* iterating over all sockets */
46   int cursor;              /* iterating over all sockets */
47
48   /* Check whether there is more data to read from the socket we selected last time.
49      This can happen with tcp buffered sockets since we try to get as much data as we can for them */
50   if (_gras_lastly_selected_socket && _gras_lastly_selected_socket->moredata) {
51      VERB0("Returning _gras_lastly_selected_socket since there is more data on it");
52      return _gras_lastly_selected_socket;
53   }
54   
55   /* Compute FD_SETSIZE */
56 #ifdef HAVE_SYSCONF
57    fd_setsize = sysconf( _SC_OPEN_MAX );
58 #else
59 #  ifdef HAVE_GETDTABLESIZE 
60    fd_setsize = getdtablesize();
61 #  else
62    fd_setsize = FD_SETSIZE;
63 #  endif /* !USE_SYSCONF */
64 #endif
65
66   while (done == -1) {
67     if (timeout > 0) { /* did we timeout already? */
68       now = gras_os_time();
69       DEBUG2("wakeup=%f now=%f",wakeup, now);
70       if (now == -1 || now >= wakeup) {
71         /* didn't find anything; no need to update _gras_lastly_selected_socket since its moredata is 0 (or we would have returned it directly) */
72         THROW1(timeout_error,0,
73                "Timeout (%f) elapsed with selecting for incomming connexions",
74                timeout);
75       }
76     }
77
78     /* construct the set of socket to ear from */
79     FD_ZERO(&FDS);
80     max_fds = -1;
81     xbt_dynar_foreach(sockets,cursor,sock_iter) {
82       if (!sock_iter->valid)
83          continue;
84        
85       if (sock_iter->incoming) {
86         DEBUG1("Considering socket %d for select",sock_iter->sd);
87 #ifndef HAVE_WINSOCK_H
88         if (max_fds < sock_iter->sd)
89           max_fds = sock_iter->sd;
90 #endif
91         FD_SET(sock_iter->sd, &FDS);
92       } else {
93         DEBUG1("Not considering socket %d for select",sock_iter->sd);
94       }
95     }
96
97     if (max_fds == -1) {
98        if (timeout > 0) {
99           DEBUG1("No socket to select onto. Sleep %f sec instead.",timeout);
100           gras_os_sleep(timeout);
101           THROW1(timeout_error,0,"No socket to select onto. Sleep %f sec instead",timeout);
102        } else {
103           DEBUG0("No socket to select onto. Return directly.");
104           THROW0(timeout_error,0, "No socket to select onto. Return directly.");
105        }
106     }
107
108 #ifndef HAVE_WINSOCK_H
109     /* we cannot have more than FD_SETSIZE sockets 
110        ... but with WINSOCK which returns sockets higher than the limit (killing this optim) */
111     if (++max_fds > fd_setsize && fd_setsize > 0) {
112       WARN1("too many open sockets (%d).",max_fds);
113       done = 0;
114       break;
115     }
116 #else
117     max_fds = fd_setsize;
118 #endif
119
120     if (timeout > 0) { 
121       /* set the timeout */
122       tout.tv_sec = (unsigned long)(wakeup - now);
123       tout.tv_usec = ((wakeup -now) - ((unsigned long)(wakeup - now))) * 1000000;
124       p_tout = &tout;
125     } else if (timeout == 0) {
126       /* polling only */
127       tout.tv_sec = 0;
128       tout.tv_usec = 0;
129       p_tout = &tout;
130       /* we just do one loop around */
131       done = 0;
132     } else { 
133       /* no timeout: good luck! */
134       p_tout = NULL;
135     }
136      
137     DEBUG2("Selecting over %d socket(s); timeout=%f", max_fds-1,timeout);
138     ready = select(max_fds, &FDS, NULL, NULL, p_tout);
139     DEBUG1("select returned %d", ready);
140     if (ready == -1) {
141       switch (errno) {
142       case  EINTR: /* a signal we don't care about occured. we don't care */
143         /* if we cared, we would have set an handler */
144         continue;
145       case EINVAL: /* invalid value */
146         THROW3(system_error,EINVAL,"invalid select: nb fds: %d, timeout: %d.%d",
147                max_fds, (int)tout.tv_sec,(int) tout.tv_usec);
148       case ENOMEM: 
149         xbt_assert0(0,"Malloc error during the select");
150       default:
151         THROW2(system_error,errno,"Error during select: %s (%d)",
152                strerror(errno),errno);
153       }
154       THROW_IMPOSSIBLE;
155     } else if (ready == 0) {
156       continue;  /* this was a timeout */
157     }
158
159     xbt_dynar_foreach(sockets,cursor,sock_iter) {
160        if(!FD_ISSET(sock_iter->sd, &FDS)) { /* this socket is not ready */
161         continue;
162        }
163        
164        /* Got a socket to serve */
165        ready--;
166
167        if (   sock_iter->accepting
168            && sock_iter->plugin->socket_accept) { 
169          /* not a socket but an ear. accept on it and serve next socket */
170          gras_socket_t accepted=NULL;
171          
172          accepted = (sock_iter->plugin->socket_accept)(sock_iter);
173          DEBUG2("accepted=%p,&accepted=%p",accepted,&accepted);
174          accepted->meas = sock_iter->meas;
175
176        } else if (sock_iter->recv_ok) {
177          /* Make sure the socket is still alive by reading the first byte */
178          char lookahead;
179          int recvd;
180
181          recvd = recv(sock_iter->sd, &lookahead, 1, MSG_PEEK);
182          if (recvd < 0) {
183            WARN2("socket %d failed: %s", sock_iter->sd, strerror(errno));
184            /* done with this socket */
185            gras_socket_close(sock_iter);
186            cursor--;
187          } else if (recvd == 0) {
188            /* Connection reset (=closed) by peer. */
189            DEBUG1("Connection %d reset by peer", sock_iter->sd);
190            sock_iter->valid=0; /* don't close it. User may keep references to it */
191          } else { 
192            /* Got a suited socket ! */
193            XBT_OUT;
194            _gras_lastly_selected_socket = sock_iter;
195            return sock_iter;
196          }
197
198        } else {
199          /* This is a file socket. Cannot recv() on it, but it must be alive */
200            XBT_OUT;
201            _gras_lastly_selected_socket = sock_iter;
202            return sock_iter;
203        }
204
205        
206        /* if we're here, the socket we found wasn't really ready to be served */
207        if (ready == 0) /* exausted all sockets given by select. Request new ones */
208          break; 
209     }
210
211   }
212
213   /* No socket found. Maybe we had timeout=0 and nothing to do */
214   DEBUG0("TIMEOUT");
215   THROW0(timeout_error,0,"Timeout");
216 }
217
218 void gras_trp_sg_setup(gras_trp_plugin_t plug) {
219   THROW0(mismatch_error,0,"No SG transport on live platforms");
220 }
221