Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Revert "try to port the gras simulation side to the new smx_network infrastructure...
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 10 Nov 2009 12:32:54 +0000 (12:32 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 10 Nov 2009 12:32:54 +0000 (12:32 +0000)
This git branch is not ready for public consumption yet (sorry)

This reverts commit 063c63642a29000a011c0d6176d30eb62a4e0dca.

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6836 48e7efb5-ca39-0410-a469-dd3cf9ba447f

20 files changed:
include/gras/messages.h
src/gras/Msg/gras_msg_listener.c
src/gras/Msg/msg_interface.h
src/gras/Msg/sg_msg.c
src/gras/Transport/rl_transport.c
src/gras/Transport/sg_transport.c
src/gras/Transport/transport.c
src/gras/Transport/transport_interface.h
src/gras/Transport/transport_plugin_file.c
src/gras/Transport/transport_plugin_sg.c
src/gras/Transport/transport_plugin_tcp.c
src/gras/Transport/transport_private.h
src/gras/Virtu/process.c
src/gras/Virtu/sg_process.c
src/gras/Virtu/virtu_private.h
src/gras/Virtu/virtu_sg.h
src/include/simix/simix.h
src/msg/msg_mailbox.c
src/simix/smx_global.c
src/simix/smx_network.c

index c522c53..e748515 100644 (file)
@@ -281,7 +281,6 @@ XBT_PUBLIC(void) gras_msg_rpcreturn(double timeOut, gras_msg_cb_ctx_t ctx,
        gras_msgtype_t type;
        unsigned long int ID;
        void *payl;
        gras_msgtype_t type;
        unsigned long int ID;
        void *payl;
-       void *comm; /* simix_comm in SG */
        int payl_size;
      } s_gras_msg_t, *gras_msg_t;
 
        int payl_size;
      } s_gras_msg_t, *gras_msg_t;
 
index c66c18a..b583d58 100644 (file)
@@ -26,14 +26,6 @@ typedef struct s_gras_msg_listener_ {
   xbt_thread_t listener;
 } s_gras_msg_listener_t;
 
   xbt_thread_t listener;
 } s_gras_msg_listener_t;
 
-static void do_close_socket(gras_socket_t sock) {
-  if (sock->plugin->socket_close)
-     (*sock->plugin->socket_close) (sock);
-   /* free the memory */
-   if (sock->peer_name)
-     free(sock->peer_name);
-   free(sock);
-}
 static void listener_function(void *p)
 {
   gras_msg_listener_t me = (gras_msg_listener_t) p;
 static void listener_function(void *p)
 {
   gras_msg_listener_t me = (gras_msg_listener_t) p;
@@ -64,9 +56,12 @@ static void listener_function(void *p)
     /* empty the list of sockets to trash */
     TRY {
       while (1) {
     /* empty the list of sockets to trash */
     TRY {
       while (1) {
-        gras_socket_t sock;
+        int sock;
         xbt_queue_shift_timed(me->socks_to_close, &sock, 0);
         xbt_queue_shift_timed(me->socks_to_close, &sock, 0);
-        do_close_socket(sock);
+        if (tcp_close(sock) < 0) {
+          WARN3("error while closing tcp socket %d: %d (%s)\n",
+                sock, sock_errno, sock_errstr(sock_errno));
+        }
       }
     }
     CATCH(e) {
       }
     }
     CATCH(e) {
@@ -139,14 +134,14 @@ void gras_msg_listener_awake()
   }
 }
 
   }
 }
 
-void gras_msg_listener_close_socket(gras_socket_t sock)
+void gras_msg_listener_close_socket(int sd)
 {
   gras_procdata_t *pd = gras_procdata_get();
   if (pd->listener) {
 {
   gras_procdata_t *pd = gras_procdata_get();
   if (pd->listener) {
-    xbt_queue_push(pd->listener->socks_to_close, &sock);
+    xbt_queue_push(pd->listener->socks_to_close, &sd);
     gras_msg_listener_awake();
   } else {
     /* do it myself */
     gras_msg_listener_awake();
   } else {
     /* do it myself */
-    do_close_socket(sock);
+    tcp_close(sd);
   }
 }
   }
 }
index 27c2c70..0bad57a 100644 (file)
@@ -49,7 +49,7 @@ typedef struct {
 void gras_msg_send_namev(gras_socket_t sock,
                          const char *namev, void *payload);
 void gras_msg_listener_awake(void);
 void gras_msg_send_namev(gras_socket_t sock,
                          const char *namev, void *payload);
 void gras_msg_listener_awake(void);
-void gras_msg_listener_close_socket(gras_socket_t sock);
+void gras_msg_listener_close_socket(int sd);
 
 #define GRAS_PROTOCOL_VERSION '\1';
 
 
 #define GRAS_PROTOCOL_VERSION '\1';
 
index 131ee9f..1afb8aa 100644 (file)
@@ -27,13 +27,20 @@ void gras_msg_send_ext(gras_socket_t sock,
                        unsigned long int ID,
                        gras_msgtype_t msgtype, void *payload)
 {
                        unsigned long int ID,
                        gras_msgtype_t msgtype, void *payload)
 {
+
+  smx_action_t act;             /* simix action */
   gras_trp_sg_sock_data_t *sock_data;
   gras_trp_sg_sock_data_t *sock_data;
+  gras_hostdata_t *hd;
+  gras_trp_procdata_t trp_remote_proc;
+  gras_msg_procdata_t msg_remote_proc;
   gras_msg_t msg;               /* message to send */
   int whole_payload_size = 0;   /* msg->payload_size is used to memcpy the payload.
                                    This is used to report the load onto the simulator. It also counts the size of pointed stuff */
 
   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
 
   gras_msg_t msg;               /* message to send */
   int whole_payload_size = 0;   /* msg->payload_size is used to memcpy the payload.
                                    This is used to report the load onto the simulator. It also counts the size of pointed stuff */
 
   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
 
+  hd = (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
+
   xbt_assert1(!gras_socket_is_meas(sock),
               "Asked to send a message on the measurement socket %p", sock);
 
   xbt_assert1(!gras_socket_is_meas(sock),
               "Asked to send a message on the measurement socket %p", sock);
 
@@ -44,7 +51,7 @@ void gras_msg_send_ext(gras_socket_t sock,
   msg->type = msgtype;
   msg->ID = ID;
   if (kind == e_gras_msg_kind_rpcerror) {
   msg->type = msgtype;
   msg->ID = ID;
   if (kind == e_gras_msg_kind_rpcerror) {
-    /* error on remote host, careful, payload is an exception */
+    /* error on remote host, carfull, payload is an exception */
     msg->payl_size = gras_datadesc_size(gras_datadesc_by_name("ex_t"));
     msg->payl = xbt_malloc(msg->payl_size);
     whole_payload_size = gras_datadesc_memcpy(gras_datadesc_by_name("ex_t"),
     msg->payl_size = gras_datadesc_size(gras_datadesc_by_name("ex_t"));
     msg->payl = xbt_malloc(msg->payl_size);
     whole_payload_size = gras_datadesc_memcpy(gras_datadesc_by_name("ex_t"),
@@ -67,23 +74,56 @@ void gras_msg_send_ext(gras_socket_t sock,
                                                 payload, msg->payl);
   }
 
                                                 payload, msg->payl);
   }
 
+  /* put the selectable socket on the queue */
+  trp_remote_proc = (gras_trp_procdata_t)
+    gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process);
+
+  xbt_queue_push(trp_remote_proc->msg_selectable_sockets, &sock);
+
+  /* put message on msg_queue */
+  msg_remote_proc = (gras_msg_procdata_t)
+    gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
+  xbt_fifo_push(msg_remote_proc->msg_to_receive_queue, msg);
+
+  /* wait for the receiver */
+  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
 
 
+  /* creates simix action and waits its ends, waits in the sender host
+     condition */
+  act = SIMIX_action_communicate(SIMIX_host_self(),
+                                 sock_data->to_host, msgtype->name,
+                                 (double) whole_payload_size, -1);
+  SIMIX_register_action_to_condition(act, sock_data->cond);
 
   VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu",
 
   VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu",
-        sock->peer_name,sock->peer_proc,
+        SIMIX_host_get_name(sock_data->to_host),
+        SIMIX_process_get_name(sock_data->to_process),
         msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID);
         msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID);
-  SIMIX_network_send(sock_data->rdv,whole_payload_size,-1.,-1.,msg,sizeof(s_gras_msg_t),(smx_comm_t*)&(msg->comm),&msg);
+
+  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
+  SIMIX_unregister_action_to_condition(act, sock_data->cond);
+  /* error treatmeant (FIXME) */
+
+  /* cleanup structures */
+  SIMIX_action_destroy(act);
+  SIMIX_mutex_unlock(sock_data->mutex);
 
   VERB0("Message sent");
 
   VERB0("Message sent");
+
 }
 
 /*
  * receive the next message on the given socket.
  */
 }
 
 /*
  * receive the next message on the given socket.
  */
