From: mquinson Date: Mon, 5 Oct 2009 12:26:52 +0000 (+0000) Subject: Corrections and improvements to the new network implementation for SIMIX X-Git-Tag: SVN~1014 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/9c0ecca30e78065f9f57599a3eee17e53e8d89ca Corrections and improvements to the new network implementation for SIMIX git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6699 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/src/include/simix/simix.h b/src/include/simix/simix.h index fd67fe5a19..33371a830f 100644 --- a/src/include/simix/simix.h +++ b/src/include/simix/simix.h @@ -177,20 +177,19 @@ XBT_PUBLIC(void) SIMIX_display_process_status(void); /* Public */ XBT_PUBLIC(smx_rdv_t) SIMIX_rdv_create(char *name); XBT_PUBLIC(void) SIMIX_rdv_destroy(smx_rdv_t rvp); -XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, void *data, size_t size, double rate, double timeout); -XBT_PUBLIC(void) SIMIX_network_recv(smx_rdv_t rvp, void **data, size_t *size, double timeout); +XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, double timeout, void *data, size_t data_size); +XBT_PUBLIC(void) SIMIX_network_recv(smx_rdv_t rdv, double timeout, void **data, size_t *data_size); XBT_PUBLIC(void) SIMIX_network_wait(smx_action_t comm); XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm); /* These should be private to SIMIX */ -smx_comm_t SIMIX_communication_new(smx_host_t src_host, smx_host_t dst_host, - smx_rdv_t rdv); +smx_comm_t SIMIX_communication_new(smx_rdv_t rdv); void SIMIX_communication_destroy(smx_comm_t comm); -smx_comm_t SIMIX_rvpoint_get_receiver_or_create_request(smx_rdv_t rvp); -smx_comm_t SIMIX_rvpoint_get_sender_or_create(smx_rdv_t rvp); -static inline void SIMIX_rvpoint_push(smx_rdv_t rvp, smx_comm_t comm); -static inline smx_cond_t SIMIX_rvpoint_get_cond(smx_rdv_t rvp); -static inline smx_mutex_t SIMIX_rvpoint_get_comm_mutex(smx_rdv_t rvp); +static inline void SIMIX_communication_use(smx_comm_t comm); +static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm, double timeout); +smx_comm_t SIMIX_rdv_get_request_or_create(smx_rdv_t rvp, int look_for_src); +static inline void SIMIX_rdv_push(smx_rdv_t rvp, smx_comm_t comm); +static inline smx_cond_t SIMIX_rdv_get_cond(smx_rdv_t rvp); SG_END_DECL() #endif /* _SIMIX_SIMIX_H */ diff --git a/src/simix/private.h b/src/simix/private.h index 5acd0fc20a..2d00b7f924 100644 --- a/src/simix/private.h +++ b/src/simix/private.h @@ -137,8 +137,9 @@ typedef struct s_smx_comm { void *data; size_t data_size; void **dest_buff; - size_t dest_buff_size; + size_t *dest_buff_size; double rate; + double task_size; int refcount; } s_smx_comm_t; diff --git a/src/simix/smx_network.c b/src/simix/smx_network.c index 5f40e44e78..4ba18f5f5e 100644 --- a/src/simix/smx_network.c +++ b/src/simix/smx_network.c @@ -24,7 +24,6 @@ smx_rdv_t SIMIX_rdv_create(char *name) rvp->name = xbt_strdup(name); rvp->read = SIMIX_mutex_init(); rvp->write = SIMIX_mutex_init(); - rvp->comm_mutex = SIMIX_mutex_init(); rvp->comm_fifo = xbt_fifo_new(); return rvp; @@ -39,7 +38,6 @@ void SIMIX_rdv_destroy(smx_rdv_t rvp) xbt_free(rvp->name); SIMIX_mutex_destroy(rvp->read); SIMIX_mutex_destroy(rvp->write); - SIMIX_mutex_destroy(rvp->comm_mutex); xbt_fifo_free(rvp->comm_fifo); xbt_free(rvp); } @@ -59,30 +57,31 @@ static inline void SIMIX_rdv_push(smx_rdv_t rvp, smx_comm_t comm) * \brief Checks if there is a communication request queued in a rendez-vous matching our needs * \param rvp The rendez-vous with the queue * \param look_for_src boolean. True: we are receiver looking for sender; False: other way round - * \return The communication request if found, a newly created one otherwise. + * \return The communication request if found, or a newly created one otherwise. */ smx_comm_t SIMIX_rdv_get_request_or_create(smx_rdv_t rvp, int look_for_src) { - /* Get a communication request from the rendez-vous queue. If it is a receive - request then return it, otherwise put it again in the queue and return NULL + /* Get a communication request from the rendez-vous queue. If it is the kind + of request we are looking for then return it, otherwise put it again in the + queue. */ - smx_comm_t comm_head = xbt_fifo_shift(rvp->comm_fifo); + smx_comm_t comm = xbt_fifo_shift(rvp->comm_fifo); - if(comm_head != NULL) { - if (( look_for_src && comm_head->src_host != NULL) || - (!look_for_src && comm_head->dst_host != NULL)) { + if(comm != NULL) { + if (( look_for_src && comm->src_host != NULL) || + (!look_for_src && comm->dst_host != NULL)) { - SIMIX_communication_use(comm_head); - return comm_head; + SIMIX_communication_use(comm); + return comm; } } - xbt_fifo_unshift(rvp->comm_fifo, comm_head); + xbt_fifo_unshift(rvp->comm_fifo, comm); - /* no relevant peer found. Create a new comm action and set it up */ - comm_head = SIMIX_communication_new(rvp); - SIMIX_rdv_push(rvp, comm_head); - SIMIX_communication_use(comm_head); + /* no relevant request found. Create a new comm action and set it up */ + comm = SIMIX_communication_new(rvp); + SIMIX_rdv_push(rvp, comm); + SIMIX_communication_use(comm); - return comm_head; + return comm; } /******************************************************************************/ @@ -100,7 +99,6 @@ smx_comm_t SIMIX_communication_new(smx_rdv_t rdv) /* alloc structures */ smx_comm_t comm = xbt_new0(s_smx_comm_t, 1); comm->cond = SIMIX_cond_init(); - comm->rdv = rdv; return comm; @@ -117,9 +115,6 @@ void SIMIX_communication_destroy(smx_comm_t comm) if(comm->act != NULL) SIMIX_action_destroy(comm->act); - if(comm->data != NULL) - xbt_free(comm->data); - xbt_free(comm->cond); xbt_free(comm); } @@ -140,57 +135,76 @@ static inline void SIMIX_communication_use(smx_comm_t comm) /** * \brief Start the simulation of a communication request - * \param comm The communicatino request + * \param comm The communication request */ static inline void SIMIX_communication_start(smx_comm_t comm) { comm->act = SIMIX_action_communicate(comm->src_host, comm->dst_host, NULL, - comm->data_size, comm->rate); + comm->task_size, comm->rate); SIMIX_register_action_to_condition(comm->act, comm->cond); } -static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm) +/** + * \brief Waits for communication completion and performs error checking + * \param comm The communication + * \param timeout The max amount of time to wait for the communication to finish + * + * Throws: + * - host_error if peer failed + * - timeout_error if communication reached the timeout specified + * - network_error if network failed or peer issued a timeout + */ +static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm, double timeout) { - __SIMIX_cond_wait(comm->cond); - if (comm->act) { - SIMIX_unregister_action_to_condition(comm->act, comm->cond); - SIMIX_action_destroy(comm->act); - comm->act=NULL; + xbt_ex_t e; + + if(timeout > 0){ + TRY{ + SIMIX_cond_wait_timeout(comm->cond, NULL, timeout); + } + CATCH(e){ + /* If it's a timeout then cancel the communication and signal the other peer */ + if(e.category == timeout_error) + SIMIX_action_cancel(comm->act); + SIMIX_cond_signal(comm->cond); + RETHROW; + } + }else{ + SIMIX_cond_wait(comm->cond, NULL); } + + SIMIX_unregister_action_to_condition(comm->act, comm->cond); + /* Check for errors */ if (SIMIX_host_get_state(comm->dst_host) == 0){ THROW1(host_error, 0, "Destination host %s failed", comm->dst_host->name); } else if (SIMIX_host_get_state(comm->src_host) == 0){ THROW1(host_error, 0, "Source host %s failed", comm->src_host->name); - } else if (SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED) { + } else if (SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED){ THROW0(network_error, 0, "Link failure"); } - } + /******************************************************************************/ /* Synchronous Communication */ /******************************************************************************/ -/* Throws: - * - host_error if peer failed - * - network_error if network failed */ - -void SIMIX_network_send(smx_rdv_t rdv, void *data, size_t size, double timeout, double rate) +/* Throws: + * - host_error if peer failed + * - timeout_error if communication reached the timeout specified + * - network_error if network failed or peer issued a timeout + */ +void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, double timeout, void *data, size_t data_size) { - /*double start_time = SIMIX_get_clock();*/ - void *smx_net_data; smx_comm_t comm; - /* Copy the message to the network */ - /*FIXME here the MC should allocate the space from the network storage area */ - smx_net_data = xbt_malloc(size); - memcpy(smx_net_data, data, size); - - comm = SIMIX_rdv_get_request_or_create(rdv,0); + /* Setup communication request */ + comm = SIMIX_rdv_get_request_or_create(rdv, 0); comm->src_host = SIMIX_host_self(); - comm->data = smx_net_data; - comm->data_size = size; + comm->task_size = task_size; comm->rate = rate; + comm->data = data; + comm->data_size = data_size; /* If the receiver is already there, start the communication */ /* If it's not here already, it will start that communication itself later on */ @@ -200,20 +214,25 @@ void SIMIX_network_send(smx_rdv_t rdv, void *data, size_t size, double timeout, /* Wait for communication completion */ /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */ /* FIXME: add timeout checking stuff */ - SIMIX_communication_wait_for_completion(comm); + SIMIX_communication_wait_for_completion(comm, timeout); SIMIX_communication_destroy(comm); } -void SIMIX_network_recv(smx_rdv_t rdv, void **data, size_t *size, double timeout) +/* Throws: + * - host_error if peer failed + * - timeout_error if communication reached the timeout specified + * - network_error if network failed or peer issued a timeout + */ +void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void **data, size_t *data_size) { - /*double start_time = SIMIX_get_clock();*/ smx_comm_t comm; - smx_host_t my_host = SIMIX_host_self(); - comm = SIMIX_rdv_get_request_or_create(rdv,1); + /* Setup communication request */ + comm = SIMIX_rdv_get_request_or_create(rdv, 1); comm->dst_host = SIMIX_host_self(); comm->dest_buff = data; + comm->dest_buff_size = data_size; /* If the sender is already there, start the communication */ /* If it's not here already, it will start that communication itself later on */ @@ -223,11 +242,7 @@ void SIMIX_network_recv(smx_rdv_t rdv, void **data, size_t *size, double timeout /* Wait for communication completion */ /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */ /* FIXME: add timeout checking stuff */ - SIMIX_communication_wait_for_completion(comm); - - /* We are OK, let's copy the message to receiver's buffer */ - *size = *size < comm->data_size ? *size : comm->data_size; - memcpy(*data, comm->data, *size); + SIMIX_communication_wait_for_completion(comm, timeout); SIMIX_communication_destroy(comm); } @@ -257,6 +272,10 @@ XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm) + /* FIXME: MOVE DATA COPY TO MAESTRO AT ACTION SIGNAL TIME + We are OK, let's copy the message to receiver's buffer + *size = *size < comm->data_size ? *size : comm->data_size; + memcpy(*data, comm->data, *size);*/ diff --git a/src/simix/smx_synchro.c b/src/simix/smx_synchro.c index a54d30cd0c..b664398515 100644 --- a/src/simix/smx_synchro.c +++ b/src/simix/smx_synchro.c @@ -176,14 +176,16 @@ void SIMIX_cond_signal(smx_cond_t cond) void SIMIX_cond_wait(smx_cond_t cond, smx_mutex_t mutex) { smx_action_t act_sleep; - xbt_assert0((mutex != NULL), "Invalid parameters"); DEBUG1("Wait condition %p", cond); - cond->mutex = mutex; - SIMIX_mutex_unlock(mutex); - /* always create an action null in case there is a host failure */ -/* if (xbt_fifo_size(cond->actions) == 0) { */ + /* If there is a mutex unlock it */ + if(mutex != NULL){ + cond->mutex = mutex; + SIMIX_mutex_unlock(mutex); + } + + /* Always create an action null in case there is a host failure */ act_sleep = SIMIX_action_sleep(SIMIX_host_self(), -1); SIMIX_process_self()->waiting_action = act_sleep; SIMIX_register_action_to_condition(act_sleep, cond); @@ -191,11 +193,10 @@ void SIMIX_cond_wait(smx_cond_t cond, smx_mutex_t mutex) SIMIX_process_self()->waiting_action = NULL; SIMIX_unregister_action_to_condition(act_sleep, cond); SIMIX_action_destroy(act_sleep); -/* } else { */ -/* __SIMIX_cond_wait(cond); */ -/* } */ - /* get the mutex again */ - SIMIX_mutex_lock(cond->mutex); + + /* get the mutex again if necessary */ + if(mutex != NULL) + SIMIX_mutex_lock(cond->mutex); return; } @@ -221,7 +222,6 @@ void __SIMIX_cond_wait(smx_cond_t cond) SIMIX_process_yield(); } return; - } /** @@ -235,14 +235,16 @@ void __SIMIX_cond_wait(smx_cond_t cond) void SIMIX_cond_wait_timeout(smx_cond_t cond, smx_mutex_t mutex, double max_duration) { - smx_action_t act_sleep; - xbt_assert0((mutex != NULL), "Invalid parameters"); DEBUG1("Timed wait condition %p", cond); - cond->mutex = mutex; - SIMIX_mutex_unlock(mutex); + /* If there is a mutex unlock it */ + if(mutex != NULL){ + cond->mutex = mutex; + SIMIX_mutex_unlock(mutex); + } + if (max_duration >= 0) { act_sleep = SIMIX_action_sleep(SIMIX_host_self(), max_duration); SIMIX_register_action_to_condition(act_sleep, cond); @@ -260,8 +262,9 @@ void SIMIX_cond_wait_timeout(smx_cond_t cond, smx_mutex_t mutex, } else __SIMIX_cond_wait(cond); - /* get the mutex again */ - SIMIX_mutex_lock(cond->mutex); + /* get the mutex again if necessary */ + if(mutex != NULL) + SIMIX_mutex_lock(cond->mutex); return; }