Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Gras listener thread of each process do select(-1) instead of while(1) {select(0.5)}
[simgrid.git] / src / gras / Transport / transport.c
index 1df5295..d292390 100644 (file)
@@ -16,6 +16,7 @@ int gras_opt_trp_nomoredata_on_close=0;
 #include "xbt/peer.h"
 #include "portable.h"
 #include "gras/Transport/transport_private.h"
+#include "gras/Msg/msg_interface.h"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp,gras,"Conveying bytes over the network");
 XBT_LOG_NEW_SUBCATEGORY(gras_trp_meas,gras_trp,"Conveying bytes over the network without formating for perf measurements");
@@ -29,7 +30,7 @@ gras_trp_plugin_new(const char *name, gras_trp_setup_t setup) {
   xbt_ex_t e;
 
   gras_trp_plugin_t plug = xbt_new0(s_gras_trp_plugin_t, 1);
-  
+
   DEBUG1("Create plugin %s",name);
 
   plug->name=xbt_strdup(name);
@@ -59,20 +60,20 @@ void gras_trp_init(void){
 
 #ifdef HAVE_WINSOCK2_H
      /* initialize the windows mechanism */
-     {  
+     {
        WORD wVersionRequested;
        WSADATA wsaData;
-       
+
        wVersionRequested = MAKEWORD( 2, 0 );
        xbt_assert0(WSAStartup( wVersionRequested, &wsaData ) == 0,
                    "Cannot find a usable WinSock DLL");
-       
+
        /* Confirm that the WinSock DLL supports 2.0.*/
        /* Note that if the DLL supports versions greater    */
        /* than 2.0 in addition to 2.0, it will still return */
        /* 2.0 in wVersion since that is the version we      */
        /* requested.                                        */
-       
+
        xbt_assert0(LOBYTE( wsaData.wVersion ) == 2 &&
                    HIBYTE( wsaData.wVersion ) == 0,
                    "Cannot find a usable WinSock DLL");
@@ -85,13 +86,13 @@ void gras_trp_init(void){
        INFO0("Found and initialized winsock");
      }
 #endif
-   
+
      /* Add plugins */
      gras_trp_plugin_new("file",gras_trp_file_setup);
      gras_trp_plugin_new("sg",gras_trp_sg_setup);
      gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
   }
-   
+
   _gras_trp_started++;
 }
 
@@ -101,7 +102,7 @@ gras_trp_exit(void){
    if (_gras_trp_started == 0) {
       return;
    }
-   
+
    if ( --_gras_trp_started == 0 ) {
 #ifdef HAVE_WINSOCK_H
       if ( WSACleanup() == SOCKET_ERROR ) {
@@ -165,26 +166,26 @@ void gras_trp_socket_new(int incoming,
 
   sock->data   = NULL;
   sock->bufdata = NULL;
-  
+
   *dst = sock;
 
   XBT_OUT;
 }
+
 /**
  * @brief Opens a server socket and makes it ready to be listened to.
  * @param port: port on which you want to listen
  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
- * 
- * In real life, you'll get a TCP socket. 
+ *
+ * In real life, you'll get a TCP socket.
  */
 gras_socket_t
 gras_socket_server_ext(unsigned short port,
-                      
+
                       unsigned long int buf_size,
                       int measurement) {
+
   xbt_ex_t e;
   gras_trp_plugin_t trp;
   gras_socket_t sock;
@@ -206,7 +207,7 @@ gras_socket_server_ext(unsigned short port,
   TRY {
     trp->socket_server(trp, sock);
     DEBUG3("in=%c out=%c accept=%c",
-          sock->incoming?'y':'n', 
+          sock->incoming?'y':'n',
           sock->outgoing?'y':'n',
           sock->accepting?'y':'n');
   } CATCH(e) {
@@ -217,28 +218,30 @@ gras_socket_server_ext(unsigned short port,
 
   if (!measurement)
      ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport = port;
-  xbt_dynar_push(((gras_trp_procdata_t) 
+  xbt_dynar_push(((gras_trp_procdata_t)
                  gras_libdata_by_id(gras_trp_libdata_id))->sockets,&sock);
+
+  gras_msg_listener_awake();
   return sock;
 }
 /**
  * @brief Opens a server socket on any port in the given range
- * 
+ *
  * @param minport: first port we will try
  * @param maxport: last port we will try
  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
- * 
+ *
  * If none of the provided ports works, raises the exception got when trying the last possibility
- */ 
+ */
 gras_socket_t
 gras_socket_server_range(unsigned short minport, unsigned short maxport,
                         unsigned long int buf_size, int measurement) {
-   
+
   int port;
   gras_socket_t res=NULL;
   xbt_ex_t e;
-  
+
   for (port=minport; port<maxport;port ++) {
     TRY {
       res=gras_socket_server_ext(port,buf_size,measurement);
@@ -252,23 +255,23 @@ gras_socket_server_range(unsigned short minport, unsigned short maxport,
   }
   THROW_IMPOSSIBLE;
 }
-   
+
 /**
  * @brief Opens a client socket to a remote host.
  * @param host: who you want to connect to
  * @param port: where you want to connect to on this host
  * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
- * 
- * In real life, you'll get a TCP socket. 
+ *
+ * In real life, you'll get a TCP socket.
  */
 gras_socket_t
 gras_socket_client_ext(const char *host,
                       unsigned short port,
-                      
+
                       unsigned long int buf_size,
                       int measurement) {
+
   xbt_ex_t e;
   gras_trp_plugin_t trp;
   gras_socket_t sock;
@@ -288,22 +291,22 @@ gras_socket_client_ext(const char *host,
   TRY {
     (*trp->socket_client)(trp, sock);
     DEBUG3("in=%c out=%c accept=%c",
-          sock->incoming?'y':'n', 
+          sock->incoming?'y':'n',
           sock->outgoing?'y':'n',
           sock->accepting?'y':'n');
   } CATCH(e) {
      free(sock);
      RETHROW;
   }
-  xbt_dynar_push(((gras_trp_procdata_t) 
+  xbt_dynar_push(((gras_trp_procdata_t)
                  gras_libdata_by_id(gras_trp_libdata_id))->sockets,&sock);
+  gras_msg_listener_awake();
   return sock;
 }
 
 /**
- * gras_socket_server:
+ * @brief Opens a server socket and make it ready to be listened to.
  *
- * Opens a server socket and make it ready to be listened to.
  * In real life, you'll get a TCP socket.
  */
 gras_socket_t
@@ -338,12 +341,12 @@ void gras_socket_close(gras_socket_t sock) {
   if (sock == _gras_lastly_selected_socket) {
      xbt_assert0(!gras_opt_trp_nomoredata_on_close || !sock->moredata,
                 "Closing a socket having more data in buffer while the nomoredata_on_close option is activated");
-                
-     if (sock->moredata) 
+
+     if (sock->moredata)
        CRITICAL0("Closing a socket having more data in buffer. Option nomoredata_on_close disabled, so continuing.");
      _gras_lastly_selected_socket=NULL;
   }
-   
+
   /* FIXME: Issue an event when the socket is closed */
        DEBUG1("sockets pointer before %p",sockets);
   if (sock) {
@@ -354,7 +357,7 @@ void gras_socket_close(gras_socket_t sock) {
                        if (sock == sock_iter) {
                                DEBUG2("remove sock cursor %d dize %lu\n",cursor,xbt_dynar_length(sockets));
                                xbt_dynar_cursor_rm(sockets,&cursor);
-                               if (sock->plugin->socket_close) 
+                               if (sock->plugin->socket_close)
                                        (* sock->plugin->socket_close)(sock);
 
                                /* free the memory */
@@ -431,35 +434,35 @@ int gras_socket_is_meas(gras_socket_t sock) {
   return sock->meas;
 }
 
-/** \brief Send a chunk of (random) data over a measurement socket 
+/** \brief Send a chunk of (random) data over a measurement socket
  *
  * @param peer measurement socket to use for the experiment
  * @param timeout timeout (in seconds)
  * @param msg_size size of each chunk sent over the socket (in bytes).
- * @param msg_amount how many of these packets you want to send. 
+ * @param msg_amount how many of these packets you want to send.
+ *
+ * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
+ * each side of the socket should be paired.
  *
- * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on 
- * each side of the socket should be paired. 
- * 
  * The exchanged data is zeroed to make sure it's initialized, but
- * there is no way to control what is sent (ie, you cannot use these 
+ * there is no way to control what is sent (ie, you cannot use these
  * functions to exchange data out of band).
- * 
- * @warning: in SimGrid version 3.1 and previous, the numerical arguments 
- *           were the total amount of data to send and the msg_size. This 
- *           was changed for the fool wanting to send more than MAXINT 
- *           bytes in a fat pipe. 
+ *
+ * @warning: in SimGrid version 3.1 and previous, the numerical arguments
+ *           were the total amount of data to send and the msg_size. This
+ *           was changed for the fool wanting to send more than MAXINT
+ *           bytes in a fat pipe.
  */
-void gras_socket_meas_send(gras_socket_t peer, 
+void gras_socket_meas_send(gras_socket_t peer,
                           unsigned int timeout,
-                          unsigned long int msg_size, 
+                          unsigned long int msg_size,
                           unsigned long int msg_amount) {
   char *chunk=NULL;
   unsigned long int sent_sofar;
-  
+
   XBT_IN;
 
-  if (gras_if_RL()) 
+  if (gras_if_RL())
     chunk=xbt_malloc0(msg_size);
 
   xbt_assert0(peer->meas,"Asked to send measurement data on a regular socket");
@@ -474,34 +477,34 @@ void gras_socket_meas_send(gras_socket_t peer,
   CDEBUG5(gras_trp_meas,"Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
          sent_sofar,msg_amount,msg_size,
          gras_socket_peer_name(peer), gras_socket_peer_port(peer));
-            
-  if (gras_if_RL()) 
+
+  if (gras_if_RL())
     free(chunk);
 
   XBT_OUT;
 }
 
-/** \brief Receive a chunk of data over a measurement socket 
+/** \brief Receive a chunk of data over a measurement socket
  *
- * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on 
- * each side of the socket should be paired. 
+ * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
+ * each side of the socket should be paired.
  *
- * @warning: in SimGrid version 3.1 and previous, the numerical arguments 
- *           were the total amount of data to send and the msg_size. This 
- *           was changed for the fool wanting to send more than MAXINT 
- *           bytes in a fat pipe. 
+ * @warning: in SimGrid version 3.1 and previous, the numerical arguments
+ *           were the total amount of data to send and the msg_size. This
+ *           was changed for the fool wanting to send more than MAXINT
+ *           bytes in a fat pipe.
  */
-void gras_socket_meas_recv(gras_socket_t peer, 
+void gras_socket_meas_recv(gras_socket_t peer,
                           unsigned int timeout,
-                          unsigned long int msg_size, 
+                          unsigned long int msg_size,
                           unsigned long int msg_amount){
-  
+
   char *chunk=NULL;
   unsigned long int got_sofar;
 
   XBT_IN;
 
-  if (gras_if_RL()) 
+  if (gras_if_RL())
     chunk = xbt_malloc(msg_size);
 
   xbt_assert0(peer->meas,
@@ -518,21 +521,21 @@ void gras_socket_meas_recv(gras_socket_t peer,
          got_sofar,msg_amount,msg_size,
          gras_socket_peer_name(peer), gras_socket_peer_port(peer));
 
-  if (gras_if_RL()) 
+  if (gras_if_RL())
     free(chunk);
   XBT_OUT;
 }
 
 /**
- * \brief Something similar to the good old accept system call. 
+ * \brief Something similar to the good old accept system call.
  *
  * Make sure that there is someone speaking to the provided server socket.
- * In RL, it does an accept(2) and return the result as last argument. 
+ * In RL, it does an accept(2) and return the result as last argument.
  * In SG, as accepts are useless, it returns the provided argument as result.
  * You should thus test whether (peer != accepted) before closing both of them.
  *
- * You should only call this on measurement sockets. It is automatically 
- * done for regular sockets, but you usually want more control about 
+ * You should only call this on measurement sockets. It is automatically
+ * done for regular sockets, but you usually want more control about
  * what's going on with measurement sockets.
  */
 gras_socket_t gras_socket_meas_accept(gras_socket_t peer){
@@ -552,7 +555,7 @@ gras_socket_t gras_socket_meas_accept(gras_socket_t peer){
   CDEBUG1(gras_trp_meas,"meas_accepted onto %d",res->sd);
 
   return res;
-} 
+}
 
 
 /*
@@ -560,12 +563,12 @@ gras_socket_t gras_socket_meas_accept(gras_socket_t peer){
  */
 static void *gras_trp_procdata_new(void) {
    gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t,1);
-   
+
    res->name = xbt_strdup("gras_trp");
    res->name_len = 0;
    res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t*), NULL);
    res->myport = 0;
-   
+
    return (void*)res;
 }
 
@@ -574,14 +577,14 @@ static void *gras_trp_procdata_new(void) {
  */
 static void gras_trp_procdata_free(void *data) {
   gras_trp_procdata_t res = (gras_trp_procdata_t)data;
-  
+
   xbt_dynar_free(&( res->sockets ));
   free(res->name);
   free(res);
 }
 
 void gras_trp_socketset_dump(const char *name) {
-  gras_trp_procdata_t procdata = 
+  gras_trp_procdata_t procdata =
     (gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
 
   unsigned int it;