-void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) {
-  gras_trp_procdata_t pd =
-    (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
+void gras_msg_recv(gras_socket_t sock, gras_msg_t msg)
+{
+
   gras_trp_sg_sock_data_t *sock_data;
   gras_trp_sg_sock_data_t *sock_data;
+  gras_trp_sg_sock_data_t *remote_sock_data;
+  gras_hostdata_t *remote_hd;
+  gras_msg_t msg_got;
+  gras_msg_procdata_t msg_procdata =
+    (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
 
   xbt_assert1(!gras_socket_is_meas(sock),
               "Asked to receive a message on the measurement socket %p",
 
   xbt_assert1(!gras_socket_is_meas(sock),
               "Asked to receive a message on the measurement socket %p",
@@ -92,28 +132,11 @@ void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) {
   xbt_assert0(msg, "msg is an out parameter of gras_msg_recv...");
 
   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
   xbt_assert0(msg, "msg is an out parameter of gras_msg_recv...");
 
   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
-
-  /* The message was already received while emulating the select, so simply copy it here */
-  memcpy(msg,&(sock_data->ongoing_msg),sizeof(s_gras_msg_t));
-  msg->expe = sock;
-  VERB1("Using %p as a msg",&(sock_data->ongoing_msg));
-  VERB5("Received a message type '%s' kind '%s' ID %lu from %s(%s)",
-        msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID,
-        sock->peer_name,sock->peer_proc);
-
-  /* Recreate another comm object to replace the one which just terminated */
-  int rank = xbt_dynar_search(pd->sockets,&sock);
-  xbt_assert0(rank>=0,"Socket not found in my array");
-  sock_data->ongoing_msg_size = sizeof(s_gras_msg_t);
-  smx_comm_t comm = SIMIX_network_irecv(sock_data->rdv,&(sock_data->ongoing_msg),&(sock_data->ongoing_msg_size));
-  xbt_dynar_set(pd->comms,rank,&comm);
-
-#if 0 /* KILLME */
-  SIMIX_network_recv(sock_data->rdv,-1.,&msg_got,NULL,&comm);
-
-
   remote_sock_data =
     ((gras_trp_sg_sock_data_t *) sock->data)->to_socket->data;
   remote_sock_data =
     ((gras_trp_sg_sock_data_t *) sock->data)->to_socket->data;
+  DEBUG3("Remote host %s, Remote Port: %d Local port %d",
+         SIMIX_host_get_name(sock_data->to_host), sock->peer_port,
+         sock->port);
   remote_hd = (gras_hostdata_t *) SIMIX_host_get_data(sock_data->to_host);
 
   if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0) {
   remote_hd = (gras_hostdata_t *) SIMIX_host_get_data(sock_data->to_host);
 
   if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0) {
@@ -128,13 +151,13 @@ void gras_msg_recv(gras_socket_t sock, gras_msg_t msg) {
   SIMIX_cond_signal(remote_sock_data->cond);
 
   /* wait for communication end */
   SIMIX_cond_signal(remote_sock_data->cond);
 
   /* wait for communication end */
-  INFO2("Wait communication (from %s) termination on %p",sock->peer_name,sock_data->cond);
-
   SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
 
   msg_got->expe = msg->expe;
   memcpy(msg, msg_got, sizeof(s_gras_msg_t));
   xbt_free(msg_got);
   SIMIX_mutex_unlock(remote_sock_data->mutex);
   SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
 
   msg_got->expe = msg->expe;
   memcpy(msg, msg_got, sizeof(s_gras_msg_t));
   xbt_free(msg_got);
   SIMIX_mutex_unlock(remote_sock_data->mutex);
-#endif
+
+  VERB3("Received a message type '%s' kind '%s' ID %lu",        // from %s",
+        msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID);
 }
 }
index a8c2a70..395137f 100644 (file)
@@ -175,7 +175,7 @@ gras_socket_t gras_trp_select(double timeout)
       /* Got a socket to serve */
       ready--;
 
       /* Got a socket to serve */
       ready--;
 
-      if (sock_iter->is_master && sock_iter->plugin->socket_accept) {
+      if (sock_iter->accepting && sock_iter->plugin->socket_accept) {
         /* not a socket but an ear. accept on it and serve next socket */
         gras_socket_t accepted = NULL;
 
         /* not a socket but an ear. accept on it and serve next socket */
         gras_socket_t accepted = NULL;
 
index 96e5042..f9bfbb7 100644 (file)
@@ -10,7 +10,6 @@
 #include "xbt/ex.h"
 #include "gras/Transport/transport_private.h"
 #include "gras/Virtu/virtu_sg.h"
 #include "xbt/ex.h"
 #include "gras/Transport/transport_private.h"
 #include "gras/Virtu/virtu_sg.h"
-#include "simix/private.h"
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_trp);
 
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_trp);
 
@@ -28,61 +27,8 @@ gras_socket_t _gras_lastly_selected_socket = NULL;
  *
  * if timeout>0 and no message there, wait at most that amount of time before giving up.
  */
  *
  * if timeout>0 and no message there, wait at most that amount of time before giving up.
  */
-gras_socket_t gras_trp_select(double timeout) {
-  static int warned=0;
-  if (timeout>=0 && !warned) {
-    warned=1;
-    WARN0("Timed select not implemented in SG. Switching to blocking select");
-  }
-  gras_trp_procdata_t pd =
-    (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
-
-  gras_socket_t sock_iter, active_socket;
-  gras_trp_sg_sock_data_t *active_socket_data;
-  smx_comm_t comm;
-  unsigned int cursor;
-
-
-  /* FIXME: make sure that the ongoing comm is canceled&destroyed when the corresponding socket is closed */
-  xbt_assert(xbt_dynar_length(pd->sockets)==xbt_dynar_length(pd->comms));
-
-  /* Wait for the first terminating comm object */
-  int rank = SIMIX_network_waitany(pd->comms);
-
-  /* Don't wait on this socket until the comm object is recreated by gras_msg_recv */
-  comm = NULL;
-  xbt_dynar_set(pd->comms,rank,&comm);
-
-  /* Ok, got something. Open a socket back to the expeditor */
-  active_socket = xbt_dynar_get_as(pd->sockets,rank,gras_socket_t);
-  active_socket_data = (gras_trp_sg_sock_data_t *) active_socket->data;
-
-  /* Try to reuse an already opened socket to that expeditor */
-  DEBUG1("Open sockets size %lu", xbt_dynar_length(pd->sockets));
-  xbt_dynar_foreach(pd->sockets, cursor, sock_iter) {
-    gras_trp_sg_sock_data_t *sock_data;
-    DEBUG1("Consider %p as outgoing socket to expeditor", sock_iter);
-
-    if (sock_iter->meas || !sock_iter->outgoing)
-      continue;
-    sock_data = ((gras_trp_sg_sock_data_t *) sock_iter->data);
-
-    if ((sock_data->to_socket == active_socket) &&
-        (sock_data->to_host ==
-         SIMIX_process_get_host(active_socket_data->from_process))) {
-      xbt_dynar_cursor_unlock(pd->sockets);
-      return sock_iter;
-    }
-  }
-
-  /* Socket to expeditor not created yet */
-  DEBUG0("Create a socket to the expeditor");
-
-
-
-  return sock_iter;
-
-#if 0 /* KILLME */
+gras_socket_t gras_trp_select(double timeout)
+{
   gras_socket_t res;
   gras_trp_procdata_t pd =
     (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
   gras_socket_t res;
   gras_trp_procdata_t pd =
     (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
@@ -139,7 +85,7 @@ gras_socket_t gras_trp_select(double timeout) {
 
   res->incoming = 1;
   res->outgoing = 1;
 
   res->incoming = 1;
   res->outgoing = 1;
-  res->is_master = 0;
+  res->accepting = 0;
   res->sd = -1;
 
   res->port = -1;
   res->sd = -1;
 
   res->port = -1;
@@ -177,7 +123,6 @@ gras_socket_t gras_trp_select(double timeout) {
          SIMIX_process_get_name(sockdata->to_process), res->port);
 
   return res;
          SIMIX_process_get_name(sockdata->to_process), res->port);
 
   return res;
-#endif
 }
 
 
 }
 
 
index f36f83d..dd976c4 100644 (file)
@@ -157,7 +157,7 @@ void gras_trp_socket_new(int incoming, gras_socket_t * dst)
 
   sock->incoming = incoming ? 1 : 0;
   sock->outgoing = incoming ? 0 : 1;
 
   sock->incoming = incoming ? 1 : 0;
   sock->outgoing = incoming ? 0 : 1;
-  sock->is_master = incoming ? 1 : 0;
+  sock->accepting = incoming ? 1 : 0;
   sock->meas = 0;
   sock->recvd = 0;
   sock->valid = 1;
   sock->meas = 0;
   sock->recvd = 0;
   sock->valid = 1;
@@ -211,7 +211,7 @@ gras_socket_server_ext(unsigned short port,
     trp->socket_server(trp, sock);
     DEBUG3("in=%c out=%c accept=%c",
            sock->incoming ? 'y' : 'n',
     trp->socket_server(trp, sock);
     DEBUG3("in=%c out=%c accept=%c",
            sock->incoming ? 'y' : 'n',
-           sock->outgoing ? 'y' : 'n', sock->is_master ? 'y' : 'n');
+           sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
   } CATCH(e) {
 
     free(sock);
   } CATCH(e) {
 
     free(sock);
@@ -297,7 +297,7 @@ gras_socket_client_ext(const char *host,
     (*trp->socket_client) (trp, sock);
     DEBUG3("in=%c out=%c accept=%c",
            sock->incoming ? 'y' : 'n',
     (*trp->socket_client) (trp, sock);
     DEBUG3("in=%c out=%c accept=%c",
            sock->incoming ? 'y' : 'n',
-           sock->outgoing ? 'y' : 'n', sock->is_master ? 'y' : 'n');
+           sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
   } CATCH(e) {
     free(sock);
     RETHROW;
   } CATCH(e) {
     free(sock);
     RETHROW;
@@ -340,7 +340,7 @@ void gras_socket_close_voidp(void *sock) {
 /** \brief Close socket */
 void gras_socket_close(gras_socket_t sock)
 {
 /** \brief Close socket */
 void gras_socket_close(gras_socket_t sock)
 {
-  xbt_dynar_t my_sockets =
+  xbt_dynar_t sockets =
     ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
   gras_socket_t sock_iter = NULL;
   unsigned int cursor;
     ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->sockets;
   gras_socket_t sock_iter = NULL;
   unsigned int cursor;
@@ -358,17 +358,23 @@ void gras_socket_close(gras_socket_t sock)
   }
 
   /* FIXME: Issue an event when the socket is closed */
   }
 
   /* FIXME: Issue an event when the socket is closed */
-  DEBUG1("sockets pointer before %p", my_sockets);
+  DEBUG1("sockets pointer before %p", sockets);
   if (sock) {
     /* FIXME: Cannot get the dynar mutex, because it can be already locked */
 //              _xbt_dynar_foreach(sockets,cursor,sock_iter) {
   if (sock) {
     /* FIXME: Cannot get the dynar mutex, because it can be already locked */
 //              _xbt_dynar_foreach(sockets,cursor,sock_iter) {
-    for (cursor = 0; cursor < xbt_dynar_length(my_sockets); cursor++) {
-      _xbt_dynar_cursor_get(my_sockets, cursor, &sock_iter);
+    for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) {
+      _xbt_dynar_cursor_get(sockets, cursor, &sock_iter);
       if (sock == sock_iter) {
         DEBUG2("remove sock cursor %d dize %lu\n", cursor,
       if (sock == sock_iter) {
         DEBUG2("remove sock cursor %d dize %lu\n", cursor,
-               xbt_dynar_length(my_sockets));
-        xbt_dynar_cursor_rm(my_sockets, &cursor);
-        gras_msg_listener_close_socket(sock);
+               xbt_dynar_length(sockets));
+        xbt_dynar_cursor_rm(sockets, &cursor);
+        if (sock->plugin->socket_close)
+          (*sock->plugin->socket_close) (sock);
+
+        /* free the memory */
+        if (sock->peer_name)
+          free(sock->peer_name);
+        free(sock);
         XBT_OUT;
         return;
       }
         XBT_OUT;
         return;
       }
@@ -568,7 +574,7 @@ gras_socket_t gras_socket_meas_accept(gras_socket_t peer)
   xbt_assert0(peer->meas,
               "No need to accept on non-measurement sockets (it's automatic)");
 
   xbt_assert0(peer->meas,
               "No need to accept on non-measurement sockets (it's automatic)");
 
-  if (!peer->is_master) {
+  if (!peer->accepting) {
     /* nothing to accept here (must be in SG) */
     /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */
     return peer;
     /* nothing to accept here (must be in SG) */
     /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */
     return peer;
@@ -592,7 +598,6 @@ static void *gras_trp_procdata_new(void)
   res->name = xbt_strdup("gras_trp");
   res->name_len = 0;
   res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL);
   res->name = xbt_strdup("gras_trp");
   res->name_len = 0;
   res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL);
-  res->comms = xbt_dynar_new(sizeof(void*),NULL); /* stores some smx_comm_t in SG (not used in RL) */
   res->myport = 0;
 
   return (void *) res;
   res->myport = 0;
 
   return (void *) res;
@@ -606,7 +611,6 @@ static void gras_trp_procdata_free(void *data)
   gras_trp_procdata_t res = (gras_trp_procdata_t) data;
 
   xbt_dynar_free(&(res->sockets));
   gras_trp_procdata_t res = (gras_trp_procdata_t) data;
 
   xbt_dynar_free(&(res->sockets));
-  xbt_dynar_free(&(res->comms));
   free(res->name);
   free(res);
 }
   free(res->name);
   free(res);
 }
index 21c52d7..6b84713 100644 (file)
@@ -99,8 +99,12 @@ XBT_PUBLIC(gras_trp_plugin_t)
        int myport;              /* Port on which I listen myself */
 
        xbt_dynar_t sockets;     /* all sockets known to this process */
        int myport;              /* Port on which I listen myself */
 
        xbt_dynar_t sockets;     /* all sockets known to this process */
-       xbt_dynar_t comms; /* SG cruft: the ongoing communications */
-       xbt_dynar_t sockets_to_close; /* The listener is in charge of closing the sockets */
+
+       /* SG only elements. In RL, they are part of the OS ;) */
+
+       /* List of sockets ready to be select()ed */
+       xbt_queue_t msg_selectable_sockets;      /* regular sockets  */
+       xbt_queue_t meas_selectable_sockets;     /* measurement ones */
 
      } s_gras_trp_procdata_t, *gras_trp_procdata_t;
 
 
      } s_gras_trp_procdata_t, *gras_trp_procdata_t;
 
index ca8aadf..c8a13d0 100644 (file)
@@ -98,7 +98,7 @@ gras_socket_t gras_socket_client_from_file(const char *path)
          path,
          res->sd,
          res->incoming ? 'y' : 'n',
          path,
          res->sd,
          res->incoming ? 'y' : 'n',
-         res->outgoing ? 'y' : 'n', res->is_master ? 'y' : 'n');
+         res->outgoing ? 'y' : 'n', res->accepting ? 'y' : 'n');
 
   xbt_dynar_push(((gras_trp_procdata_t)
                   gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res);
 
   xbt_dynar_push(((gras_trp_procdata_t)
                   gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res);
@@ -138,7 +138,7 @@ gras_socket_t gras_socket_server_from_file(const char *path)
   DEBUG4("sd=%d in=%c out=%c accept=%c",
          res->sd,
          res->incoming ? 'y' : 'n',
   DEBUG4("sd=%d in=%c out=%c accept=%c",
          res->sd,
          res->incoming ? 'y' : 'n',
-         res->outgoing ? 'y' : 'n', res->is_master ? 'y' : 'n');
+         res->outgoing ? 'y' : 'n', res->accepting ? 'y' : 'n');
 
   xbt_dynar_push(((gras_trp_procdata_t)
                   gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res);
 
   xbt_dynar_push(((gras_trp_procdata_t)
                   gras_libdata_by_id(gras_trp_libdata_id))->sockets, &res);
index a80c331..9ebfd79 100644 (file)
@@ -24,6 +24,10 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp_sg, gras_trp,
  *** Prototypes 
  ***/
 
  *** Prototypes 
  ***/
 
+/* retrieve the port record associated to a numerical port on an host */
+static void find_port(gras_hostdata_t * hd, int port,
+                      gras_sg_portrec_t * hpd);
+
 
 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
                                /* OUT */ gras_socket_t sock);
 
 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
                                /* OUT */ gras_socket_t sock);
@@ -44,27 +48,36 @@ int gras_trp_sg_chunk_recv(gras_socket_t sd,
  *** Specific plugin part
  ***/
 typedef struct {
  *** Specific plugin part
  ***/
 typedef struct {
-  xbt_dict_t sockets; /* all known sockets */
-} s_gras_trp_sg_plug_data_t,*gras_trp_sg_plug_data_t;
+  int placeholder;              /* nothing plugin specific so far */
+} gras_trp_sg_plug_data_t;
 
 
 /***
  *** Code
  ***/
 
 
 /***
  *** Code
  ***/
+static void find_port(gras_hostdata_t * hd, int port, gras_sg_portrec_t * hpd)
+{
+  unsigned int cpt;
+  gras_sg_portrec_t pr;
 
 
-static void gras_trp_sg_exit(gras_trp_plugin_t plug){
-  gras_trp_sg_plug_data_t mydata = (gras_trp_sg_plug_data_t) plug->data;
-  xbt_dict_free(&(mydata->sockets));
-  xbt_free(plug->data);
+  xbt_assert0(hd, "Please run gras_process_init on each process");
+
+  xbt_dynar_foreach(hd->ports, cpt, pr) {
+    if (pr.port == port) {
+      memcpy(hpd, &pr, sizeof(gras_sg_portrec_t));
+      return;
+    }
+  }
+  THROW1(mismatch_error, 0, "Unable to find any portrec for port #%d", port);
 }
 }
+
+
 void gras_trp_sg_setup(gras_trp_plugin_t plug)
 {
 void gras_trp_sg_setup(gras_trp_plugin_t plug)
 {
-  gras_trp_sg_plug_data_t data = xbt_new(s_gras_trp_sg_plug_data_t, 1);
 
 
-  plug->data = data;
-  data->sockets = xbt_dict_new();
+  gras_trp_sg_plug_data_t *data = xbt_new(gras_trp_sg_plug_data_t, 1);
 
 
-  plug->exit = gras_trp_sg_exit;
+  plug->data = data;
 
   plug->socket_client = gras_trp_sg_socket_client;
   plug->socket_server = gras_trp_sg_socket_server;
 
   plug->socket_client = gras_trp_sg_socket_client;
   plug->socket_server = gras_trp_sg_socket_server;
@@ -78,120 +91,155 @@ void gras_trp_sg_setup(gras_trp_plugin_t plug)
 }
 
 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
 }
 
 void gras_trp_sg_socket_client(gras_trp_plugin_t self,
-                               /* OUT */ gras_socket_t sock) {
+                               /* OUT */ gras_socket_t sock)
+{
+  xbt_ex_t e;
 
   smx_host_t peer;
 
   smx_host_t peer;
+  gras_hostdata_t *hd;
   gras_trp_sg_sock_data_t *data;
   gras_trp_sg_sock_data_t *data;
-  gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
-
+  gras_sg_portrec_t pr;
 
 
+  /* make sure this socket will reach someone */
   if (!(peer = SIMIX_host_get_by_name(sock->peer_name)))
     THROW1(mismatch_error, 0,
   if (!(peer = SIMIX_host_get_by_name(sock->peer_name)))
     THROW1(mismatch_error, 0,
-           "Can't connect to %s: no such host", sock->peer_name);
+           "Can't connect to %s: no such host.\n", sock->peer_name);
 
 
-  /* make sure this socket will reach someone */
-  xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets;
-  char *sock_name=bprintf("%s:%d",sock->peer_name,sock->peer_port);
-  gras_socket_t server = xbt_dict_get_or_null(all_sockets,sock_name);
-  free(sock_name);
+  if (!(hd = (gras_hostdata_t *) SIMIX_host_get_data(peer)))
+    THROW1(mismatch_error, 0,
+           "can't connect to %s: no process on this host", sock->peer_name);
 
 
-  if (!server)
-    THROW2(mismatch_error, 0,
-           "can't connect to %s:%d, no process listen on this port",
-           sock->peer_name, sock->peer_port);
+  TRY {
+    find_port(hd, sock->peer_port, &pr);
+  }
+  CATCH(e) {
+    if (e.category == mismatch_error) {
+      xbt_ex_free(e);
+      THROW2(mismatch_error, 0,
+             "can't connect to %s:%d, no process listen on this port",
+             sock->peer_name, sock->peer_port);
+    }
+    RETHROW;
+  }
 
 
-  if (server->meas && !sock->meas) {
+  if (pr.meas && !sock->meas) {
     THROW2(mismatch_error, 0,
            "can't connect to %s:%d in regular mode, the process listen "
            "in measurement mode on this port", sock->peer_name,
            sock->peer_port);
   }
     THROW2(mismatch_error, 0,
            "can't connect to %s:%d in regular mode, the process listen "
            "in measurement mode on this port", sock->peer_name,
            sock->peer_port);
   }
-  if (!server->meas && sock->meas) {
+  if (!pr.meas && sock->meas) {
     THROW2(mismatch_error, 0,
            "can't connect to %s:%d in measurement mode, the process listen "
            "in regular mode on this port", sock->peer_name, sock->peer_port);
   }
   /* create the socket */
     THROW2(mismatch_error, 0,
            "can't connect to %s:%d in measurement mode, the process listen "
            "in regular mode on this port", sock->peer_name, sock->peer_port);
   }
   /* create the socket */
-  data = xbt_new0(gras_trp_sg_sock_data_t, 1);
-  data->rdv = ((gras_trp_sg_sock_data_t *)server->data)->rdv;
+  data = xbt_new(gras_trp_sg_sock_data_t, 1);
   data->from_process = SIMIX_process_self();
   data->from_process = SIMIX_process_self();
+  data->to_process = pr.process;
+  data->to_host = peer;
+
+  /* initialize mutex and condition of the socket */
+  data->mutex = SIMIX_mutex_init();
+  data->cond = SIMIX_cond_init();
+  data->to_socket = pr.socket;
 
   sock->data = data;
   sock->incoming = 1;
 
 
   sock->data = data;
   sock->incoming = 1;
 
-  /* Create a smx comm object about this socket */
-  data->ongoing_msg_size = sizeof(s_gras_msg_t);
-  smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size));
-  xbt_dynar_push(pd->comms,&comm);
-
   DEBUG5("%s (PID %d) connects in %s mode to %s:%d",
          SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
          sock->meas ? "meas" : "regular", sock->peer_name, sock->peer_port);
 }
 
   DEBUG5("%s (PID %d) connects in %s mode to %s:%d",
          SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
          sock->meas ? "meas" : "regular", sock->peer_name, sock->peer_port);
 }
 
-void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock) {
-  gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
-  xbt_dict_t all_sockets = ((gras_trp_sg_plug_data_t)self->data)->sockets;
+void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock)
+{
 
 
-  /* Make sure that this socket was not opened so far */
-  char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port);
-  gras_socket_t old = xbt_dict_get_or_null(all_sockets,sock_name);
-  if (old)
-    THROW1(mismatch_error, 0,
-           "can't listen on address %s: port already in use.",
-           sock_name);
+  gras_hostdata_t *hd =
+    (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
+  gras_sg_portrec_t pr;
+  gras_trp_sg_sock_data_t *data;
+  volatile int found;
 
 
+  const char *host = SIMIX_host_get_name(SIMIX_host_self());
 
 
-  /* Create the data associated to the socket */
-  gras_trp_sg_sock_data_t *data = xbt_new0(gras_trp_sg_sock_data_t, 1);
-  data->rdv = SIMIX_rdv_create(sock_name);
-  data->from_process = SIMIX_process_self();
-  SIMIX_rdv_set_data(data->rdv,sock);
+  xbt_ex_t e;
 
 
-  sock->is_master = 1;
-  sock->incoming = 1;
+  xbt_assert0(hd, "Please run gras_process_init on each process");
 
 
-  sock->data = data;
+  sock->accepting = 0;          /* no such nuisance in SG */
+  found = 0;
+  TRY {
+    find_port(hd, sock->port, &pr);
+    found = 1;
+  } CATCH(e) {
+    if (e.category == mismatch_error)
+      xbt_ex_free(e);
+    else
+      RETHROW;
+  }
+
+  if (found)
+    THROW2(mismatch_error, 0,
+           "can't listen on address %s:%d: port already in use.",
+           host, sock->port);
+
+  pr.port = sock->port;
+  pr.meas = sock->meas;
+  pr.socket = sock;
+  pr.process = SIMIX_process_self();
+  xbt_dynar_push(hd->ports, &pr);
 
 
-  /* Register the socket to the set of sockets known simulation-wide */
-  xbt_dict_set(all_sockets,sock_name,sock,NULL); /* FIXME: add a function to raise a warning at simulation end for non-closed sockets */
+  /* Create the socket */
+  data = xbt_new(gras_trp_sg_sock_data_t, 1);
+  data->from_process = SIMIX_process_self();
+  data->to_process = NULL;
+  data->to_host = SIMIX_host_self();
 
 
-  /* Create a smx comm object about this socket */
-  data->ongoing_msg_size = sizeof(s_gras_msg_t);
-  smx_comm_t comm = SIMIX_network_irecv(data->rdv,&(data->ongoing_msg),&(data->ongoing_msg_size));
-  INFO2("irecv comm %p onto %p",comm,&(data->ongoing_msg));
-  xbt_dynar_push(pd->comms,&comm);
+  data->cond = SIMIX_cond_init();
+  data->mutex = SIMIX_mutex_init();
+
+  sock->data = data;
 
   VERB6("'%s' (%d) ears on %s:%d%s (%p)",
         SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
 
   VERB6("'%s' (%d) ears on %s:%d%s (%p)",
         SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
-        gras_os_myname(), sock->port, sock->meas ? " (mode meas)" : "", sock);
-  free(sock_name);
+        host, sock->port, sock->meas ? " (mode meas)" : "", sock);
+
 }
 
 }
 
-void gras_trp_sg_socket_close(gras_socket_t sock) {
-  gras_trp_procdata_t pd= (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
-  if (sock->is_master) {
-    /* server mode socket. Unregister it from 'OS' tables */
-    char *sock_name=bprintf("%s:%d",gras_os_myname(),sock->port);
+void gras_trp_sg_socket_close(gras_socket_t sock)
+{
+  gras_hostdata_t *hd =
+    (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
+  unsigned int cpt;
+  gras_sg_portrec_t pr;
 
 
-    xbt_dict_t sockets = ((gras_trp_sg_plug_data_t)sock->plugin->data)->sockets;
-    gras_socket_t old = xbt_dict_get_or_null(sockets,sock_name);
-    if (!old)
-      WARN2("socket_close called on the unknown server socket %p (port=%d)",
-            sock, sock->port);
-    xbt_dict_remove(sockets,sock_name);
-    free(sock_name);
+  XBT_IN1(" (sock=%p)", sock);
 
 
-  }
+  if (!sock)
+    return;
+
+  xbt_assert0(hd, "Please run gras_process_init on each process");
 
   if (sock->data) {
 
   if (sock->data) {
-    SIMIX_rdv_destroy(((gras_trp_sg_sock_data_t *) sock->data)->rdv);
+    SIMIX_cond_destroy(((gras_trp_sg_sock_data_t *) sock->data)->cond);
+    SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t *) sock->data)->mutex);
     free(sock->data);
   }
 
     free(sock->data);
   }
 
-  xbt_dynar_push(pd->sockets_to_close,&sock);
-  gras_msg_listener_awake();
-
+  if (sock->incoming && !sock->outgoing && sock->port >= 0) {
+    /* server mode socket. Unregister it from 'OS' tables */
+    xbt_dynar_foreach(hd->ports, cpt, pr) {
+      DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports));
+      if (pr.port == sock->port) {
+        xbt_dynar_cursor_rm(hd->ports, &cpt);
+        XBT_OUT;
+        return;
+      }
+    }
+    WARN2("socket_close called on the unknown incoming socket %p (port=%d)",
+          sock, sock->port);
+  }
   XBT_OUT;
 }
 
   XBT_OUT;
 }
 
@@ -213,7 +261,10 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
   char name[256];
   static unsigned int count = 0;
 
   char name[256];
   static unsigned int count = 0;
 
+  smx_action_t act;             /* simix action */
   gras_trp_sg_sock_data_t *sock_data;
   gras_trp_sg_sock_data_t *sock_data;
+  gras_trp_procdata_t trp_remote_proc;
+  gras_msg_procdata_t msg_remote_proc;
   gras_msg_t msg;               /* message to send */
 
   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
   gras_msg_t msg;               /* message to send */
 
   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
@@ -221,6 +272,7 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
   xbt_assert0(sock->meas,
               "SG chunk exchange shouldn't be used on non-measurement sockets");
 
   xbt_assert0(sock->meas,
               "SG chunk exchange shouldn't be used on non-measurement sockets");
 
+  SIMIX_mutex_lock(sock_data->mutex);
   sprintf(name, "Chunk[%d]", count++);
   /*initialize gras message */
   msg = xbt_new(s_gras_msg_t, 1);
   sprintf(name, "Chunk[%d]", count++);
   /*initialize gras message */
   msg = xbt_new(s_gras_msg_t, 1);
@@ -233,22 +285,73 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
   } else {
     msg->payl = NULL;
   }
   } else {
     msg->payl = NULL;
   }
-  SIMIX_network_send(sock_data->rdv,size,-1.,-1.,&msg,sizeof(msg),(smx_comm_t*)&(msg->comm),msg);
+
+
+  /* put his socket on the selectable socket queue */
+  trp_remote_proc = (gras_trp_procdata_t)
+    gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process);
+  xbt_queue_push(trp_remote_proc->meas_selectable_sockets, &sock);
+
+  /* put message on msg_queue */
+  msg_remote_proc = (gras_msg_procdata_t)
+    gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
+
+  xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas, msg);
+
+  /* wait for the receiver */
+  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
+
+  /* creates simix action and waits its ends, waits in the sender host
+     condition */
+  DEBUG5("send chunk %s from %s to  %s:%d (size=%ld)",
+         name, SIMIX_host_get_name(SIMIX_host_self()),
+         SIMIX_host_get_name(sock_data->to_host), sock->peer_port, size);
+
+  act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host,
+                                 name, size, -1);
+  SIMIX_register_action_to_condition(act, sock_data->cond);
+  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
+  SIMIX_unregister_action_to_condition(act, sock_data->cond);
+  /* error treatmeant (FIXME) */
+
+  /* cleanup structures */
+  SIMIX_action_destroy(act);
+
+  SIMIX_mutex_unlock(sock_data->mutex);
 }
 
 int gras_trp_sg_chunk_recv(gras_socket_t sock,
                            char *data, unsigned long int size)
 {
   gras_trp_sg_sock_data_t *sock_data;
 }
 
 int gras_trp_sg_chunk_recv(gras_socket_t sock,
                            char *data, unsigned long int size)
 {
   gras_trp_sg_sock_data_t *sock_data;
+  gras_trp_sg_sock_data_t *remote_sock_data;
+  gras_socket_t remote_socket = NULL;
   gras_msg_t msg_got;
   gras_msg_t msg_got;
-  smx_comm_t comm;
+  gras_msg_procdata_t msg_procdata =
+    (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
+  gras_trp_procdata_t trp_proc =
+    (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
 
   xbt_assert0(sock->meas,
               "SG chunk exchange shouldn't be used on non-measurement sockets");
 
   xbt_assert0(sock->meas,
               "SG chunk exchange shouldn't be used on non-measurement sockets");
+  xbt_queue_shift_timed(trp_proc->meas_selectable_sockets,
+                        &remote_socket, 60);
+
+  if (remote_socket == NULL) {
+    THROW0(timeout_error, 0, "Timeout");
+  }
+
+  remote_sock_data = (gras_trp_sg_sock_data_t *) remote_socket->data;
+  msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
 
   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
 
 
   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
 
-  SIMIX_network_recv(sock_data->rdv,-1.,&msg_got,NULL,&comm);
+  /* ok, I'm here, you can continue the communication */
+  SIMIX_cond_signal(remote_sock_data->cond);
+
+  SIMIX_mutex_lock(remote_sock_data->mutex);
+  /* wait for communication end */
+  SIMIX_cond_wait(remote_sock_data->cond, remote_sock_data->mutex);
 
   if (msg_got->payl_size != size)
     THROW5(mismatch_error, 0,
 
   if (msg_got->payl_size != size)
     THROW5(mismatch_error, 0,
@@ -264,5 +367,6 @@ int gras_trp_sg_chunk_recv(gras_socket_t sock,
     xbt_free(msg_got->payl);
 
   xbt_free(msg_got);
     xbt_free(msg_got->payl);
 
   xbt_free(msg_got);
+  SIMIX_mutex_unlock(remote_sock_data->mutex);
   return 0;
 }
   return 0;
 }
index ddbf0f8..de76134 100644 (file)
@@ -225,7 +225,7 @@ static gras_socket_t gras_trp_sock_socket_accept(gras_socket_t sock)
   res->plugin = sock->plugin;
   res->incoming = sock->incoming;
   res->outgoing = sock->outgoing;
   res->plugin = sock->plugin;
   res->incoming = sock->incoming;
   res->outgoing = sock->outgoing;
-  res->is_master = 0;
+  res->accepting = 0;
   res->sd = sd;
   res->port = -1;
 
   res->sd = sd;
   res->port = -1;
 
@@ -266,10 +266,8 @@ static void gras_trp_sock_socket_close(gras_socket_t sock)
 
   VERB1("close tcp connection %d", sock->sd);
 
 
   VERB1("close tcp connection %d", sock->sd);
 
-  if (tcp_close(sock->sd) < 0) {
-    WARN3("error while closing tcp socket %d: %d (%s)\n",
-          sock->sd, sock_errno, sock_errstr(sock_errno));
-  }
+  /* ask the listener to close the socket */
+  gras_msg_listener_close_socket(sock->sd);
 }
 
 /************************************/
 }
 
 /************************************/
index b242ccb..2588813 100644 (file)
@@ -56,7 +56,7 @@ typedef struct s_gras_socket {
 
   int incoming:1;               /* true if we can read from this sock */
   int outgoing:1;               /* true if we can write on this sock */
 
   int incoming:1;               /* true if we can read from this sock */
   int outgoing:1;               /* true if we can write on this sock */
-  int is_master:1;              /* true if master incoming sock */
+  int accepting:1;              /* true if master incoming sock in tcp */
   int meas:1;                   /* true if this is an experiment socket instead of messaging */
   int valid:1;                  /* false if a select returned that the peer quitted, forcing us to "close" the socket */
   int moredata:1;               /* TCP socket use a buffer and read operation get as much 
   int meas:1;                   /* true if this is an experiment socket instead of messaging */
   int valid:1;                  /* false if a select returned that the peer quitted, forcing us to "close" the socket */
   int moredata:1;               /* TCP socket use a buffer and read operation get as much 
index f7fb45f..fea5400 100644 (file)
@@ -103,10 +103,10 @@ void *gras_libdata_by_name_from_procdata(const char *name,
   }
   return res;
 }
   }
   return res;
 }
-void *gras_libdata_by_id(int id) {
-  return gras_libdata_by_id_from_procdata(id,gras_procdata_get());
-}
-void *gras_libdata_by_id_from_procdata(int id,gras_procdata_t *pd) {
+
+void *gras_libdata_by_id(int id)
+{
+  gras_procdata_t *pd = gras_procdata_get();
   if (xbt_set_length(pd->libdata) < xbt_dynar_length(_gras_procdata_fabrics)) {
     /* Damn, some new modules were added since procdata_init(). Amok? */
     /* Get 'em all */
   if (xbt_set_length(pd->libdata) < xbt_dynar_length(_gras_procdata_fabrics)) {
     /* Damn, some new modules were added since procdata_init(). Amok? */
     /* Get 'em all */
index 569fd3a..bc39cf2 100644 (file)
@@ -48,6 +48,7 @@ void gras_process_init()
     /* First process on this host */
     hd = xbt_new(gras_hostdata_t, 1);
     hd->refcount = 1;
     /* First process on this host */
     hd = xbt_new(gras_hostdata_t, 1);
     hd->refcount = 1;
+    hd->ports = xbt_dynar_new(sizeof(gras_sg_portrec_t), NULL);
     SIMIX_host_set_data(SIMIX_host_self(), (void *) hd);
   } else {
     hd->refcount++;
     SIMIX_host_set_data(SIMIX_host_self(), (void *) hd);
   } else {
     hd->refcount++;
@@ -61,7 +62,9 @@ void gras_process_init()
   } else
     pd->ppid = -1;
 
   } else
     pd->ppid = -1;
 
-  trp_pd->sockets_to_close = xbt_dynar_new(sizeof(gras_socket_t),NULL);
+  trp_pd->msg_selectable_sockets = xbt_queue_new(0, sizeof(gras_socket_t));
+
+  trp_pd->meas_selectable_sockets = xbt_queue_new(0, sizeof(gras_socket_t));
 
   VERB2("Creating process '%s' (%d)",
         SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid());
 
   VERB2("Creating process '%s' (%d)",
         SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid());
@@ -83,7 +86,9 @@ void gras_process_exit()
   gras_trp_procdata_t trp_pd =
     (gras_trp_procdata_t) gras_libdata_by_name("gras_trp");
 
   gras_trp_procdata_t trp_pd =
     (gras_trp_procdata_t) gras_libdata_by_name("gras_trp");
 
-  xbt_dynar_free(&trp_pd->sockets_to_close);
+  xbt_queue_free(&trp_pd->msg_selectable_sockets);
+
+  xbt_queue_free(&trp_pd->meas_selectable_sockets);
 
 
   xbt_assert0(hd, "Run gras_process_init (ie, gras_init)!!");
 
 
   xbt_assert0(hd, "Run gras_process_init (ie, gras_init)!!");
@@ -110,6 +115,7 @@ void gras_process_exit()
     gras_socket_close(sock_iter);
   }
   if (!--(hd->refcount)) {
     gras_socket_close(sock_iter);
   }
   if (!--(hd->refcount)) {
+    xbt_dynar_free(&hd->ports);
     free(hd);
   }
   gras_procdata_exit();
     free(hd);
   }
   gras_procdata_exit();
