Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Cleanup network prototype
[simgrid.git] / src / simix / smx_network.c
index 8890f81..5f40e44 100644 (file)
  *  \param name The name of the rendez-vous point
  *  \return The created rendez-vous point
  */
-smx_rvpoint_t SIMIX_rvpoint_create(char *name)
+smx_rdv_t SIMIX_rdv_create(char *name)
 {
-  smx_rvpoint_t rvp = xbt_new0(s_smx_rvpoint_t, 1);
+  smx_rdv_t rvp = xbt_new0(s_smx_rvpoint_t, 1);
   rvp->name = xbt_strdup(name);
   rvp->read = SIMIX_mutex_init();
   rvp->write = SIMIX_mutex_init();
   rvp->comm_mutex = SIMIX_mutex_init();
   rvp->comm_fifo = xbt_fifo_new();
-  
+
   return rvp;
 }
 
@@ -34,7 +34,7 @@ smx_rvpoint_t SIMIX_rvpoint_create(char *name)
  *  \brief Destroy a rendez-vous point
  *  \param name The rendez-vous point to destroy
  */
-void SIMIX_rvpoint_destroy(smx_rvpoint_t rvp)
+void SIMIX_rdv_destroy(smx_rdv_t rvp)
 {
   xbt_free(rvp->name);
   SIMIX_mutex_destroy(rvp->read);
@@ -50,56 +50,39 @@ void SIMIX_rvpoint_destroy(smx_rvpoint_t rvp)
  *  \param rvp The rendez-vous point
  *  \param comm The communication request
  */
-static inline void SIMIX_rvpoint_push(smx_rvpoint_t rvp, smx_comm_t comm)
+static inline void SIMIX_rdv_push(smx_rdv_t rvp, smx_comm_t comm)
 {
   xbt_fifo_push(rvp->comm_fifo, comm);
 }
 
 /**
- *  \brief Checks if there is a receive communication request queued in a rendez-vous
+ *  \brief Checks if there is a communication request queued in a rendez-vous matching our needs
  *  \param rvp The rendez-vous with the queue
- *  \return The communication request if found, NULL otherwise.
+ *  \param look_for_src boolean. True: we are receiver looking for sender; False: other way round
+ *  \return The communication request if found, a newly created one otherwise.
  */
-smx_comm_t SIMIX_rvpoint_get_receiver(smx_rvpoint_t rvp)
-{
+smx_comm_t SIMIX_rdv_get_request_or_create(smx_rdv_t rvp, int look_for_src) {
   /* Get a communication request from the rendez-vous queue. If it is a receive
      request then return it, otherwise put it again in the queue and return NULL
    */
   smx_comm_t comm_head = xbt_fifo_shift(rvp->comm_fifo);
 
-  if(comm_head != NULL && comm_head->dst_host != NULL)
-    return comm_head;
-  
-  xbt_fifo_unshift(rvp->comm_fifo, comm_head);
-  return NULL;
-}
+  if(comm_head != NULL) {
+    if (( look_for_src && comm_head->src_host != NULL) ||
+        (!look_for_src && comm_head->dst_host != NULL)) {
 
-/**
- *  \brief Checks if there is a send communication request queued in a rendez-vous
- *  \param rvp The rendez-vous with the queue
- *  \return The communication request if found, NULL otherwise.
- */
-smx_comm_t SIMIX_rvpoint_get_sender(smx_rvpoint_t rvp)
-{
-  /* Get a communication request from the rendez-vous queue. If it is a send
-     request then return it, otherwise put it again in the queue and return NULL
-   */
-  smx_comm_t comm_head = xbt_fifo_shift(rvp->comm_fifo);
-
-  if(comm_head != NULL && comm_head->src_host != NULL)
-    return comm_head;
-  
+      SIMIX_communication_use(comm_head);
+      return comm_head;
+    }
+  }
   xbt_fifo_unshift(rvp->comm_fifo, comm_head);
-  return NULL;
-}
 
-/**
- *  \brief Get the communication mutex of the rendez-vous point
- *  \param rvp The rendez-vous point
- */
-static inline smx_mutex_t SIMIX_rvpoint_get_comm_mutex(smx_rvpoint_t rvp)
-{
-  return rvp->comm_mutex;
+  /* no relevant peer found. Create a new comm action and set it up */
+  comm_head = SIMIX_communication_new(rvp);
+  SIMIX_rdv_push(rvp, comm_head);
+  SIMIX_communication_use(comm_head);
+
+  return comm_head;
 }
 
 /******************************************************************************/
@@ -112,16 +95,12 @@ static inline smx_mutex_t SIMIX_rvpoint_get_comm_mutex(smx_rvpoint_t rvp)
  *  \param receiver The process receiving the communication (by recv)
  *  \return the communication request
  */  
-smx_comm_t SIMIX_communication_new(smx_host_t src_host, smx_host_t dst_host,
-                                   smx_rvpoint_t rdv)
+smx_comm_t SIMIX_communication_new(smx_rdv_t rdv)
 {
   /* alloc structures */
   smx_comm_t comm = xbt_new0(s_smx_comm_t, 1);
   comm->cond = SIMIX_cond_init();
 
-  /* initialize them */
-  comm->src_host = src_host;
-  comm->dst_host = dst_host;
   comm->rdv = rdv;
 
   return comm;
@@ -166,120 +145,91 @@ static inline void SIMIX_communication_use(smx_comm_t comm)
 static inline void SIMIX_communication_start(smx_comm_t comm)
 {
   comm->act = SIMIX_action_communicate(comm->src_host, comm->dst_host, NULL, 
-                                       comm->data_size, comm->rate);
+      comm->data_size, comm->rate);
 
   SIMIX_register_action_to_condition(comm->act, comm->cond);
+}
+
+static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm)
+{
   __SIMIX_cond_wait(comm->cond);
-  SIMIX_unregister_action_to_condition(comm->act, comm->cond);
+  if (comm->act) {
+    SIMIX_unregister_action_to_condition(comm->act, comm->cond);
+    SIMIX_action_destroy(comm->act);
+    comm->act=NULL;
+  }
+  /* Check for errors */
+  if (SIMIX_host_get_state(comm->dst_host) == 0){
+    THROW1(host_error, 0, "Destination host %s failed", comm->dst_host->name);
+  } else if (SIMIX_host_get_state(comm->src_host) == 0){
+    THROW1(host_error, 0, "Source host %s failed", comm->src_host->name);
+  } else if (SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED) {
+    THROW0(network_error, 0, "Link failure");
+  }
+
 }
-  
 /******************************************************************************/
 /*                        Synchronous Communication                           */
 /******************************************************************************/
+/* Throws:
+ *  - host_error if peer failed
+ *  - network_error if network failed */
 
-void SIMIX_network_send(smx_rvpoint_t rdv, void *data, size_t size, double timeout, double rate)
+void SIMIX_network_send(smx_rdv_t rdv, void *data, size_t size, double timeout, double rate)
 {
   /*double start_time = SIMIX_get_clock();*/
   void *smx_net_data;
-  smx_host_t my_host = SIMIX_host_self();
   smx_comm_t comm;
-  smx_mutex_t rvp_comm_mutex = SIMIX_rvpoint_get_comm_mutex(rdv);
 
-  /* Lock the rendez-vous point */
-  SIMIX_mutex_lock(rvp_comm_mutex);
   /* Copy the message to the network */
   /*FIXME here the MC should allocate the space from the network storage area */
   smx_net_data = xbt_malloc(size);
   memcpy(smx_net_data, data, size);
-    
-  /* Check if there is already a receive waiting in the rendez-vous point */
-  if((comm = SIMIX_rvpoint_get_receiver(rdv)) != NULL){
-    comm->src_host = my_host;
-    comm->data = smx_net_data;
-    comm->data_size = size;
-    comm->rate = rate;
-    SIMIX_communication_use(comm);
-
-    /* Unlock the rendez-vous point and start the communication action.*/
-    /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
-    SIMIX_mutex_unlock(rvp_comm_mutex);
-    SIMIX_communication_start(comm);   
-    
-  /* Nobody is at the rendez-vous point, so push the comm action into it */
-  }else{    
-    comm = SIMIX_communication_new(my_host, NULL, rdv);
-    comm->data = smx_net_data;
-    comm->data_size = size;
-    comm->rate = rate;
-    SIMIX_communication_use(comm);
-    SIMIX_rvpoint_push(rdv, comm);
-
-    /* Wait for communication completion */
-    /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
-    /* FIXME: add timeout checking stuff */
-    SIMIX_mutex_unlock (rvp_comm_mutex);
-    __SIMIX_cond_wait(comm->cond);
-  }
 
-  /* Check for errors */
-  if (SIMIX_host_get_state(comm->dst_host) == 0){
-    THROW1(host_error, 0, "Destination host %s failed", comm->dst_host->name);
-  }else if(SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED){
-    THROW0(network_error, 0, "Link failure");
-  }
-    
+  comm = SIMIX_rdv_get_request_or_create(rdv,0);
+  comm->src_host = SIMIX_host_self();
+  comm->data = smx_net_data;
+  comm->data_size = size;
+  comm->rate = rate;
+
+  /* If the receiver is already there, start the communication */
+  /* If it's not here already, it will start that communication itself later on */
+  if(comm->dst_host != NULL)
+    SIMIX_communication_start(comm);
+
+  /* Wait for communication completion */
+  /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
+  /* FIXME: add timeout checking stuff */
+  SIMIX_communication_wait_for_completion(comm);
+
   SIMIX_communication_destroy(comm);
-  return;
 }
 
-void SIMIX_network_recv(smx_rvpoint_t rdv, void **data, size_t *size, double timeout)
+void SIMIX_network_recv(smx_rdv_t rdv, void **data, size_t *size, double timeout)
 {
   /*double start_time = SIMIX_get_clock();*/
   smx_comm_t comm;
   smx_host_t my_host = SIMIX_host_self();
-  smx_mutex_t rvp_comm_mutex = SIMIX_rvpoint_get_comm_mutex(rdv);
-
-  /* Lock the rendez-vous point */
-  SIMIX_mutex_lock(rvp_comm_mutex);
-  
-  /* Check if there is already a send waiting in the rendez-vous point */
-  if((comm = SIMIX_rvpoint_get_sender(rdv)) != NULL){
-    comm->dst_host = my_host;
-    comm->dest_buff = data;
-    SIMIX_communication_use(comm);
-
-    /* Unlock the rendez-vous point and start the communication action.*/
-    /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
-    SIMIX_mutex_unlock(rvp_comm_mutex);
-    SIMIX_communication_start(comm);   
-
-  /* Nobody is at the rendez-vous point, so push the comm action into it */
-  }else{
-    comm = SIMIX_communication_new(NULL, my_host, rdv);
-    comm->dest_buff = data;
-    SIMIX_communication_use(comm);
-    SIMIX_rvpoint_push(rdv, comm);
-
-    /* Wait for communication completion */
-    /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
-    /* FIXME: add timeout checking stuff*/
-    SIMIX_mutex_unlock (rvp_comm_mutex);
-    __SIMIX_cond_wait(comm->cond);
-  }
 
-  /* Check for errors */
-  if (SIMIX_host_get_state(comm->src_host) == 0){
-    THROW1(host_error, 0, "Source host %s failed", comm->src_host->name);
-  }else if(SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED){
-    THROW0(network_error, 0, "Link failure");
-  }
+  comm = SIMIX_rdv_get_request_or_create(rdv,1);
+  comm->dst_host = SIMIX_host_self();
+  comm->dest_buff = data;
+
+  /* If the sender is already there, start the communication */
+  /* If it's not here already, it will start that communication itself later on */
+  if (comm->src_host != NULL)
+    SIMIX_communication_start(comm);
+
+  /* Wait for communication completion */
+  /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
+  /* FIXME: add timeout checking stuff */
+  SIMIX_communication_wait_for_completion(comm);
 
   /* We are OK, let's copy the message to receiver's buffer */
   *size = *size < comm->data_size ? *size : comm->data_size;
   memcpy(*data, comm->data, *size);
-  
+
   SIMIX_communication_destroy(comm);
-  return;
 }
 
 /******************************************************************************/
@@ -301,7 +251,7 @@ XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm)
 {
   if(SIMIX_action_get_state (comm) == SURF_ACTION_DONE){
     memcpy(comm->data
-    
+
   return SIMIX_action_get_state (comm) == SURF_ACTION_DONE ? TRUE : FALSE;
 }*/