Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Ongoing work to port GRAS to smx_network. Not working yet
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 18 May 2010 09:27:58 +0000 (09:27 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 18 May 2010 09:27:58 +0000 (09:27 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@7766 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/gras/Msg/gras_msg_listener.c
src/gras/Msg/msg_private.h
src/gras/Msg/rl_msg.c
src/gras/Msg/sg_msg.c
src/gras/Transport/sg_transport.c
src/gras/Transport/transport_plugin_sg.c
src/gras/Virtu/virtu_sg.h

index 4bc0bf2..5d8ef35 100644 (file)
@@ -18,7 +18,7 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg_read, gras_msg,
 #include "gras/Transport/transport_interface.h" /* gras_select */
 
 typedef struct s_gras_msg_listener_ {
-  xbt_queue_t incomming_messages;
+  xbt_queue_t incomming_messages; /* messages received from the wire and still to be used by master */
   xbt_queue_t socks_to_close;   /* let the listener close the sockets, since it may be selecting on them. Darwin don't like this trick */
   gras_socket_t wakeup_sock_listener_side;
   gras_socket_t wakeup_sock_master_side;
@@ -28,27 +28,26 @@ typedef struct s_gras_msg_listener_ {
 static void listener_function(void *p)
 {
   gras_msg_listener_t me = (gras_msg_listener_t) p;
-  s_gras_msg_t msg;
+  gras_msg_t msg;
   xbt_ex_t e;
   gras_msgtype_t msg_wakeup_listener_t =
     gras_msgtype_by_name("_wakeup_listener");
   DEBUG0("I'm the listener");
   while (1) {
-    DEBUG0("Selecting");
-    msg.expe = gras_trp_select(-1);
-    DEBUG0("Select returned something");
-    gras_msg_recv(msg.expe, &msg);
-    if (msg.type != msg_wakeup_listener_t)
-      xbt_queue_push(me->incomming_messages, &msg);
+    msg = gras_msg_recv_any();
+    if (msg->type != msg_wakeup_listener_t)
+      xbt_queue_push(me->incomming_messages, msg);
     else {
-      char got = *(char *) msg.payl;
+      char got = *(char *) msg->payl;
       if (got == '1') {
         VERB0("Asked to get awake");
-        free(msg.payl);
+        free(msg->payl);
+        free(msg);
       } else {
         VERB0("Asked to die");
-        //        gras_socket_close(me->wakeup_sock_listener_side);
-        free(msg.payl);
+        //gras_socket_close(me->wakeup_sock_listener_side);
+        free(msg->payl);
+        free(msg);
         return;
       }
     }
index ec64fe0..79321a2 100644 (file)
@@ -65,6 +65,7 @@ void gras_msgtype_free(void *msgtype);
 
 
 /* functions to extract msg from socket or put it on wire (depend RL vs SG) */
+gras_msg_t gras_msg_recv_any(void); /* Get first message arriving */
 void gras_msg_recv(gras_socket_t sock, gras_msg_t msg /*OUT*/);
 void gras_msg_send_ext(gras_socket_t sock,
                        e_gras_msg_kind_t kind,
index 95925fb..ff6769f 100644 (file)
 XBT_LOG_EXTERNAL_CATEGORY(gras_msg);
 XBT_LOG_DEFAULT_CATEGORY(gras_msg);
 
+void gras_msg_recv(gras_socket_t sock, gras_msg_t msg);
+
+gras_msg_t gras_msg_recv_any(void) {
+  gras_msg_t msg = xbt_new0(s_gras_msg_t,1);
+  msg->expe = gras_trp_select(-1);
+  DEBUG0("Select returned something");
+  gras_msg_recv(msg->expe, msg);
+  return msg;
+}
+
 void gras_msg_send_ext(gras_socket_t sock,
                        e_gras_msg_kind_t kind,
                        unsigned long int ID,
index 7a690bc..6566719 100644 (file)
 #include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
 #include "gras/Transport/transport_private.h"   /* sock->data */
 
-XBT_LOG_EXTERNAL_CATEGORY(gras_msg);
-XBT_LOG_DEFAULT_CATEGORY(gras_msg);
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_msg);
 
 typedef void *gras_trp_bufdata_;
 
+gras_msg_t gras_msg_recv_any(void) {
+  gras_trp_procdata_t trp_proc =
+      (gras_trp_procdata_t) gras_libdata_by_name("gras_trp");
+  gras_msg_t msg;
+  /* Build a dynar of all communications I could get something from */
+  xbt_dynar_t comms = xbt_dynar_new(sizeof(smx_comm_t),NULL);
+  unsigned int cursor;
+  gras_socket_t sock;
+  gras_trp_sg_sock_data_t *sock_data;
+  xbt_dynar_foreach(trp_proc->sockets,cursor,sock) {
+    sock_data = (gras_trp_sg_sock_data_t *) sock->data;
+    if (sock_data->comm_recv) {
+      INFO2("Copy %p of size %d",sock_data->comm_recv,sizeof(smx_comm_t));
+      xbt_dynar_push(comms,&(sock_data->comm_recv));
+    }
+  }
+  VERB1("Wait on %ld 'sockets'",xbt_dynar_length(comms));
+  /* Wait for the end of any of these communications */
+  int got = SIMIX_network_waitany(comms);
+  smx_comm_t comm;
+
+  /* retrieve the message sent in that communication */
+  xbt_dynar_get_cpy(comms,got,&(comm));
+  msg=SIMIX_communication_get_data(comm);
+  VERB1("Got something. Communication %p's over",comm);
+
+  /* Reinstall a waiting communication on that rdv */
+  /* Get the sock again */
+  xbt_dynar_foreach(trp_proc->sockets,cursor,sock) {
+    sock_data = (gras_trp_sg_sock_data_t *) sock->data;
+    if (sock_data->comm_recv && sock_data->comm_recv == comm)
+      break;
+  }
+  sock_data = (gras_trp_sg_sock_data_t *) sock->data;
+  sock_data->comm_recv = SIMIX_network_irecv(
+      sock_data->im_server?sock_data->rdv_server:sock_data->rdv_client,
+      NULL,0);
+  SIMIX_communication_destroy(comm);
+
+  return msg;
+}
+
+
 void gras_msg_send_ext(gras_socket_t sock,
                        e_gras_msg_kind_t kind,
                        unsigned long int ID,
                        gras_msgtype_t msgtype, void *payload)
 {
-
-  smx_action_t act;             /* simix action */
-  gras_trp_sg_sock_data_t *sock_data;
-  gras_hostdata_t *hd;
-  gras_trp_procdata_t trp_remote_proc;
-  gras_msg_procdata_t msg_remote_proc;
-  gras_msg_t msg;               /* message to send */
   int whole_payload_size = 0;   /* msg->payload_size is used to memcpy the payload.
                                    This is used to report the load onto the simulator. It also counts the size of pointed stuff */
-
-  sock_data = (gras_trp_sg_sock_data_t *) sock->data;
-
-  hd = (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
-
-  xbt_assert1(!gras_socket_is_meas(sock),
-              "Asked to send a message on the measurement socket %p", sock);
+  gras_msg_t msg;               /* message to send */
 
   /*initialize gras message */
   msg = xbt_new(s_gras_msg_t, 1);
@@ -50,7 +79,7 @@ void gras_msg_send_ext(gras_socket_t sock,
   msg->type = msgtype;
   msg->ID = ID;
   if (kind == e_gras_msg_kind_rpcerror) {
-    /* error on remote host, carfull, payload is an exception */
+    /* error on remote host, careful, payload is an exception */
     msg->payl_size = gras_datadesc_size(gras_datadesc_by_name("ex_t"));
     msg->payl = xbt_malloc(msg->payl_size);
     whole_payload_size = gras_datadesc_memcpy(gras_datadesc_by_name("ex_t"),
@@ -72,6 +101,24 @@ void gras_msg_send_ext(gras_socket_t sock,
       whole_payload_size = gras_datadesc_memcpy(msgtype->ctn_type,
                                                 payload, msg->payl);
   }
+  gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *) sock->data;
+  smx_comm_t comm;
+  SIMIX_network_send(sock_data->im_server ? sock_data->rdv_client : sock_data->rdv_client,
+      whole_payload_size,-1,-1,&msg,sizeof(void*),&comm,msg);
+
+#ifdef KILLME
+  smx_action_t act;             /* simix action */
+  gras_hostdata_t *hd;
+  gras_trp_procdata_t trp_remote_proc;
+  gras_msg_procdata_t msg_remote_proc;
+
+  sock_data = (gras_trp_sg_sock_data_t *) sock->data;
+
+  hd = (gras_hostdata_t *) SIMIX_host_get_data(SIMIX_host_self());
+
+  xbt_assert1(!gras_socket_is_meas(sock),
+              "Asked to send a message on the measurement socket %p", sock);
+
 
   /* put the selectable socket on the queue */
   trp_remote_proc = (gras_trp_procdata_t)
@@ -106,28 +153,34 @@ void gras_msg_send_ext(gras_socket_t sock,
   /* cleanup structures */
   SIMIX_action_destroy(act);
   SIMIX_mutex_unlock(sock_data->mutex);
-
+#endif
   VERB0("Message sent");
 
 }
 
+#ifdef KILLMETOO
 /*
  * receive the next message on the given socket.
  */
 void gras_msg_recv(gras_socket_t sock, gras_msg_t msg)
 {
 
-  gras_trp_sg_sock_data_t *sock_data;
-  gras_trp_sg_sock_data_t *remote_sock_data;
-  gras_hostdata_t *remote_hd;
+  gras_trp_sg_sock_data_t *sock_data =
+       (gras_trp_sg_sock_data_t *) sock->data;
   gras_msg_t msg_got;
-  gras_msg_procdata_t msg_procdata =
-    (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
+  size_t size_got = sizeof(void*);
 
   xbt_assert1(!gras_socket_is_meas(sock),
               "Asked to receive a message on the measurement socket %p",
               sock);
 
+  SIMIX_network_recv(sock_data->rdv,-1,&msg_got,&size_got,NULL);
+#ifdef KILLME
+  gras_trp_sg_sock_data_t *remote_sock_data;
+  gras_hostdata_t *remote_hd;
+  gras_msg_procdata_t msg_procdata =
+    (gras_msg_procdata_t) gras_libdata_by_name("gras_msg");
+
   xbt_assert0(msg, "msg is an out parameter of gras_msg_recv...");
 
   sock_data = (gras_trp_sg_sock_data_t *) sock->data;
@@ -156,7 +209,8 @@ void gras_msg_recv(gras_socket_t sock, gras_msg_t msg)
   memcpy(msg, msg_got, sizeof(s_gras_msg_t));
   xbt_free(msg_got);
   SIMIX_mutex_unlock(remote_sock_data->mutex);
-
+#endif
   VERB3("Received a message type '%s' kind '%s' ID %lu",        // from %s",
         msg->type->name, e_gras_msg_kind_names[msg->kind], msg->ID);
 }
+#endif
index b61ea49..83586d2 100644 (file)
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_trp);
 
-/* check transport_private.h for an explanation of this variable; this just need to be defined to NULL in SG */
+/* check transport_private.h for an explanation of this variable;
+ * this just need to be defined to NULL in SG */
 gras_socket_t _gras_lastly_selected_socket = NULL;
 
+#ifdef KILLME
 /**    
  * gras_trp_select:
  *
@@ -123,7 +125,7 @@ gras_socket_t gras_trp_select(double timeout)
 
   return res;
 }
-
+#endif
 
 /* dummy implementations of the functions used in RL mode */
 
index 8918cb4..7b429ac 100644 (file)
@@ -139,9 +139,9 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self,
   data->to_host = peer;
 
   /* initialize mutex and condition of the socket */
-  data->mutex = SIMIX_mutex_init();
-  data->cond = SIMIX_cond_init();
-  data->to_socket = pr.socket;
+  data->rdv_server = pr.rdv;
+  data->rdv_client = SIMIX_rdv_create(NULL);
+  data->im_server = 0;
 
   sock->data = data;
   sock->incoming = 1;
@@ -185,8 +185,8 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock)
 
   pr.port = sock->port;
   pr.meas = sock->meas;
-  pr.socket = sock;
   pr.process = SIMIX_process_self();
+  pr.rdv = SIMIX_rdv_create(NULL);
   xbt_dynar_push(hd->ports, &pr);
 
   /* Create the socket */
@@ -194,9 +194,11 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock)
   data->from_process = SIMIX_process_self();
   data->to_process = NULL;
   data->to_host = SIMIX_host_self();
-
-  data->cond = SIMIX_cond_init();
-  data->mutex = SIMIX_mutex_init();
+  data->rdv_server = pr.rdv;
+  data->rdv_client = NULL;
+  data->im_server = 0;
+  data->comm_recv = SIMIX_network_irecv(pr.rdv,NULL,0);
+  INFO1("Comm %p",data->comm_recv);
 
   sock->data = data;
 
@@ -221,8 +223,7 @@ void gras_trp_sg_socket_close(gras_socket_t sock)
   xbt_assert0(hd, "Please run gras_process_init on each process");
 
   if (sock->data) {
-    SIMIX_cond_destroy(((gras_trp_sg_sock_data_t *) sock->data)->cond);
-    SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t *) sock->data)->mutex);
+    /* FIXME: kill the rdv point if receiver side */
     free(sock->data);
   }
 
@@ -257,72 +258,39 @@ void gras_trp_sg_chunk_send(gras_socket_t sock,
 void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
                                 const char *data, unsigned long int size)
 {
+#ifdef KILLME
   char name[256];
   static unsigned int count = 0;
 
   smx_action_t act;             /* simix action */
-  gras_trp_sg_sock_data_t *sock_data;
   gras_trp_procdata_t trp_remote_proc;
   gras_msg_procdata_t msg_remote_proc;
   gras_msg_t msg;               /* message to send */
+#endif
 
-  sock_data = (gras_trp_sg_sock_data_t *) sock->data;
-
+  gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *) sock->data;
   xbt_assert0(sock->meas,
               "SG chunk exchange shouldn't be used on non-measurement sockets");
 
-  SIMIX_mutex_lock(sock_data->mutex);
-  sprintf(name, "Chunk[%d]", count++);
-  /*initialize gras message */
-  msg = xbt_new(s_gras_msg_t, 1);
-  msg->expe = sock;
-  msg->payl_size = size;
-
-  if (data) {
-    msg->payl = (void *) xbt_malloc(size);
-    memcpy(msg->payl, data, size);
-  } else {
-    msg->payl = NULL;
-  }
-
-
-  /* put his socket on the selectable socket queue */
-  trp_remote_proc = (gras_trp_procdata_t)
-    gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process);
-  xbt_queue_push(trp_remote_proc->meas_selectable_sockets, &sock);
-
-  /* put message on msg_queue */
-  msg_remote_proc = (gras_msg_procdata_t)
-    gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
-
-  xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas, msg);
-
-  /* wait for the receiver */
-  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
 
   /* creates simix action and waits its ends, waits in the sender host
      condition */
-  DEBUG5("send chunk %s from %s to  %s:%d (size=%ld)",
-         name, SIMIX_host_get_name(SIMIX_host_self()),
+  DEBUG4("send chunk from %s to  %s:%d (size=%ld)",
+         SIMIX_host_get_name(SIMIX_host_self()),
          SIMIX_host_get_name(sock_data->to_host), sock->peer_port, size);
-
-  act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host,
-                                 name, size, -1);
-  SIMIX_register_action_to_condition(act, sock_data->cond);
-  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
-  SIMIX_unregister_action_to_condition(act, sock_data->cond);
-  /* error treatmeant (FIXME) */
-
-  /* cleanup structures */
-  SIMIX_action_destroy(act);
-
-  SIMIX_mutex_unlock(sock_data->mutex);
+  //SIMIX_network_send(sock_data->rdv,size,1,-1,NULL,0,NULL,NULL);
+  THROW_UNIMPLEMENTED;
 }
 
 int gras_trp_sg_chunk_recv(gras_socket_t sock,
                            char *data, unsigned long int size)
 {
-  gras_trp_sg_sock_data_t *sock_data;
+  //gras_trp_sg_sock_data_t *sock_data =
+  //    (gras_trp_sg_sock_data_t *) sock->data;
+
+  //SIMIX_network_recv(sock_data->rdv,-1,NULL,0,NULL);
+  THROW_UNIMPLEMENTED;
+#ifdef KILLME
   gras_trp_sg_sock_data_t *remote_sock_data;
   gras_socket_t remote_socket = NULL;
   gras_msg_t msg_got;
@@ -367,5 +335,6 @@ int gras_trp_sg_chunk_recv(gras_socket_t sock,
 
   xbt_free(msg_got);
   SIMIX_mutex_unlock(remote_sock_data->mutex);
+#endif
   return 0;
 }
index c1a57a9..f62517c 100644 (file)
@@ -17,8 +17,9 @@
 typedef struct {
   int port;                     /* list of ports used by a server socket */
   int meas;                     /* (boolean) the channel is for measurements or for messages */
-  smx_process_t process;
-  gras_socket_t socket;
+  smx_process_t process; /* process listening */
+  smx_rdv_t rdv; /* rendez-vous point to the listener */
+//  gras_socket_t socket; FIXME KILLME
 } gras_sg_portrec_t;
 
 /* Data for each host */
@@ -36,9 +37,10 @@ typedef struct {
 
   smx_host_t to_host;           /* Who's on other side */
 
-  smx_cond_t cond;
-  smx_mutex_t mutex;
-  gras_socket_t to_socket;
+  smx_rdv_t rdv_server; /* The rendez-vous point to use */
+  smx_rdv_t rdv_client; /* The rendez-vous point to use */
+  int im_server:1;
+  smx_comm_t comm_recv; /* The comm of irecv on receiving sockets */
 } gras_trp_sg_sock_data_t;