index 39a31d6..9162237 100644 (file)
@@ -44,7 +44,5 @@ typedef struct {
 gras_procdata_t *gras_procdata_get(void);
 void *gras_libdata_by_name_from_procdata(const char *name,
                                          gras_procdata_t * pd);
 gras_procdata_t *gras_procdata_get(void);
 void *gras_libdata_by_name_from_procdata(const char *name,
                                          gras_procdata_t * pd);
-void *gras_libdata_by_id_from_procdata(int id,gras_procdata_t *pd);
-
 
 #endif /* GRAS_VIRTU_PRIVATE_H */
 
 #endif /* GRAS_VIRTU_PRIVATE_H */
index dab2df5..51090f7 100644 (file)
@@ -26,21 +26,20 @@ typedef struct {
 typedef struct {
   int refcount;
 
 typedef struct {
   int refcount;
 
-  /* Nothing in particular (anymore) */
+  xbt_dynar_t ports;
+
 } gras_hostdata_t;
 
 /* data for each socket (FIXME: find a better location for that)*/
 typedef struct {
 } gras_hostdata_t;
 
 /* data for each socket (FIXME: find a better location for that)*/
 typedef struct {
-  smx_rdv_t rdv;
+  smx_process_t from_process;
+  smx_process_t to_process;
 
 
-  smx_process_t from_process; /* the one who created the socket */
   smx_host_t to_host;           /* Who's on other side */
   smx_host_t to_host;           /* Who's on other side */
-  smx_comm_t comm; /* Ongoing communication */
-
-  s_gras_msg_t ongoing_msg;
-  size_t ongoing_msg_size;
 
 
-  gras_socket_t to_socket; /* If != NULL, this socket was created as accept when receiving onto to_socket */
+  smx_cond_t cond;
+  smx_mutex_t mutex;
+  gras_socket_t to_socket;
 } gras_trp_sg_sock_data_t;
 
 
 } gras_trp_sg_sock_data_t;
 
 
index d746e71..82785d0 100644 (file)
@@ -207,7 +207,7 @@ XBT_PUBLIC(void*) SIMIX_rdv_get_data(smx_rdv_t rdv);
 /*****Communication Requests*****/
 XBT_PUBLIC(void) SIMIX_communication_cancel(smx_comm_t comm);
 XBT_PUBLIC(double) SIMIX_communication_get_remains(smx_comm_t comm);
 /*****Communication Requests*****/
 XBT_PUBLIC(void) SIMIX_communication_cancel(smx_comm_t comm);
 XBT_PUBLIC(double) SIMIX_communication_get_remains(smx_comm_t comm);
-XBT_PUBLIC(void *) SIMIX_communication_get_sentdata(smx_comm_t comm);
+XBT_PUBLIC(void *) SIMIX_communication_get_data(smx_comm_t comm);
 
 /*****Networking*****/
 XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
 
 /*****Networking*****/
 XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
index f6f3230..3425a41 100644 (file)
@@ -64,7 +64,7 @@ m_task_t MSG_mailbox_get_head(msg_mailbox_t mailbox)
   if(!comm)
     return NULL; 
   
   if(!comm)
     return NULL; 
   
-  return (m_task_t)SIMIX_communication_get_sentdata(comm);
+  return (m_task_t)SIMIX_communication_get_data(comm);
 }
 
 int
 }
 
 int
