* \param name The name of the rendez-vous point
* \return The created rendez-vous point
*/
-smx_rvpoint_t SIMIX_rvpoint_create(char *name)
+smx_rdv_t SIMIX_rdv_create(char *name)
{
- smx_rvpoint_t rvp = xbt_new0(s_smx_rvpoint_t, 1);
+ smx_rdv_t rvp = xbt_new0(s_smx_rvpoint_t, 1);
rvp->name = xbt_strdup(name);
rvp->read = SIMIX_mutex_init();
rvp->write = SIMIX_mutex_init();
rvp->comm_mutex = SIMIX_mutex_init();
rvp->comm_fifo = xbt_fifo_new();
-
+
return rvp;
}
* \brief Destroy a rendez-vous point
* \param name The rendez-vous point to destroy
*/
-void SIMIX_rvpoint_destroy(smx_rvpoint_t rvp)
+void SIMIX_rdv_destroy(smx_rdv_t rvp)
{
xbt_free(rvp->name);
SIMIX_mutex_destroy(rvp->read);
* \param rvp The rendez-vous point
* \param comm The communication request
*/
-static inline void SIMIX_rvpoint_push(smx_rvpoint_t rvp, smx_comm_t comm)
+static inline void SIMIX_rdv_push(smx_rdv_t rvp, smx_comm_t comm)
{
xbt_fifo_push(rvp->comm_fifo, comm);
}
/**
- * \brief Checks if there is a receive communication request queued in a rendez-vous
+ * \brief Checks if there is a communication request queued in a rendez-vous matching our needs
* \param rvp The rendez-vous with the queue
- * \return The communication request if found, NULL otherwise.
+ * \param look_for_src boolean. True: we are receiver looking for sender; False: other way round
+ * \return The communication request if found, a newly created one otherwise.
*/
-smx_comm_t SIMIX_rvpoint_get_receiver(smx_rvpoint_t rvp)
-{
+smx_comm_t SIMIX_rdv_get_request_or_create(smx_rdv_t rvp, int look_for_src) {
/* Get a communication request from the rendez-vous queue. If it is a receive
request then return it, otherwise put it again in the queue and return NULL
*/
smx_comm_t comm_head = xbt_fifo_shift(rvp->comm_fifo);
- if(comm_head != NULL && comm_head->dst_host != NULL)
- return comm_head;
-
- xbt_fifo_unshift(rvp->comm_fifo, comm_head);
- return NULL;
-}
+ if(comm_head != NULL) {
+ if (( look_for_src && comm_head->src_host != NULL) ||
+ (!look_for_src && comm_head->dst_host != NULL)) {
-/**
- * \brief Checks if there is a send communication request queued in a rendez-vous
- * \param rvp The rendez-vous with the queue
- * \return The communication request if found, NULL otherwise.
- */
-smx_comm_t SIMIX_rvpoint_get_sender(smx_rvpoint_t rvp)
-{
- /* Get a communication request from the rendez-vous queue. If it is a send
- request then return it, otherwise put it again in the queue and return NULL
- */
- smx_comm_t comm_head = xbt_fifo_shift(rvp->comm_fifo);
-
- if(comm_head != NULL && comm_head->src_host != NULL)
- return comm_head;
-
+ SIMIX_communication_use(comm_head);
+ return comm_head;
+ }
+ }
xbt_fifo_unshift(rvp->comm_fifo, comm_head);
- return NULL;
-}
-/**
- * \brief Get the communication mutex of the rendez-vous point
- * \param rvp The rendez-vous point
- */
-static inline smx_mutex_t SIMIX_rvpoint_get_comm_mutex(smx_rvpoint_t rvp)
-{
- return rvp->comm_mutex;
+ /* no relevant peer found. Create a new comm action and set it up */
+ comm_head = SIMIX_communication_new(rvp);
+ SIMIX_rdv_push(rvp, comm_head);
+ SIMIX_communication_use(comm_head);
+
+ return comm_head;
}
/******************************************************************************/
* \param receiver The process receiving the communication (by recv)
* \return the communication request
*/
-smx_comm_t SIMIX_communication_new(smx_host_t src_host, smx_host_t dst_host,
- smx_rvpoint_t rdv)
+smx_comm_t SIMIX_communication_new(smx_rdv_t rdv)
{
/* alloc structures */
smx_comm_t comm = xbt_new0(s_smx_comm_t, 1);
comm->cond = SIMIX_cond_init();
- /* initialize them */
- comm->src_host = src_host;
- comm->dst_host = dst_host;
comm->rdv = rdv;
return comm;
static inline void SIMIX_communication_start(smx_comm_t comm)
{
comm->act = SIMIX_action_communicate(comm->src_host, comm->dst_host, NULL,
- comm->data_size, comm->rate);
+ comm->data_size, comm->rate);
SIMIX_register_action_to_condition(comm->act, comm->cond);
+}
+
+static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm)
+{
__SIMIX_cond_wait(comm->cond);
- SIMIX_unregister_action_to_condition(comm->act, comm->cond);
+ if (comm->act) {
+ SIMIX_unregister_action_to_condition(comm->act, comm->cond);
+ SIMIX_action_destroy(comm->act);
+ comm->act=NULL;
+ }
+ /* Check for errors */
+ if (SIMIX_host_get_state(comm->dst_host) == 0){
+ THROW1(host_error, 0, "Destination host %s failed", comm->dst_host->name);
+ } else if (SIMIX_host_get_state(comm->src_host) == 0){
+ THROW1(host_error, 0, "Source host %s failed", comm->src_host->name);
+ } else if (SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED) {
+ THROW0(network_error, 0, "Link failure");
+ }
+
}
-
/******************************************************************************/
/* Synchronous Communication */
/******************************************************************************/
+/* Throws:
+ * - host_error if peer failed
+ * - network_error if network failed */
-void SIMIX_network_send(smx_rvpoint_t rdv, void *data, size_t size, double timeout, double rate)
+void SIMIX_network_send(smx_rdv_t rdv, void *data, size_t size, double timeout, double rate)
{
/*double start_time = SIMIX_get_clock();*/
void *smx_net_data;
- smx_host_t my_host = SIMIX_host_self();
smx_comm_t comm;
- smx_mutex_t rvp_comm_mutex = SIMIX_rvpoint_get_comm_mutex(rdv);
- /* Lock the rendez-vous point */
- SIMIX_mutex_lock(rvp_comm_mutex);
/* Copy the message to the network */
/*FIXME here the MC should allocate the space from the network storage area */
smx_net_data = xbt_malloc(size);
memcpy(smx_net_data, data, size);
-
- /* Check if there is already a receive waiting in the rendez-vous point */
- if((comm = SIMIX_rvpoint_get_receiver(rdv)) != NULL){
- comm->src_host = my_host;
- comm->data = smx_net_data;
- comm->data_size = size;
- comm->rate = rate;
- SIMIX_communication_use(comm);
-
- /* Unlock the rendez-vous point and start the communication action.*/
- /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
- SIMIX_mutex_unlock(rvp_comm_mutex);
- SIMIX_communication_start(comm);
-
- /* Nobody is at the rendez-vous point, so push the comm action into it */
- }else{
- comm = SIMIX_communication_new(my_host, NULL, rdv);
- comm->data = smx_net_data;
- comm->data_size = size;
- comm->rate = rate;
- SIMIX_communication_use(comm);
- SIMIX_rvpoint_push(rdv, comm);
-
- /* Wait for communication completion */
- /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
- /* FIXME: add timeout checking stuff */
- SIMIX_mutex_unlock (rvp_comm_mutex);
- __SIMIX_cond_wait(comm->cond);
- }
- /* Check for errors */
- if (SIMIX_host_get_state(comm->dst_host) == 0){
- THROW1(host_error, 0, "Destination host %s failed", comm->dst_host->name);
- }else if(SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED){
- THROW0(network_error, 0, "Link failure");
- }
-
+ comm = SIMIX_rdv_get_request_or_create(rdv,0);
+ comm->src_host = SIMIX_host_self();
+ comm->data = smx_net_data;
+ comm->data_size = size;
+ comm->rate = rate;
+
+ /* If the receiver is already there, start the communication */
+ /* If it's not here already, it will start that communication itself later on */
+ if(comm->dst_host != NULL)
+ SIMIX_communication_start(comm);
+
+ /* Wait for communication completion */
+ /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
+ /* FIXME: add timeout checking stuff */
+ SIMIX_communication_wait_for_completion(comm);
+
SIMIX_communication_destroy(comm);
- return;
}
-void SIMIX_network_recv(smx_rvpoint_t rdv, void **data, size_t *size, double timeout)
+void SIMIX_network_recv(smx_rdv_t rdv, void **data, size_t *size, double timeout)
{
/*double start_time = SIMIX_get_clock();*/
smx_comm_t comm;
smx_host_t my_host = SIMIX_host_self();
- smx_mutex_t rvp_comm_mutex = SIMIX_rvpoint_get_comm_mutex(rdv);
-
- /* Lock the rendez-vous point */
- SIMIX_mutex_lock(rvp_comm_mutex);
-
- /* Check if there is already a send waiting in the rendez-vous point */
- if((comm = SIMIX_rvpoint_get_sender(rdv)) != NULL){
- comm->dst_host = my_host;
- comm->dest_buff = data;
- SIMIX_communication_use(comm);
-
- /* Unlock the rendez-vous point and start the communication action.*/
- /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
- SIMIX_mutex_unlock(rvp_comm_mutex);
- SIMIX_communication_start(comm);
-
- /* Nobody is at the rendez-vous point, so push the comm action into it */
- }else{
- comm = SIMIX_communication_new(NULL, my_host, rdv);
- comm->dest_buff = data;
- SIMIX_communication_use(comm);
- SIMIX_rvpoint_push(rdv, comm);
-
- /* Wait for communication completion */
- /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
- /* FIXME: add timeout checking stuff*/
- SIMIX_mutex_unlock (rvp_comm_mutex);
- __SIMIX_cond_wait(comm->cond);
- }
- /* Check for errors */
- if (SIMIX_host_get_state(comm->src_host) == 0){
- THROW1(host_error, 0, "Source host %s failed", comm->src_host->name);
- }else if(SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED){
- THROW0(network_error, 0, "Link failure");
- }
+ comm = SIMIX_rdv_get_request_or_create(rdv,1);
+ comm->dst_host = SIMIX_host_self();
+ comm->dest_buff = data;
+
+ /* If the sender is already there, start the communication */
+ /* If it's not here already, it will start that communication itself later on */
+ if (comm->src_host != NULL)
+ SIMIX_communication_start(comm);
+
+ /* Wait for communication completion */
+ /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
+ /* FIXME: add timeout checking stuff */
+ SIMIX_communication_wait_for_completion(comm);
/* We are OK, let's copy the message to receiver's buffer */
*size = *size < comm->data_size ? *size : comm->data_size;
memcpy(*data, comm->data, *size);
-
+
SIMIX_communication_destroy(comm);
- return;
}
/******************************************************************************/
{
if(SIMIX_action_get_state (comm) == SURF_ACTION_DONE){
memcpy(comm->data
-
+
return SIMIX_action_get_state (comm) == SURF_ACTION_DONE ? TRUE : FALSE;
}*/