Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
try to port the gras simulation side to the new smx_network infrastructure (not yet...
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 10 Nov 2009 12:28:14 +0000 (12:28 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 10 Nov 2009 12:28:14 +0000 (12:28 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6835 48e7efb5-ca39-0410-a469-dd3cf9ba447f

20 files changed:
include/gras/messages.h
src/gras/Msg/gras_msg_listener.c
src/gras/Msg/msg_interface.h
src/gras/Msg/sg_msg.c
src/gras/Transport/rl_transport.c
src/gras/Transport/sg_transport.c
src/gras/Transport/transport.c
src/gras/Transport/transport_interface.h
src/gras/Transport/transport_plugin_file.c
src/gras/Transport/transport_plugin_sg.c
src/gras/Transport/transport_plugin_tcp.c
src/gras/Transport/transport_private.h
src/gras/Virtu/process.c
src/gras/Virtu/sg_process.c
src/gras/Virtu/virtu_private.h
src/gras/Virtu/virtu_sg.h
src/include/simix/simix.h
src/msg/msg_mailbox.c
src/simix/smx_global.c
src/simix/smx_network.c

index e748515..c522c53 100644 (file)
@@ -281,6 +281,7 @@ XBT_PUBLIC(void) gras_msg_rpcreturn(double timeOut, gras_msg_cb_ctx_t ctx,
        gras_msgtype_t type;
        unsigned long int ID;
        void *payl;
+       void *comm; /* simix_comm in SG */
        int payl_size;
      } s_gras_msg_t, *gras_msg_t;
 
index b583d58..c66c18a 100644 (file)
@@ -26,6 +26,14 @@ typedef struct s_gras_msg_listener_ {
   xbt_thread_t listener;
 } s_gras_msg_listener_t;
 
+static void do_close_socket(gras_socket_t sock) {
+  if (sock->plugin->socket_close)
+     (*sock->plugin->socket_close) (sock);
+   /* free the memory */
+   if (sock->peer_name)
+     free(sock->peer_name);
+   free(sock);
+}
 static void listener_function(void *p)
 {
   gras_msg_listener_t me = (gras_msg_listener_t) p;
@@ -56,12 +64,9 @@ static void listener_function(void *p)
     /* empty the list of sockets to trash */
     TRY {
       while (1) {
-        int sock;
+        gras_socket_t sock;
         xbt_queue_shift_timed(me->socks_to_close, &sock, 0);
-        if (tcp_close(sock) < 0) {
-          WARN3("error while closing tcp socket %d: %d (%s)\n",
-                sock, sock_errno, sock_errstr(sock_errno));
-        }
+        do_close_socket(sock);
       }
     }
     CATCH(e) {
@@ -134,14 +139,14 @@ void gras_msg_listener_awake()
   }
 }
 
-void gras_msg_listener_close_socket(int sd)
+void gras_msg_listener_close_socket(gras_socket_t sock)
 {
   gras_procdata_t *pd = gras_procdata_get();
   if (pd->listener) {
-    xbt_queue_push(pd->listener->socks_to_close, &sd);
+    xbt_queue_push(pd->listener->socks_to_close, &sock);
     gras_msg_listener_awake();
   } else {
     /* do it myself */
-    tcp_close(sd);
+    do_close_socket(sock);
   }
 }
index 0bad57a..27c2c70 100644 (file)
@@ -49,7 +49,7 @@ typedef struct {
 void gras_msg_send_namev(gras_socket_t sock,
                          const char *namev, void *payload);
 void gras_msg_listener_awake(void);
-void gras_msg_listener_close_socket(int sd);
+void gras_msg_listener_close_socket(gras_socket_t sock);
 
 #define GRAS_PROTOCOL_VERSION '\1';
 
index 1afb8aa..131ee9f 100644 (file)
@@ -27,20 +27,13 @@ void gras_msg_send_ext(gras_socket_t sock,
                        unsigned long int ID,
                        gras_msgtype_t msgtype, void *payload)
 {
-
-  smx_action_t act;             /* simix action */
   gras_trp_sg_sock_data_t *sock_data;
-  gras_hostdata_t *hd;
-  gras_trp_procdata_t trp_remote_proc;
-  gras_msg_procdata_t msg_remote_proc;
   gras_msg_t msg;               /* message to send */
   int whole_payload_size = 0;   /* msg->payload_size is used to memcpy the payload.
                                    This is used to report the load onto the simulator. It also counts the size of pointed stuff */
 
   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
 
-  hd = (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
-
   xbt_assert1(!gras_socket_is_meas(sock),
               "Asked to send a message on the measurement socket %p", sock);
 
@@ -51,7 +44,7 @@ void gras_msg_send_ext(gras_socket_t sock,
   msg->type = msgtype;
   msg->ID = ID;
   if (kind == e_gras_msg_kind_rpcerror) {
-    /* error on remote host, carfull, payload is an exception */
+    /* error on remote host, careful, payload is an exception */
     msg->payl_size = gras_datadesc_size(gras_datadesc_by_name("ex_t"));
     msg->payl = xbt_malloc(msg->payl_size);
     whole_payload_size = gras_datadesc_memcpy(gras_datadesc_by_name("ex_t"),
@@ -74,56 +67,23 @@ void gras_msg_send_ext(gras_socket_t sock,
                                                 payload, msg->payl);
   }
 
-  /* put the selectable socket on the queue */
-  trp_remote_proc = (gras_trp_procdata_t)
-    gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process);
-
-  xbt_queue_push(trp_remote_proc->msg_selectable_sockets, &sock);
-
-  /* put message on msg_queue */
-  msg_remote_proc = (gras_msg_procdata_t)
-    gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
-  xbt_fifo_push(msg_remote_proc->msg_to_receive_queue, msg);
-
-  /* wait for the receiver */
-  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
 
-  /* creates simix action and waits its ends, waits in the sender host
-     condition */
-  act = SIMIX_action_communicate(SIMIX_host_self(),
-                                 sock_data->to_host, msgtype->name,
-                                 (double) whole_payload_size, -1);
-  SIMIX_register_action_to_condition(act, sock_data->cond);
 
   VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu",
-        SIMIX_host_get_name(sock_data->to_host),
-        SIMIX_process_get_name(sock_data->to_process),
+        sock->peer_name,sock->peer_proc,
         msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID);
-
-  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
-  SIMIX_unregister_action_to_condition(act, sock_data->cond);
-  /* error treatmeant (FIXME) */
-
-  /* cleanup structures */
-  SIMIX_action_destroy(act);
-  SIMIX_mutex_unlock(sock_data->mutex);
+  SIMIX_network_send(sock_data->rdv,whole_payload_size,-1.,-1.,msg,sizeof(s_gras_msg_t),(smx_comm_t*)&(msg->comm),&msg);
 
   VERB0("Message sent");
-
 }
 
 /*
  * receive the next message on the given socket.
  */
-void gras_msg_recv(gras_socket_t sock, gras_msg_t msg)
-{
-
+void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) {
+  gras_trp_procdata_t pd =
+    (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
   gras_trp_sg_sock_data_t *sock_data;
-  gras_trp_sg_sock_data_t *remote_sock_data;
-  gras_hostdata_t *remote_hd;
-  gras_msg_t msg_got;
-  gras_msg_procdata_t msg_procdata =
-    (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
 
   xbt_assert1(!gras_socket_is_meas(sock),
               "Asked to receive a message on the measurement socket %p",
@@ -132,11 +92,28 @@ void gras_msg_recv(gras_socket_t sock, gras_msg_t msg)
   xbt_assert0(msg, "msg is an out parameter of gras_msg_recv...");
 
   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
+
+  /* The message was already received while emulating the select, so simply copy it here */
+  memcpy(msg,&(sock_data->ongoing_msg),sizeof(s_gras_msg_t));
+  msg->expe = sock;
+  VERB1("Using %p as a msg",&(sock_data->ongoing_msg));
+  VERB5("Received a message type '%s' kind '%s' ID %lu from %s(%s)",
+        msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID,
+        sock->peer_name,sock->peer_proc);
+
+  /* Recreate another comm object to replace the one which just terminated */
+  int rank = xbt_dynar_search(pd->sockets,&sock);
+  xbt_assert0(rank>=0,"Socket not found in my array");
+  sock_data->ongoing_msg_size = sizeof(s_gras_msg_t);
+  smx_comm_t comm = SIMIX_network_irecv(sock_data->rdv,&(sock_data->ongoing_msg),&(sock_data->ongoing_msg_size));
+  xbt_dynar_set(pd->comms,rank,&comm);
+
+#if 0 /* KILLME */
+  SIMIX_network_recv(sock_data->rdv,-1.,&msg_got,NULL,&comm);
+
+
   remote_sock_data =
     ((gras_trp_sg_sock_data_t *) sock->data)->to_socket->data;
-  DEBUG3("Remote host %s, Remote Port: %d Local port %d",
-         SIMIX_host_get_name(sock_data->to_host), sock->peer_port,
-         sock->port);
   remote_hd = (gras_hostdata_t *) SIMIX_host_get_data(sock_data->to_host);
 
   if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0) {
@@ -151,13 +128,13 @@ void gras_msg_recv(gras_socket_t sock, gras_msg_t msg)
   SIMIX_cond_signal(remote_sock_data->cond);
 
   /* wait for communication end */
+  INFO2("Wait communication (from %s) termination on %p",sock->peer_name,sock_data->cond);
+
   SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
 
   msg_got->expe = msg->expe;
   memcpy(msg, msg_got, sizeof(s_gras_msg_t));
   xbt_free(msg_got);
   SIMIX_mutex_unlock(remote_sock_data->mutex);
-
-  VERB3("Received a message type '%s' kind '%s' ID %lu",        // from %s",
-        msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID);
+#endif
 }
index 395137f..a8c2a70 100644 (file)
@@ -175,7 +175,7 @@ gras_socket_t gras_trp_select(double timeout)
       /* Got a socket to serve */
       ready--;
 
-      if (sock_iter->accepting && sock_iter->plugin->socket_accept) {
+      if (sock_iter->is_master && sock_iter->plugin->socket_accept) {
         /* not a socket but an ear. accept on it and serve next socket */
         gras_socket_t accepted = NULL;
 
index f9bfbb7..96e5042 100644 (file)
@@ -10,6 +10,7 @@
 #include "xbt/ex.h"
 #include "gras/Transport/transport_private.h"
 #include "gras/Virtu/virtu_sg.h"
+#include "simix/private.h"
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_trp);
 
@@ -27,8 +28,61 @@ gras_socket_t _gras_lastly_selected_socket = NULL;
  *
  * if timeout>0 and no message there, wait at most that amount of time before giving up.
  */
-gras_socket_t gras_trp_select(double timeout)
-{
+gras_socket_t gras_trp_select(double timeout) {
+  static int warned=0;
+  if (timeout>=0 && !warned) {
+    warned=1;
+    WARN0("Timed select not implemented in SG. Switching to blocking select");
+  }
+  gras_trp_procdata_t pd =
+    (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
+
+  gras_socket_t sock_iter, active_socket;
+  gras_trp_sg_sock_data_t *active_socket_data;
+  smx_comm_t comm;
+  unsigned int cursor;
+
+
+  /* FIXME: make sure that the ongoing comm is canceled&destroyed when the corresponding socket is closed */
+  xbt_assert(xbt_dynar_length(pd->sockets)==xbt_dynar_length(pd->comms));
+
+  /* Wait for the first terminating comm object */
+  int rank = SIMIX_network_waitany(pd->comms);
+
+  /* Don't wait on this socket until the comm object is recreated by gras_msg_recv */
+  comm = NULL;
+  xbt_dynar_set(pd->comms,rank,&comm);
+
+  /* Ok, got something. Open a socket back to the expeditor */
+  active_socket = xbt_dynar_get_as(pd->sockets,rank,gras_socket_t);
+  active_socket_data = (gras_trp_sg_sock_data_t *) active_socket->data;
+
+  /* Try to reuse an already opened socket to that expeditor */
+  DEBUG1("Open sockets size %lu", xbt_dynar_length(pd->sockets));
+  xbt_dynar_foreach(pd->sockets, cursor, sock_iter) {
+    gras_trp_sg_sock_data_t *sock_data;
+    DEBUG1("Consider %p as outgoing socket to expeditor", sock_iter);
+
+    if (sock_iter->meas || !sock_iter->outgoing)
+      continue;
+    sock_data = ((gras_trp_sg_sock_data_t *) sock_iter->data);
+
+    if ((sock_data->to_socket == active_socket) &&
+        (sock_data->to_host ==
+         SIMIX_process_get_host(active_socket_data->from_process))) {
+      xbt_dynar_cursor_unlock(pd->sockets);
+      return sock_iter;
+    }
+  }
+
+  /* Socket to expeditor not created yet */
+  DEBUG0("Create a socket to the expeditor");
+
+
+
+  return sock_iter;
+
+#if 0 /* KILLME */
   gras_socket_t res;
   gras_trp_procdata_t pd =
     (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
@@ -85,7 +139,7 @@ gras_socket_t gras_trp_select(double timeout)
 
   res->incoming = 1;
   res->outgoing = 1;
-  res->accepting = 0;
+  res->is_master = 0;
   res->sd = -1;
 
   res->port = -1;
@@ -123,6 +177,7 @@ gras_socket_t gras_trp_select(double timeout)
          SIMIX_process_get_name(sockdata->to_process), res->port);
 
   return res;
+#endif
 }
 
 
index dd976c4..f36f83d 100644 (file)
@@ -157,7 +157,7 @@ void gras_trp_socket_new(int incoming, gras_socket_t * dst)
 
   sock->incoming = incoming ? 1 : 0;
   sock->outgoing = incoming ? 0 : 1;
-  sock->accepting = incoming ? 1 : 0;
+  sock->is_master = incoming ? 1 : 0;
   sock->meas = 0;
   sock->recvd = 0;
   sock->valid = 1;
@@ -211,7 +211,7 @@ gras_socket_server_ext(unsigned short port,
     trp->socket_server(trp, sock);
     DEBUG3("in=%c out=%c accept=%c",
            sock->incoming ? 'y' : 'n',
-           sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
+           sock->outgoing ? 'y' : 'n', sock->is_master ? 'y' : 'n');
   } CATCH(e) {
 
     free(sock);
@@ -297,7 +297,7 @@ gras_socket_client_ext(const char *host,
     (*trp->socket_client) (trp, sock);
     DEBUG3("in=%c out=%c accept=%c",
            sock->incoming ? 'y' : 'n',
-           sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
+           sock->outgoing ? 'y' : 'n', sock->is_master ? 'y' : 'n');
   } CATCH(e) {
     free(sock);
     RETHROW;
@@ -340,7 +340,7 @@ void gras_socket_close_voidp(void *sock) {
 /** \brief Close socket */
 void gras_socket_close(gras_socket_t sock)
 {
-  xbt_dynar_t sockets =
+  xbt_dynar_t my_sockets =
     ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
   gras_socket_t sock_iter = NULL;
   unsigned int cursor;
@@ -358,23 +358,17 @@ void gras_socket_close(gras_socket_t sock)
   }
 
   /* FIXME: Issue an event when the socket is closed */
-  DEBUG1("sockets pointer before %p", sockets);
+  DEBUG1("sockets pointer before %p", my_sockets);
   if (sock) {
     /* FIXME: Cannot get the dynar mutex, because it can be already locked */
 //              _xbt_dynar_foreach(sockets,cursor,sock_iter) {
-    for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) {
-      _xbt_dynar_cursor_get(sockets, cursor, &sock_iter);
+    for (cursor = 0; cursor < xbt_dynar_length(my_sockets); cursor++) {
+      _xbt_dynar_cursor_get(my_sockets, cursor, &sock_iter);
       if (sock == sock_iter) {
         DEBUG2("remove sock cursor %d dize %lu\n", cursor,
-               xbt_dynar_length(sockets));
-        xbt_dynar_cursor_rm(sockets, &cursor);
-        if (sock->plugin->socket_close)
-          (*sock->plugin->socket_close) (sock);
-
-        /* free the memory */
-        if (sock->peer_name)
-          free(sock->peer_name);
-        free(sock);
+               xbt_dynar_length(my_sockets));
+        xbt_dynar_cursor_rm(my_sockets, &cursor);
+        gras_msg_listener_close_socket(sock);
         XBT_OUT;
         return;
       }
@@ -574,7 +568,7 @@ gras_socket_t gras_socket_meas_accept(gras_socket_t peer)
   xbt_assert0(peer->meas,
               "No need to accept on non-measurement sockets (it's automatic)");
 
-  if (!peer->accepting) {
+  if (!peer->is_master) {
     /* nothing to accept here (must be in SG) */
     /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */
     return peer;
@@ -598,6 +592,7 @@ static void *gras_trp_procdata_new(void)
   res->name = xbt_strdup("gras_trp");
   res->name_len = 0;
   res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL);
+  res->comms = xbt_dynar_new(sizeof(void*),NULL); /* stores some smx_comm_t in SG (not used in RL) */
   res->myport = 0;
 
   return (void *) res;
@@ -611,6 +606,7 @@ static void gras_trp_procdata_free(void *data)
   gras_trp_procdata_t res = (gras_trp_procdata_t) data;
 
   xbt_dynar_free(&(res->sockets));
+  xbt_dynar_free(&(res->comms));
   free(res->name);
   free(res);
 }
index 6b84713..21c52d7 100644 (file)
@@ -99,12 +99,8 @@ XBT_PUBLIC(gras_trp_plugin_t)
        int myport;              /* Port on which I listen myself */
 
        xbt_dynar_t sockets;     /* all sockets known to this process */
-
-       /* SG only elements. In RL, they are part of the OS ;) */
-
-       /* List of sockets ready to be select()ed */
-       xbt_queue_t msg_selectable_sockets;      /* regular sockets  */
-       xbt_queue_t meas_selectable_sockets;     /* measurement ones */
+       xbt_dynar_t comms; /* SG cruft: the ongoing communications */
+       xbt_dynar_t sockets_to_close; /* The listener is in charge of closing the sockets */
 
      } s_gras_trp_procdata_t, *gras_trp_procdata_t;
 
index c8a13d0..ca8aadf 100644 (file)
@@ -98,7 +98,7 @@ gras_socket_t gras_socket_client_from_file(const char *path)
          path,
          res->sd,
          res->incoming ? 'y' : 'n',
-         res->outgoing ? 'y' : 'n', res->accepting ? 'y' : 'n');
+         res->outgoing ? 'y' : 'n', res->is_master ? 'y' : 'n');
 
   xbt_dynar_push(((gras_trp_procdata_t)
                   gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res);
@@ -138,7 +138,7 @@ gras_socket_t gras_socket_server_from_file(const char *path)
   DEBUG4("sd=%d in=%c out=%c accept=%c",
          res->sd,
          res->incoming ? 'y' : 'n',
-         res->outgoing ? 'y' : 'n', res->accepting ? 'y' : 'n');
+         res->outgoing ? 'y' : 'n', res->is_master ? 'y' : 'n');
 
   xbt_dynar_push(((gras_trp_procdata_t)
                   gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res);
index 9ebfd79..a80c331 100644 (file)
@@ -24,10 +24,6 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg, gras_trp,
  *** Prototypes 
  ***/
 
-/* retrieve the port record associated to a numerical port on an host */
-static void find_port(gras_hostdata_t * hd, int port,
-                      gras_sg_portrec_t * hpd);
-
 
 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
                                /* OUT */ gras_socket_t sock);
@@ -48,36 +44,27 @@ int gras_trp_sg_chunk_recv(gras_socket_t sd,
  *** Specific plugin part
  ***/
 typedef struct {
-  int placeholder;              /* nothing plugin specific so far */
-} gras_trp_sg_plug_data_t;
+  xbt_dict_t sockets; /* all known sockets */
+} s_gras_trp_sg_plug_data_t,*gras_trp_sg_plug_data_t;
 
 
 /***
  *** Code
  ***/
-static void find_port(gras_hostdata_t * hd, int port, gras_sg_portrec_t * hpd)
-{
-  unsigned int cpt;
-  gras_sg_portrec_t pr;
 
-  xbt_assert0(hd, "Please run gras_process_init on each process");
-
-  xbt_dynar_foreach(hd->ports, cpt, pr) {
-    if (pr.port == port) {
-      memcpy(hpd, &pr, sizeof(gras_sg_portrec_t));
-      return;
-    }
-  }
-  THROW1(mismatch_error, 0, "Unable to find any portrec for port #%d", port);
+static void gras_trp_sg_exit(gras_trp_plugin_t plug){
+  gras_trp_sg_plug_data_t mydata = (gras_trp_sg_plug_data_t) plug->data;
+  xbt_dict_free(&(mydata->sockets));
+  xbt_free(plug->data);
 }
-
-
 void gras_trp_sg_setup(gras_trp_plugin_t plug)
 {
-
-  gras_trp_sg_plug_data_t *data = xbt_new(gras_trp_sg_plug_data_t, 1);
+  gras_trp_sg_plug_data_t data = xbt_new(s_gras_trp_sg_plug_data_t, 1);
 
   plug->data = data;
+  data->sockets = xbt_dict_new();
+
+  plug->exit = gras_trp_sg_exit;
 
   plug->socket_client = gras_trp_sg_socket_client;
   plug->socket_server = gras_trp_sg_socket_server;
@@ -91,155 +78,120 @@ void gras_trp_sg_setup(gras_trp_plugin_t plug)
 }
 
 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
-                               /* OUT */ gras_socket_t sock)
-{
-  xbt_ex_t e;
+                               /* OUT */ gras_socket_t sock) {
 
   smx_host_t peer;
-  gras_hostdata_t *hd;
   gras_trp_sg_sock_data_t *data;
-  gras_sg_portrec_t pr;
+  gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
+
 
-  /* make sure this socket will reach someone */
   if (!(peer = SIMIX_host_get_by_name(sock->peer_name)))
     THROW1(mismatch_error, 0,
-           "Can't connect to %s: no such host.\n", sock->peer_name);
+           "Can't connect to %s: no such host", sock->peer_name);
 
-  if (!(hd = (gras_hostdata_t *) SIMIX_host_get_data(peer)))
-    THROW1(mismatch_error, 0,
-           "can't connect to %s: no process on this host", sock->peer_name);
+  /* make sure this socket will reach someone */
+  xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets;
+  char *sock_name=bprintf("%s:%d",sock->peer_name,sock->peer_port);
+  gras_socket_t server = xbt_dict_get_or_null(all_sockets,sock_name);
+  free(sock_name);
 
-  TRY {
-    find_port(hd, sock->peer_port, &pr);
-  }
-  CATCH(e) {
-    if (e.category == mismatch_error) {
-      xbt_ex_free(e);
-      THROW2(mismatch_error, 0,
-             "can't connect to %s:%d, no process listen on this port",
-             sock->peer_name, sock->peer_port);
-    }
-    RETHROW;
-  }
+  if (!server)
+    THROW2(mismatch_error, 0,
+           "can't connect to %s:%d, no process listen on this port",
+           sock->peer_name, sock->peer_port);
 
-  if (pr.meas && !sock->meas) {
+  if (server->meas && !sock->meas) {
     THROW2(mismatch_error, 0,
            "can't connect to %s:%d in regular mode, the process listen "
            "in measurement mode on this port", sock->peer_name,
            sock->peer_port);
   }
-  if (!pr.meas && sock->meas) {
+  if (!server->meas && sock->meas) {
     THROW2(mismatch_error, 0,
            "can't connect to %s:%d in measurement mode, the process listen "
            "in regular mode on this port", sock->peer_name, sock->peer_port);
   }
   /* create the socket */
-  data = xbt_new(gras_trp_sg_sock_data_t, 1);
+  data = xbt_new0(gras_trp_sg_sock_data_t, 1);
+  data->rdv = ((gras_trp_sg_sock_data_t *)server->data)->rdv;
   data->from_process = SIMIX_process_self();
-  data->to_process = pr.process;
-  data->to_host = peer;
-
-  /* initialize mutex and condition of the socket */
-  data->mutex = SIMIX_mutex_init();
-  data->cond = SIMIX_cond_init();
-  data->to_socket = pr.socket;
 
   sock->data = data;
   sock->incoming = 1;
 
+  /* Create a smx comm object about this socket */
+  data->ongoing_msg_size = sizeof(s_gras_msg_t);
+  smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size));
+  xbt_dynar_push(pd->comms,&comm);
+
   DEBUG5("%s (PID %d) connects in %s mode to %s:%d",
          SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
          sock->meas ? "meas" : "regular", sock->peer_name, sock->peer_port);
 }
 
-void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock)
-{
-
-  gras_hostdata_t *hd =
-    (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
-  gras_sg_portrec_t pr;
-  gras_trp_sg_sock_data_t *data;
-  volatile int found;
-
-  const char *host = SIMIX_host_get_name(SIMIX_host_self());
-
-  xbt_ex_t e;
-
-  xbt_assert0(hd, "Please run gras_process_init on each process");
-
-  sock->accepting = 0;          /* no such nuisance in SG */
-  found = 0;
-  TRY {
-    find_port(hd, sock->port, &pr);
-    found = 1;
-  } CATCH(e) {
-    if (e.category == mismatch_error)
-      xbt_ex_free(e);
-    else
-      RETHROW;
-  }
+void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock) {
+  gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
+  xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets;
 
-  if (found)
-    THROW2(mismatch_error, 0,
-           "can't listen on address %s:%d: port already in use.",
-           host, sock->port);
+  /* Make sure that this socket was not opened so far */
+  char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port);
+  gras_socket_t old = xbt_dict_get_or_null(all_sockets,sock_name);
+  if (old)
+    THROW1(mismatch_error, 0,
+           "can't listen on address %s: port already in use.",
+           sock_name);
 
-  pr.port = sock->port;
-  pr.meas = sock->meas;
-  pr.socket = sock;
-  pr.process = SIMIX_process_self();
-  xbt_dynar_push(hd->ports, &pr);
 
-  /* Create the socket */
-  data = xbt_new(gras_trp_sg_sock_data_t, 1);
+  /* Create the data associated to the socket */
+  gras_trp_sg_sock_data_t *data = xbt_new0(gras_trp_sg_sock_data_t, 1);
+  data->rdv = SIMIX_rdv_create(sock_name);
   data->from_process = SIMIX_process_self();
-  data->to_process = NULL;
-  data->to_host = SIMIX_host_self();
+  SIMIX_rdv_set_data(data->rdv,sock);
 
-  data->cond = SIMIX_cond_init();
-  data->mutex = SIMIX_mutex_init();
+  sock->is_master = 1;
+  sock->incoming = 1;
 
   sock->data = data;
 
+  /* Register the socket to the set of sockets known simulation-wide */
+  xbt_dict_set(all_sockets,sock_name,sock,NULL); /* FIXME: add a function to raise a warning at simulation end for non-closed sockets */
+
+  /* Create a smx comm object about this socket */
+  data->ongoing_msg_size = sizeof(s_gras_msg_t);
+  smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size));
+  INFO2("irecv comm %p onto %p",comm,&(data->ongoing_msg));
+  xbt_dynar_push(pd->comms,&comm);
+
   VERB6("'%s' (%d) ears on %s:%d%s (%p)",
         SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
-        host, sock->port, sock->meas ? " (mode meas)" : "", sock);
-
+        gras_os_myname(), sock->port, sock->meas ? " (mode meas)" : "", sock);
+  free(sock_name);
 }
 
-void gras_trp_sg_socket_close(gras_socket_t sock)
-{
-  gras_hostdata_t *hd =
-    (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
-  unsigned int cpt;
-  gras_sg_portrec_t pr;
-
-  XBT_IN1(" (sock=%p)", sock);
+void gras_trp_sg_socket_close(gras_socket_t sock) {
+  gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
+  if (sock->is_master) {
+    /* server mode socket. Unregister it from 'OS' tables */
+    char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port);
 
-  if (!sock)
-    return;
+    xbt_dict_t sockets = ((gras_trp_sg_plug_data_t)sock->plugin->data)->sockets;
+    gras_socket_t old = xbt_dict_get_or_null(sockets,sock_name);
+    if (!old)
+      WARN2("socket_close called on the unknown server socket %p (port=%d)",
+            sock, sock->port);
+    xbt_dict_remove(sockets,sock_name);
+    free(sock_name);
 
-  xbt_assert0(hd, "Please run gras_process_init on each process");
+  }
 
   if (sock->data) {
-    SIMIX_cond_destroy(((gras_trp_sg_sock_data_t *) sock->data)->cond);
-    SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t *) sock->data)->mutex);
+    SIMIX_rdv_destroy(((gras_trp_sg_sock_data_t *) sock->data)->rdv);
     free(sock->data);
   }
 
-  if (sock->incoming && !sock->outgoing && sock->port >= 0) {
-    /* server mode socket. Unregister it from 'OS' tables */
-    xbt_dynar_foreach(hd->ports, cpt, pr) {
-      DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports));
-      if (pr.port == sock->port) {
-        xbt_dynar_cursor_rm(hd->ports, &cpt);
-        XBT_OUT;
-        return;
-      }
-    }
-    WARN2("socket_close called on the unknown incoming socket %p (port=%d)",
-          sock, sock->port);
-  }
+  xbt_dynar_push(pd->sockets_to_close,&sock);
+  gras_msg_listener_awake();
+
   XBT_OUT;
 }
 
@@ -261,10 +213,7 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
   char name[256];
   static unsigned int count = 0;
 
-  smx_action_t act;             /* simix action */
   gras_trp_sg_sock_data_t *sock_data;
-  gras_trp_procdata_t trp_remote_proc;
-  gras_msg_procdata_t msg_remote_proc;
   gras_msg_t msg;               /* message to send */
 
   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
@@ -272,7 +221,6 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
   xbt_assert0(sock->meas,
               "SG chunk exchange shouldn't be used on non-measurement sockets");
 
-  SIMIX_mutex_lock(sock_data->mutex);
   sprintf(name, "Chunk[%d]", count++);
   /*initialize gras message */
   msg = xbt_new(s_gras_msg_t, 1);
@@ -285,73 +233,22 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
   } else {
     msg->payl = NULL;
   }
-
-
-  /* put his socket on the selectable socket queue */
-  trp_remote_proc = (gras_trp_procdata_t)
-    gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process);
-  xbt_queue_push(trp_remote_proc->meas_selectable_sockets, &sock);
-
-  /* put message on msg_queue */
-  msg_remote_proc = (gras_msg_procdata_t)
-    gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
-
-  xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas, msg);
-
-  /* wait for the receiver */
-  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
-
-  /* creates simix action and waits its ends, waits in the sender host
-     condition */
-  DEBUG5("send chunk %s from %s to  %s:%d (size=%ld)",
-         name, SIMIX_host_get_name(SIMIX_host_self()),
-         SIMIX_host_get_name(sock_data->to_host), sock->peer_port, size);
-
-  act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host,
-                                 name, size, -1);
-  SIMIX_register_action_to_condition(act, sock_data->cond);
-  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
-  SIMIX_unregister_action_to_condition(act, sock_data->cond);
-  /* error treatmeant (FIXME) */
-
-  /* cleanup structures */
-  SIMIX_action_destroy(act);
-
-  SIMIX_mutex_unlock(sock_data->mutex);
+  SIMIX_network_send(sock_data->rdv,size,-1.,-1.,&msg,sizeof(msg),(smx_comm_t*)&(msg->comm),msg);
 }
 
 int gras_trp_sg_chunk_recv(gras_socket_t sock,
                            char *data, unsigned long int size)
 {
   gras_trp_sg_sock_data_t *sock_data;
-  gras_trp_sg_sock_data_t *remote_sock_data;
-  gras_socket_t remote_socket = NULL;
   gras_msg_t msg_got;
-  gras_msg_procdata_t msg_procdata =
-    (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
-  gras_trp_procdata_t trp_proc =
-    (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
+  smx_comm_t comm;
 
   xbt_assert0(sock->meas,
               "SG chunk exchange shouldn't be used on non-measurement sockets");
-  xbt_queue_shift_timed(trp_proc->meas_selectable_sockets,
-                        &remote_socket, 60);
-
-  if (remote_socket == NULL) {
-    THROW0(timeout_error, 0, "Timeout");
-  }
-
-  remote_sock_data = (gras_trp_sg_sock_data_t *) remote_socket->data;
-  msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
 
   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
 
-  /* ok, I'm here, you can continue the communication */
-  SIMIX_cond_signal(remote_sock_data->cond);
-
-  SIMIX_mutex_lock(remote_sock_data->mutex);
-  /* wait for communication end */
-  SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
+  SIMIX_network_recv(sock_data->rdv,-1.,&msg_got,NULL,&comm);
 
   if (msg_got->payl_size != size)
     THROW5(mismatch_error, 0,
@@ -367,6 +264,5 @@ int gras_trp_sg_chunk_recv(gras_socket_t sock,
     xbt_free(msg_got->payl);
 
   xbt_free(msg_got);
-  SIMIX_mutex_unlock(remote_sock_data->mutex);
   return 0;
 }
index de76134..ddbf0f8 100644 (file)
@@ -225,7 +225,7 @@ static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock)
   res->plugin = sock->plugin;
   res->incoming = sock->incoming;
   res->outgoing = sock->outgoing;
-  res->accepting = 0;
+  res->is_master = 0;
   res->sd = sd;
   res->port = -1;
 
@@ -266,8 +266,10 @@ static void gras_trp_sock_socket_close(gras_socket_t sock)
 
   VERB1("close tcp connection %d", sock->sd);
 
-  /* ask the listener to close the socket */
-  gras_msg_listener_close_socket(sock->sd);
+  if (tcp_close(sock->sd) < 0) {
+    WARN3("error while closing tcp socket %d: %d (%s)\n",
+          sock->sd, sock_errno, sock_errstr(sock_errno));
+  }
 }
 
 /************************************/
index 2588813..b242ccb 100644 (file)
@@ -56,7 +56,7 @@ typedef struct s_gras_socket {
 
   int incoming:1;               /* true if we can read from this sock */
   int outgoing:1;               /* true if we can write on this sock */
-  int accepting:1;              /* true if master incoming sock in tcp */
+  int is_master:1;              /* true if master incoming sock */
   int meas:1;                   /* true if this is an experiment socket instead of messaging */
   int valid:1;                  /* false if a select returned that the peer quitted, forcing us to "close" the socket */
   int moredata:1;               /* TCP socket use a buffer and read operation get as much 
index fea5400..f7fb45f 100644 (file)
@@ -103,10 +103,10 @@ void *gras_libdata_by_name_from_procdata(const char *name,
   }
   return res;
 }
-
-void *gras_libdata_by_id(int id)
-{
-  gras_procdata_t *pd = gras_procdata_get();
+void *gras_libdata_by_id(int id) {
+  return gras_libdata_by_id_from_procdata(id,gras_procdata_get());
+}
+void *gras_libdata_by_id_from_procdata(int id,gras_procdata_t *pd) {
   if (xbt_set_length(pd->libdata) < xbt_dynar_length(_gras_procdata_fabrics)) {
     /* Damn, some new modules were added since procdata_init(). Amok? */
     /* Get 'em all */
index bc39cf2..569fd3a 100644 (file)
@@ -48,7 +48,6 @@ void gras_process_init()
     /* First process on this host */
     hd = xbt_new(gras_hostdata_t, 1);
     hd->refcount = 1;
-    hd->ports = xbt_dynar_new(sizeof(gras_sg_portrec_t), NULL);
     SIMIX_host_set_data(SIMIX_host_self(), (void *) hd);
   } else {
     hd->refcount++;
@@ -62,9 +61,7 @@ void gras_process_init()
   } else
     pd->ppid = -1;
 
-  trp_pd->msg_selectable_sockets = xbt_queue_new(0, sizeof(gras_socket_t));
-
-  trp_pd->meas_selectable_sockets = xbt_queue_new(0, sizeof(gras_socket_t));
+  trp_pd->sockets_to_close = xbt_dynar_new(sizeof(gras_socket_t),NULL);
 
   VERB2("Creating process '%s' (%d)",
         SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid());
@@ -86,9 +83,7 @@ void gras_process_exit()
   gras_trp_procdata_t trp_pd =
     (gras_trp_procdata_t) gras_libdata_by_name("gras_trp");
 
-  xbt_queue_free(&trp_pd->msg_selectable_sockets);
-
-  xbt_queue_free(&trp_pd->meas_selectable_sockets);
+  xbt_dynar_free(&trp_pd->sockets_to_close);
 
 
   xbt_assert0(hd, "Run gras_process_init (ie, gras_init)!!");
@@ -115,7 +110,6 @@ void gras_process_exit()
     gras_socket_close(sock_iter);
   }
   if (!--(hd->refcount)) {
-    xbt_dynar_free(&hd->ports);
     free(hd);
   }
   gras_procdata_exit();
index 9162237..39a31d6 100644 (file)
@@ -44,5 +44,7 @@ typedef struct {
 gras_procdata_t *gras_procdata_get(void);
 void *gras_libdata_by_name_from_procdata(const char *name,
                                          gras_procdata_t * pd);
+void *gras_libdata_by_id_from_procdata(int id,gras_procdata_t *pd);
+
 
 #endif /* GRAS_VIRTU_PRIVATE_H */
index 51090f7..dab2df5 100644 (file)
@@ -26,20 +26,21 @@ typedef struct {
 typedef struct {
   int refcount;
 
-  xbt_dynar_t ports;
-
+  /* Nothing in particular (anymore) */
 } gras_hostdata_t;
 
 /* data for each socket (FIXME: find a better location for that)*/
 typedef struct {
-  smx_process_t from_process;
-  smx_process_t to_process;
+  smx_rdv_t rdv;
 
+  smx_process_t from_process; /* the one who created the socket */
   smx_host_t to_host;           /* Who's on other side */
+  smx_comm_t comm; /* Ongoing communication */
+
+  s_gras_msg_t ongoing_msg;
+  size_t ongoing_msg_size;
 
-  smx_cond_t cond;
-  smx_mutex_t mutex;
-  gras_socket_t to_socket;
+  gras_socket_t to_socket; /* If != NULL, this socket was created as accept when receiving onto to_socket */
 } gras_trp_sg_sock_data_t;
 
 
index 82785d0..d746e71 100644 (file)
@@ -207,7 +207,7 @@ XBT_PUBLIC(void*) SIMIX_rdv_get_data(smx_rdv_t rdv);
 /*****Communication Requests*****/
 XBT_PUBLIC(void) SIMIX_communication_cancel(smx_comm_t comm);
 XBT_PUBLIC(double) SIMIX_communication_get_remains(smx_comm_t comm);
-XBT_PUBLIC(void *) SIMIX_communication_get_data(smx_comm_t comm);
+XBT_PUBLIC(void *) SIMIX_communication_get_sentdata(smx_comm_t comm);
 
 /*****Networking*****/
 XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
index 3425a41..f6f3230 100644 (file)
@@ -64,7 +64,7 @@ m_task_t MSG_mailbox_get_head(msg_mailbox_t mailbox)
   if(!comm)
     return NULL; 
   
-  return (m_task_t)SIMIX_communication_get_data(comm);
+  return (m_task_t)SIMIX_communication_get_sentdata(comm);
 }
 
 int
index 26e0c5e..9df8f76 100644 (file)
@@ -19,6 +19,7 @@ XBT_LOG_EXTERNAL_CATEGORY(simix_host);
 XBT_LOG_EXTERNAL_CATEGORY(simix_process);
 XBT_LOG_EXTERNAL_CATEGORY(simix_synchro);
 XBT_LOG_EXTERNAL_CATEGORY(simix_context);
+XBT_LOG_EXTERNAL_CATEGORY(simix_network);
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_kernel, simix,
                                 "Logging specific to SIMIX (kernel)");
 
@@ -57,6 +58,7 @@ void SIMIX_global_init(int *argc, char **argv)
     XBT_LOG_CONNECT(simix_process, simix);
     XBT_LOG_CONNECT(simix_synchro, simix);
     XBT_LOG_CONNECT(simix_context, simix);
+    XBT_LOG_CONNECT(simix_network, simix);
 
     simix_global = xbt_new0(s_SIMIX_Global_t, 1);
 
@@ -417,8 +419,10 @@ double SIMIX_solve(xbt_fifo_t actions_done, xbt_fifo_t actions_failed)
         if (smx_action) {
           /* Copy the transfered data of the completed communication actions */
           /* FIXME: find a better way to determine if its a comm action */
-          if(smx_action->data != NULL)
+          if(smx_action->data != NULL) {
+            CDEBUG1(simix_network,"Communication %p finished",smx_action->data);
             SIMIX_network_copy_data((smx_comm_t)smx_action->data);
+          }
           SIMIX_action_signal_all(smx_action);      
         }
       }
