Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Do not use recv() but read() to check whether a socket returned by select() is valid...
[simgrid.git] / src / gras / Transport / rl_transport.c
index 8b7c9d7..254e187 100644 (file)
@@ -22,7 +22,6 @@ gras_socket_t _gras_lastly_selected_socket = NULL;
  *
  * if timeout<0, we ought to implement the adaptative timeout (FIXME)
  *
- * if timeout=0, do not wait for new message, only handle the ones already there.
  *
  * if timeout>0 and no message there, wait at most that amount of time before giving up.
  */
@@ -40,7 +39,7 @@ gras_socket_t gras_trp_select(double timeout) {
      with this tiny optimisation on BillWare */
   fd_set FDS;
   int ready; /* return of select: number of socket ready to be serviced */
-  int fd_setsize; /* FD_SETSIZE not always defined. Get this portably */
+  static int fd_setsize=-1; /* FD_SETSIZE not always defined. Get this portably */
 
   gras_socket_t sock_iter; /* iterating over all sockets */
   int cursor;              /* iterating over all sockets */
@@ -52,16 +51,18 @@ gras_socket_t gras_trp_select(double timeout) {
      return _gras_lastly_selected_socket;
   }
   
-  /* Compute FD_SETSIZE */
+  /* Compute FD_SETSIZE on need */
+  if (fd_setsize < 0) {        
 #ifdef HAVE_SYSCONF
-   fd_setsize = sysconf( _SC_OPEN_MAX );
+     fd_setsize = sysconf( _SC_OPEN_MAX );
 #else
 #  ifdef HAVE_GETDTABLESIZE 
-   fd_setsize = getdtablesize();
+     fd_setsize = getdtablesize();
 #  else
-   fd_setsize = FD_SETSIZE;
+     fd_setsize = FD_SETSIZE;
 #  endif /* !USE_SYSCONF */
 #endif
+  }
 
   while (done == -1) {
     if (timeout > 0) { /* did we timeout already? */
@@ -87,12 +88,16 @@ gras_socket_t gras_trp_select(double timeout) {
 #ifndef HAVE_WINSOCK_H
        if (max_fds < sock_iter->sd)
          max_fds = sock_iter->sd;
+#else
+      max_fds = 0;
+
 #endif
        FD_SET(sock_iter->sd, &FDS);
       } else {
        DEBUG1("Not considering socket %d for select",sock_iter->sd);
       }
     }
+     
 
     if (max_fds == -1) {
        if (timeout > 0) {
@@ -146,7 +151,7 @@ gras_socket_t gras_trp_select(double timeout) {
        THROW3(system_error,EINVAL,"invalid select: nb fds: %d, timeout: %d.%d",
               max_fds, (int)tout.tv_sec,(int) tout.tv_usec);
       case ENOMEM: 
-       xbt_assert0(0,"Malloc error during the select");
+       xbt_die("Malloc error during the select");
       default:
        THROW2(system_error,errno,"Error during select: %s (%d)",
               strerror(errno),errno);
@@ -169,21 +174,25 @@ gras_socket_t gras_trp_select(double timeout) {
         /* not a socket but an ear. accept on it and serve next socket */
         gras_socket_t accepted=NULL;
         
+        /* release mutex before accept; it will change the sockets dynar, so we have to break the foreach asap */
+        xbt_dynar_cursor_unlock(sockets);
         accepted = (sock_iter->plugin->socket_accept)(sock_iter);
+
         DEBUG2("accepted=%p,&accepted=%p",accepted,&accepted);
         accepted->meas = sock_iter->meas;
+        break;
 
-       } else if (sock_iter->recv_ok) {
+       } else {
         /* Make sure the socket is still alive by reading the first byte */
-        char lookahead;
         int recvd;
 
-        recvd = recv(sock_iter->sd, &lookahead, 1, MSG_PEEK);
+        recvd = read(sock_iter->sd, &sock_iter->recvd_val, 1);
         if (recvd < 0) {
           WARN2("socket %d failed: %s", sock_iter->sd, strerror(errno));
-          /* done with this socket */
+          /* done with this socket; remove it and break the foreach since it will change the dynar */
+          xbt_dynar_cursor_unlock(sockets);
           gras_socket_close(sock_iter);
-          cursor--;
+          break;
         } else if (recvd == 0) {
           /* Connection reset (=closed) by peer. */
           DEBUG1("Connection %d reset by peer", sock_iter->sd);
@@ -191,21 +200,20 @@ gras_socket_t gras_trp_select(double timeout) {
         } else { 
           /* Got a suited socket ! */
           XBT_OUT;
+          sock_iter->recvd = 1;
           _gras_lastly_selected_socket = sock_iter;
+          /* break sync dynar iteration */
+          xbt_dynar_cursor_unlock(sockets);
           return sock_iter;
         }
-
-       } else {
-        /* This is a file socket. Cannot recv() on it, but it must be alive */
-          XBT_OUT;
-          _gras_lastly_selected_socket = sock_iter;
-          return sock_iter;
        }
 
-       
        /* if we're here, the socket we found wasn't really ready to be served */
-       if (ready == 0) /* exausted all sockets given by select. Request new ones */
-        break; 
+       if (ready == 0) { /* exausted all sockets given by select. Request new ones */
+
+         xbt_dynar_cursor_unlock(sockets);
+         break; 
+       }
     }
 
   }