Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Implement gras_socket_im_the_server() the CRUDE way. this lets pmm work on simulation...
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Fri, 26 Nov 2010 14:11:32 +0000 (14:11 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Fri, 26 Nov 2010 14:11:32 +0000 (14:11 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@8671 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/gras/Msg/gras_msg_listener.c
src/gras/Msg/sg_msg.c
src/gras/Transport/README
src/gras/Transport/transport_plugin_sg.c
src/gras/Virtu/virtu_sg.h
src/xbt/xbt_sg_synchro.c

index 9b0f67d..383ceba 100644 (file)
@@ -18,15 +18,17 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg_read, gras_msg,
 #include "gras/Transport/transport_interface.h" /* gras_select */
 
 typedef struct s_gras_msg_listener_ {
+  xbt_thread_t listener; /* keep this first, gras_socket_im_the_server() does funky transtyping in sg_msg.c */
   xbt_queue_t incomming_messages;       /* messages received from the wire and still to be used by master */
   xbt_queue_t socks_to_close;   /* let the listener close the sockets, since it may be selecting on them. Darwin don't like this trick */
   gras_socket_t wakeup_sock_listener_side;
   gras_socket_t wakeup_sock_master_side;
+  int port; /* The port on which the listener opened the command socket */
   xbt_mutex_t init_mutex;       /* both this mutex and condition are used at initialization to make sure that */
   xbt_cond_t init_cond;         /* the main thread speaks to the listener only once it is started (FIXME: It would be easier using a semaphore, if only semaphores were in xbt_synchro) */
-  xbt_thread_t listener;
 } s_gras_msg_listener_t;
 
+#include "gras/Virtu/virtu_private.h" /* gras_procdata_t */
 static void listener_function(void *p)
 {
   gras_msg_listener_t me = (gras_msg_listener_t) p;
@@ -37,8 +39,19 @@ static void listener_function(void *p)
   DEBUG0("I'm the listener");
 
   /* get a free socket for the receiving part of the listener */
-  me->wakeup_sock_listener_side =
-      gras_socket_server_range(5000, 6000, -1, 0);
+  me->wakeup_sock_listener_side =NULL;
+  for (me->port = 5000; me->port < 6000; me->port++) {
+    TRY {
+      me->wakeup_sock_listener_side = gras_socket_server_ext(me->port, -1, 0);
+    }
+    CATCH(e) {
+      if (me->port == 6000)
+        RETHROW;
+      xbt_ex_free(e);
+    }
+    if (me->wakeup_sock_listener_side)
+      break;
+  }
 
   /* wake up the launcher */
   xbt_mutex_acquire(me->init_mutex);
@@ -119,8 +132,7 @@ gras_msg_listener_t gras_msg_listener_launch(xbt_queue_t msg_received)
   /* Connect the other part of the socket */
   arg->wakeup_sock_master_side =
       gras_socket_client(gras_os_myname(),
-                         gras_socket_my_port
-                         (arg->wakeup_sock_listener_side));
+                         arg->port);
   return arg;
 }
 
index 59f8089..6416e38 100644 (file)
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_msg);
 
 typedef void *gras_trp_bufdata_;
