AND — Algorithmique Numérique Distribuée

Public GIT Repository
Ongoing work to port GRAS to smx_network; not working yet.
[simgrid.git] / src / gras / Transport / transport_plugin_sg.c
index 8918cb4..7b429ac 100644 (file)
@@ -139,9 +139,9 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self,
   data->to_host = peer;
 
   /* initialize mutex and condition of the socket */
-  data->mutex = SIMIX_mutex_init();
-  data->cond = SIMIX_cond_init();
-  data->to_socket = pr.socket;
+  data->rdv_server = pr.rdv;
+  data->rdv_client = SIMIX_rdv_create(NULL);
+  data->im_server = 0;
 
   sock->data = data;
   sock->incoming = 1;
@@ -185,8 +185,8 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock)
 
   pr.port = sock->port;
   pr.meas = sock->meas;
-  pr.socket = sock;
   pr.process = SIMIX_process_self();
+  pr.rdv = SIMIX_rdv_create(NULL);
   xbt_dynar_push(hd->ports, &pr);
 
   /* Create the socket */
@@ -194,9 +194,11 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, gras_socket_t sock)
   data->from_process = SIMIX_process_self();
   data->to_process = NULL;
   data->to_host = SIMIX_host_self();
-
-  data->cond = SIMIX_cond_init();
-  data->mutex = SIMIX_mutex_init();
+  data->rdv_server = pr.rdv;
+  data->rdv_client = NULL;
+  data->im_server = 0;
+  data->comm_recv = SIMIX_network_irecv(pr.rdv,NULL,0);
+  INFO1("Comm %p",data->comm_recv);
 
   sock->data = data;
 
@@ -221,8 +223,7 @@ void gras_trp_sg_socket_close(gras_socket_t sock)
   xbt_assert0(hd, "Please run gras_process_init on each process");
 
   if (sock->data) {
-    SIMIX_cond_destroy(((gras_trp_sg_sock_data_t *) sock->data)->cond);
-    SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t *) sock->data)->mutex);
+    /* FIXME: kill the rdv point if receiver side */
     free(sock->data);
   }
 
@@ -257,72 +258,39 @@ void gras_trp_sg_chunk_send(gras_socket_t sock,
 void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
                                 const char *data, unsigned long int size)
 {
+#ifdef KILLME
   char name[256];
   static unsigned int count = 0;
 
   smx_action_t act;             /* simix action */
-  gras_trp_sg_sock_data_t *sock_data;
   gras_trp_procdata_t trp_remote_proc;
   gras_msg_procdata_t msg_remote_proc;
   gras_msg_t msg;               /* message to send */
+#endif
 
-  sock_data = (gras_trp_sg_sock_data_t *) sock->data;
-
+  gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *) sock->data;
   xbt_assert0(sock->meas,
               "SG chunk exchange shouldn't be used on non-measurement sockets");
 
-  SIMIX_mutex_lock(sock_data->mutex);
-  sprintf(name, "Chunk[%d]", count++);
-  /*initialize gras message */
-  msg = xbt_new(s_gras_msg_t, 1);
-  msg->expe = sock;
-  msg->payl_size = size;
-
-  if (data) {
-    msg->payl = (void *) xbt_malloc(size);
-    memcpy(msg->payl, data, size);
-  } else {
-    msg->payl = NULL;
-  }
-
-
-  /* put his socket on the selectable socket queue */
-  trp_remote_proc = (gras_trp_procdata_t)
-    gras_libdata_by_name_from_remote("gras_trp", sock_data->to_process);
-  xbt_queue_push(trp_remote_proc->meas_selectable_sockets, &sock);
-
-  /* put message on msg_queue */
-  msg_remote_proc = (gras_msg_procdata_t)
-    gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
-
-  xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas, msg);
-
-  /* wait for the receiver */
-  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
 
   /* creates simix action and waits its ends, waits in the sender host
      condition */
-  DEBUG5("send chunk %s from %s to  %s:%d (size=%ld)",
-         name, SIMIX_host_get_name(SIMIX_host_self()),
+  DEBUG4("send chunk from %s to  %s:%d (size=%ld)",
+         SIMIX_host_get_name(SIMIX_host_self()),
          SIMIX_host_get_name(sock_data->to_host), sock->peer_port, size);
-
-  act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host,
-                                 name, size, -1);
-  SIMIX_register_action_to_condition(act, sock_data->cond);
-  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
-  SIMIX_unregister_action_to_condition(act, sock_data->cond);
-  /* error treatmeant (FIXME) */
-
-  /* cleanup structures */
-  SIMIX_action_destroy(act);
-
-  SIMIX_mutex_unlock(sock_data->mutex);
+  //SIMIX_network_send(sock_data->rdv,size,1,-1,NULL,0,NULL,NULL);
+  THROW_UNIMPLEMENTED;
 }
 
 int gras_trp_sg_chunk_recv(gras_socket_t sock,
                            char *data, unsigned long int size)
 {
-  gras_trp_sg_sock_data_t *sock_data;
+  //gras_trp_sg_sock_data_t *sock_data =
+  //    (gras_trp_sg_sock_data_t *) sock->data;
+
+  //SIMIX_network_recv(sock_data->rdv,-1,NULL,0,NULL);
+  THROW_UNIMPLEMENTED;
+#ifdef KILLME
   gras_trp_sg_sock_data_t *remote_sock_data;
   gras_socket_t remote_socket = NULL;
   gras_msg_t msg_got;
@@ -367,5 +335,6 @@ int gras_trp_sg_chunk_recv(gras_socket_t sock,
 
   xbt_free(msg_got);
   SIMIX_mutex_unlock(remote_sock_data->mutex);
+#endif
   return 0;
 }