index 9df8f76..26e0c5e 100644 (file)
@@ -19,7 +19,6 @@ XBT_LOG_EXTERNAL_CATEGORY(simix_host);
 XBT_LOG_EXTERNAL_CATEGORY(simix_process);
 XBT_LOG_EXTERNAL_CATEGORY(simix_synchro);
 XBT_LOG_EXTERNAL_CATEGORY(simix_context);
 XBT_LOG_EXTERNAL_CATEGORY(simix_process);
 XBT_LOG_EXTERNAL_CATEGORY(simix_synchro);
 XBT_LOG_EXTERNAL_CATEGORY(simix_context);
-XBT_LOG_EXTERNAL_CATEGORY(simix_network);
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_kernel, simix,
                                 "Logging specific to SIMIX (kernel)");
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_kernel, simix,
                                 "Logging specific to SIMIX (kernel)");
 
@@ -58,7 +57,6 @@ void SIMIX_global_init(int *argc, char **argv)
     XBT_LOG_CONNECT(simix_process, simix);
     XBT_LOG_CONNECT(simix_synchro, simix);
     XBT_LOG_CONNECT(simix_context, simix);
     XBT_LOG_CONNECT(simix_process, simix);
     XBT_LOG_CONNECT(simix_synchro, simix);
     XBT_LOG_CONNECT(simix_context, simix);
