Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use a queue to control the selectable sockets. We no longer need the mutex and...
author donassbr <donassbr@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 12 Jul 2007 13:57:50 +0000 (13:57 +0000)
committer donassbr <donassbr@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 12 Jul 2007 13:57:50 +0000 (13:57 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3750 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/gras/Msg/sg_msg.c
src/gras/Transport/sg_transport.c
src/gras/Transport/transport_interface.h
src/gras/Transport/transport_plugin_sg.c
src/gras/Virtu/sg_process.c

index 2a9c84a..0012b3e 100644 (file)
@@ -44,10 +44,6 @@ void gras_msg_send_ext(gras_socket_t   sock,
   xbt_assert1(!gras_socket_is_meas(sock), 
              "Asked to send a message on the measurement socket %p", sock);
        
   xbt_assert1(!gras_socket_is_meas(sock), 
              "Asked to send a message on the measurement socket %p", sock);
        
-       /* got the mutex my port */
-       DEBUG1("Sock port %d",sock->port);
-       SIMIX_mutex_lock(sock_data->mutex);
-
        /*initialize gras message */
        msg = xbt_new(s_gras_msg_t,1);
        msg->expe = sock;
        /*initialize gras message */
        msg = xbt_new(s_gras_msg_t,1);
        msg->expe = sock;
@@ -73,17 +69,17 @@ void gras_msg_send_ext(gras_socket_t   sock,
       whole_payload_size = gras_datadesc_copy(msgtype->ctn_type,
                                              payload, msg->payl);
   }
       whole_payload_size = gras_datadesc_copy(msgtype->ctn_type,
                                              payload, msg->payl);
   }
-  /* put message on msg_queue */
-  msg_remote_proc = (gras_msg_procdata_t)
-    gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
-  xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg);
-  
-  /* wake-up the receiver */
+
+       /* put the selectable socket on the queue */
   trp_remote_proc = (gras_trp_procdata_t)
     gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
 
   trp_remote_proc = (gras_trp_procdata_t)
     gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
 
-  xbt_fifo_push(trp_remote_proc->msg_selectable_sockets,sock);  
-  SIMIX_cond_signal(trp_remote_proc->msg_select_cond);
+  xbt_queue_push(trp_remote_proc->msg_selectable_sockets,&sock);  
+
+  /* put message on msg_queue */
+       msg_remote_proc = (gras_msg_procdata_t)
+               gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
+       xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg);
   
   /* wait for the receiver */
   SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
   
   /* wait for the receiver */
   SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
index 7299205..50b7dfe 100644 (file)
@@ -33,29 +33,27 @@ gras_socket_t gras_trp_select(double timeout) {
     (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
   gras_trp_sg_sock_data_t *sockdata;
   gras_trp_plugin_t trp;
     (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
   gras_trp_sg_sock_data_t *sockdata;
   gras_trp_plugin_t trp;
-  gras_socket_t active_socket;
+  gras_socket_t active_socket = NULL;
   gras_trp_sg_sock_data_t *active_socket_data;
   gras_socket_t sock_iter; /* iterating over all sockets */
   gras_trp_sg_sock_data_t *active_socket_data;
   gras_socket_t sock_iter; /* iterating over all sockets */
+       xbt_ex_t e;
   int cursor;
 
   int cursor;
 
-  DEBUG0("Trying to get the lock pd, trp_select");
-  SIMIX_mutex_lock(pd->msg_select_mutex);
   DEBUG3("select on %s@%s with timeout=%f",
         SIMIX_process_get_name(SIMIX_process_self()),
         SIMIX_host_get_name(SIMIX_host_self()),
         timeout);
 
   DEBUG3("select on %s@%s with timeout=%f",
         SIMIX_process_get_name(SIMIX_process_self()),
         SIMIX_host_get_name(SIMIX_host_self()),
         timeout);
 
-  if (xbt_fifo_size(pd->msg_selectable_sockets) == 0) {
-    /* message didn't arrive yet, wait */
-    SIMIX_cond_wait_timeout(pd->msg_select_cond,pd->msg_select_mutex,timeout);
-  }
-
-  if (xbt_fifo_size(pd->msg_selectable_sockets) == 0) {
+       TRY {
+               xbt_queue_shift_timed(pd->msg_selectable_sockets,
+                                                               &active_socket, timeout);
+       } CATCH(e) {
+               RETHROW;
+       }
+  if (active_socket == NULL) {
     DEBUG0("TIMEOUT");
     DEBUG0("TIMEOUT");
-    SIMIX_mutex_unlock(pd->msg_select_mutex);
     THROW0(timeout_error,0,"Timeout");
   }
     THROW0(timeout_error,0,"Timeout");
   }
-  active_socket = xbt_fifo_shift(pd->msg_selectable_sockets);
   active_socket_data = (gras_trp_sg_sock_data_t*)active_socket->data;
   
   /* Ok, got something. Open a socket back to the expeditor */
   active_socket_data = (gras_trp_sg_sock_data_t*)active_socket->data;
   
   /* Ok, got something. Open a socket back to the expeditor */
@@ -72,7 +70,6 @@ gras_socket_t gras_trp_select(double timeout) {
 
     if ( (sock_data->to_socket == active_socket) && 
         (sock_data->to_host == SIMIX_process_get_host(active_socket_data->from_process)) ) {
 
     if ( (sock_data->to_socket == active_socket) && 
         (sock_data->to_host == SIMIX_process_get_host(active_socket_data->from_process)) ) {
-      SIMIX_mutex_unlock(pd->msg_select_mutex);
       return sock_iter;
     }
   }
       return sock_iter;
     }
   }
@@ -121,7 +118,6 @@ gras_socket_t gras_trp_select(double timeout) {
         res->peer_port,
         SIMIX_process_get_name(sockdata->to_process), res->port);
 
         res->peer_port,
         SIMIX_process_get_name(sockdata->to_process), res->port);
 
-  SIMIX_mutex_unlock(pd->msg_select_mutex);
   return res;
 }
 
   return res;
 }
 
