From feb66679bd690f3df26956a35de11fba2ffcf61b Mon Sep 17 00:00:00 2001 From: mquinson Date: Mon, 5 Oct 2009 12:26:29 +0000 Subject: [PATCH] Cleanup network prototype git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6698 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/include/simix/datatypes.h | 2 +- src/include/simix/simix.h | 20 ++-- src/simix/private.h | 3 +- src/simix/smx_network.c | 206 +++++++++++++--------------------- 4 files changed, 90 insertions(+), 141 deletions(-) diff --git a/src/include/simix/datatypes.h b/src/include/simix/datatypes.h index 98b21a9f41..fd193beae7 100644 --- a/src/include/simix/datatypes.h +++ b/src/include/simix/datatypes.h @@ -53,7 +53,7 @@ SG_BEGIN_DECL() typedef struct s_smx_context *smx_context_t; /******************************* Networking ***********************************/ - typedef struct s_smx_rvpoint *smx_rvpoint_t; + typedef struct s_smx_rvpoint *smx_rdv_t; typedef struct s_smx_comm *smx_comm_t; diff --git a/src/include/simix/simix.h b/src/include/simix/simix.h index 6d62fd28a6..fd67fe5a19 100644 --- a/src/include/simix/simix.h +++ b/src/include/simix/simix.h @@ -175,22 +175,22 @@ XBT_PUBLIC(void) SIMIX_display_process_status(void); /************************** Comunication Handling *****************************/ /* Public */ -XBT_PUBLIC(smx_rvpoint_t) SIMIX_rvpoint_create(char *name); -XBT_PUBLIC(void) SIMIX_rvpoint_destroy(smx_rvpoint_t rvp); -XBT_PUBLIC(void) SIMIX_network_send(smx_rvpoint_t rdv, void *data, size_t size, double rate, double timeout); -XBT_PUBLIC(void) SIMIX_network_recv(smx_rvpoint_t rvp, void **data, size_t *size, double timeout); +XBT_PUBLIC(smx_rdv_t) SIMIX_rdv_create(char *name); +XBT_PUBLIC(void) SIMIX_rdv_destroy(smx_rdv_t rvp); +XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, void *data, size_t size, double rate, double timeout); +XBT_PUBLIC(void) SIMIX_network_recv(smx_rdv_t rvp, void **data, size_t *size, double timeout); XBT_PUBLIC(void) SIMIX_network_wait(smx_action_t comm); XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm); /* These should be private to SIMIX */ smx_comm_t SIMIX_communication_new(smx_host_t src_host, smx_host_t dst_host, - smx_rvpoint_t rdv); + smx_rdv_t rdv); void SIMIX_communication_destroy(smx_comm_t comm); -smx_comm_t SIMIX_rvpoint_get_receiver(smx_rvpoint_t rvp); -smx_comm_t SIMIX_rvpoint_get_sender(smx_rvpoint_t rvp); -static inline void SIMIX_rvpoint_push(smx_rvpoint_t rvp, smx_comm_t comm); -static inline smx_cond_t SIMIX_rvpoint_get_cond(smx_rvpoint_t rvp); -static inline smx_mutex_t SIMIX_rvpoint_get_comm_mutex(smx_rvpoint_t rvp); +smx_comm_t SIMIX_rvpoint_get_receiver_or_create_request(smx_rdv_t rvp); +smx_comm_t SIMIX_rvpoint_get_sender_or_create(smx_rdv_t rvp); +static inline void SIMIX_rvpoint_push(smx_rdv_t rvp, smx_comm_t comm); +static inline smx_cond_t SIMIX_rvpoint_get_cond(smx_rdv_t rvp); +static inline smx_mutex_t SIMIX_rvpoint_get_comm_mutex(smx_rdv_t rvp); SG_END_DECL() #endif /* _SIMIX_SIMIX_H */ diff --git a/src/simix/private.h b/src/simix/private.h index 8a41816b6b..5acd0fc20a 100644 --- a/src/simix/private.h +++ b/src/simix/private.h @@ -125,14 +125,13 @@ typedef struct s_smx_rvpoint { char *name; smx_mutex_t read; smx_mutex_t write; - smx_mutex_t comm_mutex; xbt_fifo_t comm_fifo; } s_smx_rvpoint_t; typedef struct s_smx_comm { smx_host_t src_host; smx_host_t dst_host; - smx_rvpoint_t rdv; + smx_rdv_t rdv; smx_cond_t cond; smx_action_t act; void *data; diff --git a/src/simix/smx_network.c b/src/simix/smx_network.c index 8890f81c67..5f40e44e78 100644 --- a/src/simix/smx_network.c +++ b/src/simix/smx_network.c @@ -18,15 +18,15 @@ * \param name The name of the rendez-vous point * \return The created rendez-vous point */ -smx_rvpoint_t SIMIX_rvpoint_create(char *name) +smx_rdv_t SIMIX_rdv_create(char *name) { - smx_rvpoint_t rvp = xbt_new0(s_smx_rvpoint_t, 1); + smx_rdv_t rvp = xbt_new0(s_smx_rvpoint_t, 1); rvp->name = xbt_strdup(name); rvp->read = SIMIX_mutex_init(); rvp->write = SIMIX_mutex_init(); rvp->comm_mutex = SIMIX_mutex_init(); rvp->comm_fifo = xbt_fifo_new(); - + return rvp; } @@ -34,7 +34,7 @@ smx_rvpoint_t SIMIX_rvpoint_create(char *name) * \brief Destroy a rendez-vous point * \param name The rendez-vous point to destroy */ -void SIMIX_rvpoint_destroy(smx_rvpoint_t rvp) +void SIMIX_rdv_destroy(smx_rdv_t rvp) { xbt_free(rvp->name); SIMIX_mutex_destroy(rvp->read); @@ -50,56 +50,39 @@ void SIMIX_rvpoint_destroy(smx_rvpoint_t rvp) * \param rvp The rendez-vous point * \param comm The communication request */ -static inline void SIMIX_rvpoint_push(smx_rvpoint_t rvp, smx_comm_t comm) +static inline void SIMIX_rdv_push(smx_rdv_t rvp, smx_comm_t comm) { xbt_fifo_push(rvp->comm_fifo, comm); } /** - * \brief Checks if there is a receive communication request queued in a rendez-vous + * \brief Checks if there is a communication request queued in a rendez-vous matching our needs * \param rvp The rendez-vous with the queue - * \return The communication request if found, NULL otherwise. + * \param look_for_src boolean. True: we are receiver looking for sender; False: other way round + * \return The communication request if found, a newly created one otherwise. */ -smx_comm_t SIMIX_rvpoint_get_receiver(smx_rvpoint_t rvp) -{ +smx_comm_t SIMIX_rdv_get_request_or_create(smx_rdv_t rvp, int look_for_src) { /* Get a communication request from the rendez-vous queue. If it is a receive request then return it, otherwise put it again in the queue and return NULL */ smx_comm_t comm_head = xbt_fifo_shift(rvp->comm_fifo); - if(comm_head != NULL && comm_head->dst_host != NULL) - return comm_head; - - xbt_fifo_unshift(rvp->comm_fifo, comm_head); - return NULL; -} + if(comm_head != NULL) { + if (( look_for_src && comm_head->src_host != NULL) || + (!look_for_src && comm_head->dst_host != NULL)) { -/** - * \brief Checks if there is a send communication request queued in a rendez-vous - * \param rvp The rendez-vous with the queue - * \return The communication request if found, NULL otherwise. - */ -smx_comm_t SIMIX_rvpoint_get_sender(smx_rvpoint_t rvp) -{ - /* Get a communication request from the rendez-vous queue. If it is a send - request then return it, otherwise put it again in the queue and return NULL - */ - smx_comm_t comm_head = xbt_fifo_shift(rvp->comm_fifo); - - if(comm_head != NULL && comm_head->src_host != NULL) - return comm_head; - + SIMIX_communication_use(comm_head); + return comm_head; + } + } xbt_fifo_unshift(rvp->comm_fifo, comm_head); - return NULL; -} -/** - * \brief Get the communication mutex of the rendez-vous point - * \param rvp The rendez-vous point - */ -static inline smx_mutex_t SIMIX_rvpoint_get_comm_mutex(smx_rvpoint_t rvp) -{ - return rvp->comm_mutex; + /* no relevant peer found. Create a new comm action and set it up */ + comm_head = SIMIX_communication_new(rvp); + SIMIX_rdv_push(rvp, comm_head); + SIMIX_communication_use(comm_head); + + return comm_head; } /******************************************************************************/ @@ -112,16 +95,12 @@ static inline smx_mutex_t SIMIX_rvpoint_get_comm_mutex(smx_rvpoint_t rvp) * \param receiver The process receiving the communication (by recv) * \return the communication request */ -smx_comm_t SIMIX_communication_new(smx_host_t src_host, smx_host_t dst_host, - smx_rvpoint_t rdv) +smx_comm_t SIMIX_communication_new(smx_rdv_t rdv) { /* alloc structures */ smx_comm_t comm = xbt_new0(s_smx_comm_t, 1); comm->cond = SIMIX_cond_init(); - /* initialize them */ - comm->src_host = src_host; - comm->dst_host = dst_host; comm->rdv = rdv; return comm; @@ -166,120 +145,91 @@ static inline void SIMIX_communication_use(smx_comm_t comm) static inline void SIMIX_communication_start(smx_comm_t comm) { comm->act = SIMIX_action_communicate(comm->src_host, comm->dst_host, NULL, - comm->data_size, comm->rate); + comm->data_size, comm->rate); SIMIX_register_action_to_condition(comm->act, comm->cond); +} + +static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm) +{ __SIMIX_cond_wait(comm->cond); - SIMIX_unregister_action_to_condition(comm->act, comm->cond); + if (comm->act) { + SIMIX_unregister_action_to_condition(comm->act, comm->cond); + SIMIX_action_destroy(comm->act); + comm->act=NULL; + } + /* Check for errors */ + if (SIMIX_host_get_state(comm->dst_host) == 0){ + THROW1(host_error, 0, "Destination host %s failed", comm->dst_host->name); + } else if (SIMIX_host_get_state(comm->src_host) == 0){ + THROW1(host_error, 0, "Source host %s failed", comm->src_host->name); + } else if (SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED) { + THROW0(network_error, 0, "Link failure"); + } + } - /******************************************************************************/ /* Synchronous Communication */ /******************************************************************************/ +/* Throws: + * - host_error if peer failed + * - network_error if network failed */ -void SIMIX_network_send(smx_rvpoint_t rdv, void *data, size_t size, double timeout, double rate) +void SIMIX_network_send(smx_rdv_t rdv, void *data, size_t size, double timeout, double rate) { /*double start_time = SIMIX_get_clock();*/ void *smx_net_data; - smx_host_t my_host = SIMIX_host_self(); smx_comm_t comm; - smx_mutex_t rvp_comm_mutex = SIMIX_rvpoint_get_comm_mutex(rdv); - /* Lock the rendez-vous point */ - SIMIX_mutex_lock(rvp_comm_mutex); /* Copy the message to the network */ /*FIXME here the MC should allocate the space from the network storage area */ smx_net_data = xbt_malloc(size); memcpy(smx_net_data, data, size); - - /* Check if there is already a receive waiting in the rendez-vous point */ - if((comm = SIMIX_rvpoint_get_receiver(rdv)) != NULL){ - comm->src_host = my_host; - comm->data = smx_net_data; - comm->data_size = size; - comm->rate = rate; - SIMIX_communication_use(comm); - - /* Unlock the rendez-vous point and start the communication action.*/ - /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */ - SIMIX_mutex_unlock(rvp_comm_mutex); - SIMIX_communication_start(comm); - - /* Nobody is at the rendez-vous point, so push the comm action into it */ - }else{ - comm = SIMIX_communication_new(my_host, NULL, rdv); - comm->data = smx_net_data; - comm->data_size = size; - comm->rate = rate; - SIMIX_communication_use(comm); - SIMIX_rvpoint_push(rdv, comm); - - /* Wait for communication completion */ - /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */ - /* FIXME: add timeout checking stuff */ - SIMIX_mutex_unlock (rvp_comm_mutex); - __SIMIX_cond_wait(comm->cond); - } - /* Check for errors */ - if (SIMIX_host_get_state(comm->dst_host) == 0){ - THROW1(host_error, 0, "Destination host %s failed", comm->dst_host->name); - }else if(SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED){ - THROW0(network_error, 0, "Link failure"); - } - + comm = SIMIX_rdv_get_request_or_create(rdv,0); + comm->src_host = SIMIX_host_self(); + comm->data = smx_net_data; + comm->data_size = size; + comm->rate = rate; + + /* If the receiver is already there, start the communication */ + /* If it's not here already, it will start that communication itself later on */ + if(comm->dst_host != NULL) + SIMIX_communication_start(comm); + + /* Wait for communication completion */ + /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */ + /* FIXME: add timeout checking stuff */ + SIMIX_communication_wait_for_completion(comm); + SIMIX_communication_destroy(comm); - return; } -void SIMIX_network_recv(smx_rvpoint_t rdv, void **data, size_t *size, double timeout) +void SIMIX_network_recv(smx_rdv_t rdv, void **data, size_t *size, double timeout) { /*double start_time = SIMIX_get_clock();*/ smx_comm_t comm; smx_host_t my_host = SIMIX_host_self(); - smx_mutex_t rvp_comm_mutex = SIMIX_rvpoint_get_comm_mutex(rdv); - - /* Lock the rendez-vous point */ - SIMIX_mutex_lock(rvp_comm_mutex); - - /* Check if there is already a send waiting in the rendez-vous point */ - if((comm = SIMIX_rvpoint_get_sender(rdv)) != NULL){ - comm->dst_host = my_host; - comm->dest_buff = data; - SIMIX_communication_use(comm); - - /* Unlock the rendez-vous point and start the communication action.*/ - /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */ - SIMIX_mutex_unlock(rvp_comm_mutex); - SIMIX_communication_start(comm); - - /* Nobody is at the rendez-vous point, so push the comm action into it */ - }else{ - comm = SIMIX_communication_new(NULL, my_host, rdv); - comm->dest_buff = data; - SIMIX_communication_use(comm); - SIMIX_rvpoint_push(rdv, comm); - - /* Wait for communication completion */ - /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */ - /* FIXME: add timeout checking stuff*/ - SIMIX_mutex_unlock (rvp_comm_mutex); - __SIMIX_cond_wait(comm->cond); - } - /* Check for errors */ - if (SIMIX_host_get_state(comm->src_host) == 0){ - THROW1(host_error, 0, "Source host %s failed", comm->src_host->name); - }else if(SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED){ - THROW0(network_error, 0, "Link failure"); - } + comm = SIMIX_rdv_get_request_or_create(rdv,1); + comm->dst_host = SIMIX_host_self(); + comm->dest_buff = data; + + /* If the sender is already there, start the communication */ + /* If it's not here already, it will start that communication itself later on */ + if (comm->src_host != NULL) + SIMIX_communication_start(comm); + + /* Wait for communication completion */ + /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */ + /* FIXME: add timeout checking stuff */ + SIMIX_communication_wait_for_completion(comm); /* We are OK, let's copy the message to receiver's buffer */ *size = *size < comm->data_size ? *size : comm->data_size; memcpy(*data, comm->data, *size); - + SIMIX_communication_destroy(comm); - return; } /******************************************************************************/ @@ -301,7 +251,7 @@ XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm) { if(SIMIX_action_get_state (comm) == SURF_ACTION_DONE){ memcpy(comm->data - + return SIMIX_action_get_state (comm) == SURF_ACTION_DONE ? TRUE : FALSE; }*/ -- 2.20.1