-    XBT_LOG_CONNECT(simix_network, simix);
 
     simix_global = xbt_new0(s_SIMIX_Global_t, 1);
 
 
     simix_global = xbt_new0(s_SIMIX_Global_t, 1);
 
@@ -419,10 +417,8 @@ double SIMIX_solve(xbt_fifo_t actions_done, xbt_fifo_t actions_failed)
         if (smx_action) {
           /* Copy the transfered data of the completed communication actions */
           /* FIXME: find a better way to determine if its a comm action */
         if (smx_action) {
           /* Copy the transfered data of the completed communication actions */
           /* FIXME: find a better way to determine if its a comm action */
-          if(smx_action->data != NULL) {
-            CDEBUG1(simix_network,"Communication %p finished",smx_action->data);
+          if(smx_action->data != NULL)
             SIMIX_network_copy_data((smx_comm_t)smx_action->data);
             SIMIX_network_copy_data((smx_comm_t)smx_action->data);
-          }
           SIMIX_action_signal_all(smx_action);      
         }
       }
           SIMIX_action_signal_all(smx_action);      
         }
       }
index 4c32439..7008ec3 100644 (file)
@@ -87,7 +87,7 @@ smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, smx_comm_type_t type)
   }
 
   /* no relevant request found. Return NULL */
   }
 
   /* no relevant request found. Return NULL */