+#include "simix/datatypes.h"
+#include "simix/private.h"
+
+/* Yeah, the following is awfull, breaking the encapsulation of at least 3 modules
+ * at the same time, but I'm tracking this bug since too long now, I want it dead. now.
+ * Sorry, Mt.
+ */
+typedef struct {
+  xbt_thread_t listener;
+} *fake_gras_msg_listener_t;
+typedef struct {
+  smx_process_t s_process;
+} *fake_xbt_thread_t;
+
+int gras_socket_im_the_server(gras_socket_t sock) {
+  gras_trp_sg_sock_data_t sock_data = sock->data;
+  gras_procdata_t* pd;
+  gras_msg_listener_t l;
+  xbt_thread_t listener_thread;
+  smx_process_t server_listener_process=NULL;
+  smx_process_t client_listener_process = NULL;
+
+
+  if (sock_data->server == SIMIX_process_self())
+    return 1;
+  if (sock_data->client == SIMIX_process_self())
+    return 0;
+
+  /* neither the client nor the server. Check their respective listeners */
+  pd = ((gras_procdata_t*)SIMIX_process_get_data(sock_data->server));
+  l = pd->listener;
+  if (l) {
+    listener_thread = ((fake_gras_msg_listener_t)l)->listener;
+    server_listener_process = ((fake_xbt_thread_t)listener_thread)->s_process;
+    if (server_listener_process == SIMIX_process_self())
+      return 1;
+  }
+
+  if (sock_data->client) {
+    pd = ((gras_procdata_t*)SIMIX_process_get_data(sock_data->client));
+    l = pd->listener;
+    if (l) {
+      listener_thread = ((fake_gras_msg_listener_t)l)->listener;
+      client_listener_process = ((fake_xbt_thread_t)listener_thread)->s_process;
+      if (client_listener_process == SIMIX_process_self())
+        return 0;
+    }
+  }
+  /* THAT'S BAD! I should be either client or server of the sockets I get messages on!! */
+  /* This is where the bug is visible. Try to die as loudly as possible */
+  xbt_backtrace_display_current();
+  ((char*)sock)[sizeof(*sock)+1] = '0'; /* Try to make valgrind angry to see where that damn socket comes from */
+  system(bprintf("cat /proc/%d/maps 1>&2",getpid()));
+  INFO0(bprintf("I'm not the client in socket %p (comm:%p, rdvser=%p, rdvcli=%p) to %s, that's %s",
+      sock,sock_data->comm_recv,sock_data->rdv_server,sock_data->rdv_client,
+      SIMIX_host_get_name(SIMIX_process_get_host(sock_data->server)),
+      sock_data->client?SIMIX_host_get_name(SIMIX_process_get_host(sock_data->client)):"(no client)"));
+  INFO7("server:%s (%p) server_listener=%p client:%s (%p) client_listener=%p, I'm %p",
+      SIMIX_host_get_name(SIMIX_process_get_host(sock_data->server)), sock_data->server,server_listener_process,
+      sock_data->client?SIMIX_host_get_name(SIMIX_process_get_host(sock_data->client)):"(no client)", sock_data->client,client_listener_process,
+          SIMIX_process_self());
+  xbt_die("Bailing out after finding that damn bug");
+
+}
 
 gras_msg_t gras_msg_recv_any(void)
 {
@@ -39,12 +103,11 @@ gras_msg_t gras_msg_recv_any(void)
     DEBUG5
         ("Consider socket %p (data:%p; Here rdv: %p; Remote rdv: %p; Comm %p) to get a message",
          sock, sock_data,
-         (sock_data->server ==
-          SIMIX_process_self())? sock_data->
-         rdv_server : sock_data->rdv_client,
-         (sock_data->server ==
-          SIMIX_process_self())? sock_data->
-         rdv_client : sock_data->rdv_server, sock_data->comm_recv);
+         gras_socket_im_the_server(sock)?
+             sock_data->rdv_server : sock_data->rdv_client,
+         gras_socket_im_the_server(sock)?
+             sock_data->rdv_client : sock_data->rdv_server,
+         sock_data->comm_recv);
 
 
     /* If the following assert fails in some valid conditions, we need to
@@ -79,14 +142,12 @@ gras_msg_t gras_msg_recv_any(void)
                 sock);
     /* End of paranoia */
 
-    VERB3("Consider receiving messages from on comm_recv %p rdv:%p (other rdv:%p)",
-          sock_data->comm_recv,
-          (sock_data->server ==
-           SIMIX_process_self())? sock_data->
-          rdv_server : sock_data->rdv_client,
-          (sock_data->server ==
-           SIMIX_process_self())? sock_data->
-          rdv_client : sock_data->rdv_server);
+    VERB4("Consider receiving messages from on comm_recv %p (%s) rdv:%p (other rdv:%p)",
+          sock_data->comm_recv, sock_data->comm_recv->type == comm_send? "send":"recv",
+          gras_socket_im_the_server(sock)?
+              sock_data->rdv_server : sock_data->rdv_client,
+          gras_socket_im_the_server(sock)?
+              sock_data->rdv_client : sock_data->rdv_server);
     xbt_dynar_push(comms, &(sock_data->comm_recv));
   }
   VERB1("Wait on %ld 'sockets'", xbt_dynar_length(comms));
@@ -110,9 +171,9 @@ gras_msg_t gras_msg_recv_any(void)
   }
   */
   sock_data->comm_recv =
-      SIMIX_network_irecv(sock_data->rdv_server != NULL ?
-                          sock_data->rdv_server
-                          : sock_data->rdv_client, NULL, 0);
+      SIMIX_network_irecv(gras_socket_im_the_server(sock) ?
+                          sock_data->rdv_server : sock_data->rdv_client,
+                          NULL, 0);
 
   return msg;
 }