index 63547d2..fde430f 100644 (file)
@@ -14,6 +14,7 @@
 
 #include "portable.h" /* sometimes needed for fd_set */
 #include "simix/simix.h"
 
 #include "portable.h" /* sometimes needed for fd_set */
 #include "simix/simix.h"
+#include "xbt/queue.h"
 
 /***
  *** Options
 
 /***
  *** Options
@@ -109,16 +110,9 @@ typedef struct {
   /* SG only elements. In RL, they are part of the OS ;) */
    
   /* List of sockets ready to be select()ed */
   /* SG only elements. In RL, they are part of the OS ;) */
    
   /* List of sockets ready to be select()ed */
-  xbt_fifo_t msg_selectable_sockets; /* regular sockets  */
-  xbt_fifo_t meas_selectable_sockets;/* measurement ones */
-
-  /* Synchronisation on msg_selectable_sockets */
-  smx_cond_t msg_select_cond;
-  smx_mutex_t msg_select_mutex;
-  /* Synchronisation on meas_selectable_sockets */
-  smx_cond_t meas_select_cond;
-  smx_mutex_t meas_select_mutex;
-   
+  xbt_queue_t msg_selectable_sockets; /* regular sockets  */
+  xbt_queue_t meas_selectable_sockets;/* measurement ones */
+
 } s_gras_trp_procdata_t,*gras_trp_procdata_t;
 
 /* Display the content of our socket set (debugging purpose) */
 } s_gras_trp_procdata_t,*gras_trp_procdata_t;
 
 /* Display the content of our socket set (debugging purpose) */
index b46e05e..86eb934 100644 (file)
@@ -282,20 +282,17 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
     msg->payl = NULL;
   }
   
     msg->payl = NULL;
   }
   
-  /* put message on msg_queue */
-  msg_remote_proc = (gras_msg_procdata_t)
-    gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
 
 
-  xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg);
-
-  /* put his socket on the selectable socket list */
+  /* put his socket on the selectable socket queue */
   trp_remote_proc = (gras_trp_procdata_t)
     gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
   trp_remote_proc = (gras_trp_procdata_t)
     gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
+  xbt_queue_push(trp_remote_proc->meas_selectable_sockets,&sock);      
 
 
-  xbt_fifo_push(trp_remote_proc->meas_selectable_sockets,sock);        
+  /* put message on msg_queue */
+  msg_remote_proc = (gras_msg_procdata_t)
+    gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
 
 
-  /* wake-up the receiver */
-  SIMIX_cond_signal(trp_remote_proc->meas_select_cond);
+  xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg);
 
   /* wait for the receiver */
   SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
 
   /* wait for the receiver */
   SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