-  DEBUG0("Communication request not found. I assume that other side will arrive later on.");
+  DEBUG0("Communication request not found");
   return NULL;
 }
 
   return NULL;
 }
 
@@ -300,32 +300,36 @@ double SIMIX_communication_get_remains(smx_comm_t comm)
  */
 void SIMIX_network_copy_data(smx_comm_t comm)
 {
  */
 void SIMIX_network_copy_data(smx_comm_t comm)
 {
+  /* If there is no data to be copy then return */
+  if(!comm->src_buff || !comm->dst_buff)
+    return;
+  
   size_t src_buff_size = comm->src_buff_size;
   size_t dst_buff_size = *comm->dst_buff_size;
   size_t src_buff_size = comm->src_buff_size;
   size_t dst_buff_size = *comm->dst_buff_size;
+  
+  /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
+  dst_buff_size = MIN(dst_buff_size, src_buff_size);
+  
+  /* Update the receiver's buffer size to the copied amount */
+  if (comm->dst_buff_size)
+    *comm->dst_buff_size = dst_buff_size;
 
 
-  xbt_assert(src_buff_size == dst_buff_size);
-
-  /* If there is no data to copy then return */
-  if(!comm->src_buff || !comm->dst_buff || dst_buff_size == 0)
+  if(dst_buff_size == 0)
     return;
 
   memcpy(comm->dst_buff, comm->src_buff, dst_buff_size);
 
     return;
 
   memcpy(comm->dst_buff, comm->src_buff, dst_buff_size);
 
-  DEBUG6("Copying comm %p data from %s -> %s (%zu bytes %p->%p)",
+  DEBUG4("Copying comm %p data from %s -> %s (%zu bytes)", 
          comm, comm->src_proc->smx_host->name, comm->dst_proc->smx_host->name,
          comm, comm->src_proc->smx_host->name, comm->dst_proc->smx_host->name,
-         dst_buff_size,comm->src_buff,comm->dst_buff);
+         dst_buff_size);
 }
 
 /**
  *  \brief Return the user data associated to the communication
 }
 
 /**
  *  \brief Return the user data associated to the communication
- *
- *  In MSG and GRAS, that data is the exchanged task/msg itself, since
- *  (i) In MSG, the receiver still wants to read the task although the communication didn't complete.
- *  (ii) In GRAS, we need to retrieve that gras_msg_t during the select
  *  \param comm The communication
  *  \return the user data
  */
  *  \param comm The communication
  *  \return the user data
  */
-void *SIMIX_communication_get_sentdata(smx_comm_t comm)
+void *SIMIX_communication_get_data(smx_comm_t comm)
 {
   return comm->data;
 }
 {
   return comm->data;
 }
@@ -423,7 +427,6 @@ smx_comm_t SIMIX_network_irecv(smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_s
   comm->dst_buff = dst_buff;
   comm->dst_buff_size = dst_buff_size;
 
   comm->dst_buff = dst_buff;
   comm->dst_buff_size = dst_buff_size;
 
-  DEBUG2("Receive data for %p into %p",comm,comm->dst_buff);
   SIMIX_communication_start(comm);
   return comm;
 }
   SIMIX_communication_start(comm);
   return comm;
 }