@@ -130,9 +191,9 @@ void gras_msg_send_ext(gras_socket_t sock,
   gras_trp_sg_sock_data_t sock_data = (gras_trp_sg_sock_data_t) sock->data;
 
   smx_rdv_t target_rdv =
-      (sock_data->server ==
-       SIMIX_process_self())? sock_data->
-      rdv_client : sock_data->rdv_server;
+      (sock_data->server == SIMIX_process_self())?
+          sock_data->rdv_client :
+          sock_data->rdv_server;
 
   /*initialize gras message */
   msg = xbt_new(s_gras_msg_t, 1);
@@ -173,6 +234,6 @@ void gras_msg_send_ext(gras_socket_t sock,
   SIMIX_network_send(target_rdv, whole_payload_size, -1, -1, &msg,
                      sizeof(void *), &comm, msg);
 
-  VERB0("Message sent");
+  VERB0("Message sent (and received)");
 
 }
index e66f9b3..d6b3aac 100644 (file)
@@ -66,17 +66,5 @@ created by gras_socket_client() and friends. So, they are created on
 client side, but the master side will see it as message expeditor when
 getting a message.
 
-When sending, you can see if the current process is the server by
-checking if data_sock->server == SMX_process_self(). If wrong, that
-means that we are the client process today.
-
-When receiving this won't work because SMX_process_self() is the
-listener associated to the user thread. So, when receiving, you can
-see if you are on the server side or by checking if rdv_client==NULL.
-If not that means that we are on the client side today.
-
-That's messy, and should probably be reworked a bit, but I feel like
-the main issue is the interface used. It's too close and too differnt
-from BSD at the same time. One day, I hope to find the time to redo
-everything with an interface similar to the one of the 0MQ project for
-example.
\ No newline at end of file
+You can see which side of the socket you are with the
+gras_socket_im_the_server() function, which is designed for that.
index 6cd4c72..5f419a5 100644 (file)
@@ -75,34 +75,23 @@ static gras_sg_portrec_t find_port(gras_hostdata_t * hd, int port)
  ***/
 static int gras_trp_sg_my_port(gras_socket_t s) {
   gras_trp_sg_sock_data_t sockdata = s->data;
-  if (sockdata->rdv_client == NULL) /* Master socket, I'm server */
+  if (gras_socket_im_the_server(s))
     return sockdata->server_port;
   else
     return sockdata->client_port;
 }
 static int gras_trp_sg_peer_port(gras_socket_t s) {
   gras_trp_sg_sock_data_t sockdata = s->data;
-  if (sockdata->server == SIMIX_process_self())
+  if (gras_socket_im_the_server(s))
     return sockdata->client_port;
   else
     return sockdata->server_port;
 }
 static const char* gras_trp_sg_peer_name(gras_socket_t s) {
   gras_trp_sg_sock_data_t sockdata = s->data;
-  if (sockdata->server == SIMIX_process_self())
+  if (gras_socket_im_the_server(s))
     return SIMIX_host_get_name(SIMIX_process_get_host(sockdata->client));
   else {
-    if (sockdata->client!=SIMIX_process_self()) {
-      /* THAT'S BAD! I should be either client or server of the sockets I get messages on!! */
-      /* This is where the bug is visible. Try to die as loudly as possible */
-      xbt_backtrace_display_current();
-      ((char*)s)[sizeof(*s)+1] = '0'; /* Try to make valgrind angry to see where that damn socket comes from */
-      xbt_die(bprintf("I'm not the client in socket %p (comm:%p, rdvser=%p, rdvcli=%p) to %s, that's %s",
-          socket,sockdata->comm_recv,sockdata->rdv_server,sockdata->rdv_client,
-          SIMIX_host_get_name(SIMIX_process_get_host(sockdata->server)),
-          SIMIX_host_get_name(SIMIX_process_get_host(sockdata->client))));
-    }
-    xbt_assert(sockdata->client_port==gras_os_myport());
     return SIMIX_host_get_name(SIMIX_process_get_host(sockdata->server));
   }
 }
index 0180b9a..1ff489f 100644 (file)
@@ -43,6 +43,10 @@ typedef struct {
 } s_gras_trp_sg_sock_data_t, *gras_trp_sg_sock_data_t;
 
 
+/** \brief Returns if I am on the server side of this socket (either server or listener of server) */
+int gras_socket_im_the_server(gras_socket_t sock);
+
+
 void *gras_libdata_by_name_from_remote(const char *name, smx_process_t p);
 /* The same function by id would be really dangerous.
  * 
index 8816696..a46072e 100644 (file)
@@ -22,8 +22,8 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_sync, xbt,
 /* the implementation would be cleaner (and faster) with ELF symbol aliasing */
 
 typedef struct s_xbt_thread_ {
+  smx_process_t s_process; /* keep this first, gras_socket_im_the_server() does funky transtyping in sg_msg.c */
   char *name;
-  smx_process_t s_process;
   void_f_pvoid_t code;
   void *userparam;
   void *father_data;