index 7008ec3..4c32439 100644 (file)
@@ -87,7 +87,7 @@ smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, smx_comm_type_t type)
   }
 
   /* no relevant request found. Return NULL */
-  DEBUG0("Communication request not found");
+  DEBUG0("Communication request not found. I assume that other side will arrive later on.");
   return NULL;
 }
 
@@ -300,36 +300,32 @@ double SIMIX_communication_get_remains(smx_comm_t comm)
  */
 void SIMIX_network_copy_data(smx_comm_t comm)
 {
-  /* If there is no data to be copy then return */
-  if(!comm->src_buff || !comm->dst_buff)
-    return;
-  
   size_t src_buff_size = comm->src_buff_size;
   size_t dst_buff_size = *comm->dst_buff_size;
-  
-  /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
-  dst_buff_size = MIN(dst_buff_size, src_buff_size);
-  
-  /* Update the receiver's buffer size to the copied amount */
-  if (comm->dst_buff_size)
-    *comm->dst_buff_size = dst_buff_size;
 
-  if(dst_buff_size == 0)
+  xbt_assert(src_buff_size == dst_buff_size);
+
+  /* If there is no data to copy then return */
+  if(!comm->src_buff || !comm->dst_buff || dst_buff_size == 0)
     return;
 
   memcpy(comm->dst_buff, comm->src_buff, dst_buff_size);
 
-  DEBUG4("Copying comm %p data from %s -> %s (%zu bytes)", 
+  DEBUG6("Copying comm %p data from %s -> %s (%zu bytes %p->%p)",
          comm, comm->src_proc->smx_host->name, comm->dst_proc->smx_host->name,
-         dst_buff_size);
+         dst_buff_size,comm->src_buff,comm->dst_buff);
 }
 
 /**
  *  \brief Return the user data associated to the communication
+ *
+ *  In MSG and GRAS, that data is the exchanged task/msg itself, since
+ *  (i) In MSG, the receiver still wants to read the task although the communication didn't complete.
+ *  (ii) In GRAS, we need to retrieve that gras_msg_t during the select
  *  \param comm The communication
  *  \return the user data
  */
-void *SIMIX_communication_get_data(smx_comm_t comm)
+void *SIMIX_communication_get_sentdata(smx_comm_t comm)
 {
   return comm->data;
 }
@@ -427,6 +423,7 @@ smx_comm_t SIMIX_network_irecv(smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_s
   comm->dst_buff = dst_buff;
   comm->dst_buff_size = dst_buff_size;
 
+  DEBUG2("Receive data for %p into %p",comm,comm->dst_buff);
   SIMIX_communication_start(comm);
   return comm;
 }