@@ -325,29 +322,28 @@ int gras_trp_sg_chunk_recv(gras_socket_t sock,
                           unsigned long int size){
   gras_trp_sg_sock_data_t *sock_data; 
   gras_trp_sg_sock_data_t *remote_sock_data; 
                           unsigned long int size){
   gras_trp_sg_sock_data_t *sock_data; 
   gras_trp_sg_sock_data_t *remote_sock_data; 
-  gras_socket_t remote_socket;
+  gras_socket_t remote_socket= NULL;
   gras_msg_t msg_got;
   gras_msg_procdata_t msg_procdata = 
     (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
   gras_trp_procdata_t trp_proc =
     (gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
   gras_msg_t msg_got;
   gras_msg_procdata_t msg_procdata = 
     (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
   gras_trp_procdata_t trp_proc =
     (gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
+       xbt_ex_t e;
 
   xbt_assert0(sock->meas, 
            "SG chunk exchange shouldn't be used on non-measurement sockets");
 
   xbt_assert0(sock->meas, 
            "SG chunk exchange shouldn't be used on non-measurement sockets");
-       
-  SIMIX_mutex_lock(trp_proc->meas_select_mutex);
-  if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
-    SIMIX_cond_wait_timeout(trp_proc->meas_select_cond,
-                           trp_proc->meas_select_mutex,60);
-  }
-  if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
-    SIMIX_mutex_unlock(trp_proc->meas_select_mutex);
+       TRY {
+               xbt_queue_shift_timed(trp_proc->meas_selectable_sockets,
+                                                                               &remote_socket, 60);
+       } CATCH(e) {
+               RETHROW;
+       }
+
+       if (remote_socket == NULL) {
     THROW0(timeout_error,0,"Timeout");
     THROW0(timeout_error,0,"Timeout");
-  }
-  SIMIX_mutex_unlock(trp_proc->meas_select_mutex);
-  
-  remote_socket = xbt_fifo_shift(trp_proc->meas_selectable_sockets);
-  remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data;
+       }
+
+       remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data;
   msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
   
   sock_data = (gras_trp_sg_sock_data_t *)sock->data;
   msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
   
   sock_data = (gras_trp_sg_sock_data_t *)sock->data;
index 48cd273..af417a4 100644 (file)
@@ -60,13 +60,9 @@ gras_process_init() {
   }
   else pd->ppid = -1; 
   
   }
   else pd->ppid = -1; 
   
-  trp_pd->msg_selectable_sockets = xbt_fifo_new();
-  trp_pd->msg_select_mutex = SIMIX_mutex_init();
-  trp_pd->msg_select_cond = SIMIX_cond_init();
+  trp_pd->msg_selectable_sockets = xbt_queue_new(1000,sizeof(gras_socket_t));
   
   
-  trp_pd->meas_selectable_sockets = xbt_fifo_new();
-  trp_pd->meas_select_mutex = SIMIX_mutex_init();
-  trp_pd->meas_select_cond = SIMIX_cond_init();
+  trp_pd->meas_selectable_sockets = xbt_queue_new(1000,sizeof(gras_socket_t));
   
   VERB2("Creating process '%s' (%d)",
        SIMIX_process_get_name(SIMIX_process_self()),
   
   VERB2("Creating process '%s' (%d)",
        SIMIX_process_get_name(SIMIX_process_self()),
@@ -88,13 +84,9 @@ gras_process_exit() {
   gras_trp_procdata_t trp_pd=
     (gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
 
   gras_trp_procdata_t trp_pd=
     (gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
 
-  SIMIX_mutex_destroy(trp_pd->msg_select_mutex);
-  SIMIX_cond_destroy(trp_pd->msg_select_cond);
-  xbt_fifo_free(trp_pd->msg_selectable_sockets);
+  xbt_queue_free(&trp_pd->msg_selectable_sockets);
 
 
-  SIMIX_mutex_destroy(trp_pd->meas_select_mutex);
-  SIMIX_cond_destroy(trp_pd->meas_select_cond);
-  xbt_fifo_free(trp_pd->meas_selectable_sockets);
+  xbt_queue_free(&trp_pd->meas_selectable_sockets);
 
 
   xbt_assert0(hd,"Run gras_process_init (ie, gras_init)!!");
 
 
   xbt_assert0(hd,"Run gras_process_init (ie, gras_init)!!");