+/* $Id$ */
+
+/* Copyright (c) 2009 Cristian Rosa.
+ All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "private.h"
+#include "xbt/log.h"
+
+/******************************************************************************/
+/* Rendez-Vous Points */
+/******************************************************************************/
+
+/**
+ * \brief Allocates and initializes a rendez-vous point
+ *
+ * The new rendez-vous point gets its own copy of the name, three freshly
+ * initialized mutexes (read, write and comm_mutex) and a new, empty queue of
+ * communication requests.
+ *
+ * \param name The name of the rendez-vous point (duplicated with xbt_strdup)
+ * \return The newly created rendez-vous point
+ */
+smx_rvpoint_t SIMIX_rvpoint_create(char *name)
+{
+  smx_rvpoint_t new_rvp = xbt_new0(s_smx_rvpoint_t, 1);
+
+  /* Identification */
+  new_rvp->name = xbt_strdup(name);
+
+  /* Synchronization objects */
+  new_rvp->read = SIMIX_mutex_init();
+  new_rvp->write = SIMIX_mutex_init();
+  new_rvp->comm_mutex = SIMIX_mutex_init();
+
+  /* Queue of pending communication requests */
+  new_rvp->comm_fifo = xbt_fifo_new();
+
+  return new_rvp;
+}
+
+/**
+ * \brief Destroy a rendez-vous point
+ *
+ * Releases the name, the three mutexes and the queue of the rendez-vous
+ * point, then the rendez-vous point itself. Only the queue container is
+ * released here; this function does not destroy communication requests that
+ * might still be queued.
+ *
+ * \param rvp The rendez-vous point to destroy; NULL is accepted and ignored
+ */
+void SIMIX_rvpoint_destroy(smx_rvpoint_t rvp)
+{
+  /* Behave like free(): destroying nothing is a no-op, not a crash */
+  if (rvp == NULL)
+    return;
+
+  xbt_free(rvp->name);
+  SIMIX_mutex_destroy(rvp->read);
+  SIMIX_mutex_destroy(rvp->write);
+  SIMIX_mutex_destroy(rvp->comm_mutex);
+  xbt_fifo_free(rvp->comm_fifo);
+  xbt_free(rvp);
+}
+
+/**
+ * \brief Queue a communication request in a rendez-vous point
+ *
+ * The request is handed to xbt_fifo_push() on the rendez-vous queue. Queued
+ * requests are taken back by SIMIX_rvpoint_get_receiver() and
+ * SIMIX_rvpoint_get_sender().
+ *
+ * \param rvp The rendez-vous point
+ * \param comm The communication request to queue
+ */
+static inline void SIMIX_rvpoint_push(smx_rvpoint_t rvp, smx_comm_t comm)
+{
+  xbt_fifo_t pending = rvp->comm_fifo;
+
+  xbt_fifo_push(pending, comm);
+}
+
+/**
+ * \brief Checks if there is a receive communication request queued in a rendez-vous
+ *
+ * Only the head of the queue is inspected. A request is considered a receive
+ * request when its dst_host field is set.
+ *
+ * \param rvp The rendez-vous with the queue
+ * \return The communication request (removed from the queue) if the head is a
+ *         receive request, NULL otherwise (the queue is then left unchanged).
+ */
+smx_comm_t SIMIX_rvpoint_get_receiver(smx_rvpoint_t rvp)
+{
+  /* Take the request at the head of the rendez-vous queue */
+  smx_comm_t comm_head = xbt_fifo_shift(rvp->comm_fifo);
+
+  /* Empty queue: there is nothing to return and nothing to put back.
+     Unshifting NULL here would insert a bogus entry at the head of the queue,
+     hiding every request pushed afterwards from both lookup functions. */
+  if (comm_head == NULL)
+    return NULL;
+
+  /* A receive request: hand it to the caller */
+  if (comm_head->dst_host != NULL)
+    return comm_head;
+
+  /* Not a receive request: put it back where it was */
+  xbt_fifo_unshift(rvp->comm_fifo, comm_head);
+  return NULL;
+}
+
+/**
+ * \brief Checks if there is a send communication request queued in a rendez-vous
+ *
+ * Only the head of the queue is inspected. A request is considered a send
+ * request when its src_host field is set.
+ *
+ * \param rvp The rendez-vous with the queue
+ * \return The communication request (removed from the queue) if the head is a
+ *         send request, NULL otherwise (the queue is then left unchanged).
+ */
+smx_comm_t SIMIX_rvpoint_get_sender(smx_rvpoint_t rvp)
+{
+  /* Take the request at the head of the rendez-vous queue */
+  smx_comm_t comm_head = xbt_fifo_shift(rvp->comm_fifo);
+
+  /* Empty queue: there is nothing to return and nothing to put back.
+     Unshifting NULL here would insert a bogus entry at the head of the queue,
+     hiding every request pushed afterwards from both lookup functions. */
+  if (comm_head == NULL)
+    return NULL;
+
+  /* A send request: hand it to the caller */
+  if (comm_head->src_host != NULL)
+    return comm_head;
+
+  /* Not a send request: put it back where it was */
+  xbt_fifo_unshift(rvp->comm_fifo, comm_head);
+  return NULL;
+}
+
+/**
+ * \brief Accessor for the communication mutex of a rendez-vous point
+ * \param rvp The rendez-vous point
+ * \return The comm_mutex field of the rendez-vous point
+ */
+static inline smx_mutex_t SIMIX_rvpoint_get_comm_mutex(smx_rvpoint_t rvp)
+{
+  smx_mutex_t comm_mutex = rvp->comm_mutex;
+
+  return comm_mutex;
+}
+
+/******************************************************************************/
+/* Communication Requests */
+/******************************************************************************/
+
+/**
+ * \brief Creates a new communication request
+ *
+ * The request is allocated with xbt_new0, receives a fresh condition from
+ * SIMIX_cond_init() and records the two hosts and the rendez-vous point. This
+ * function does not register any user of the request; the callers in this
+ * file call SIMIX_communication_use() right after creating it.
+ *
+ * \param src_host The host of the sending side (NULL when created by a receive)
+ * \param dst_host The host of the receiving side (NULL when created by a send)
+ * \param rdv The rendez-vous point the request belongs to
+ * \return The communication request
+ */
+smx_comm_t SIMIX_communication_new(smx_host_t src_host, smx_host_t dst_host,
+                                   smx_rvpoint_t rdv)
+{
+  smx_comm_t request = xbt_new0(s_smx_comm_t, 1);
+
+  /* Condition the communicating processes wait on */
+  request->cond = SIMIX_cond_init();
+
+  /* Endpoints and meeting place of the communication */
+  request->src_host = src_host;
+  request->dst_host = dst_host;
+  request->rdv = rdv;
+
+  return request;
+}
+
+/**
+ * \brief Release one reference on a communication request
+ *
+ * Decrements the user counter of the request. When the counter reaches zero
+ * the action (if any), the data buffer (if any), the condition and the
+ * request itself are released.
+ *
+ * \param comm The request to be destroyed
+ */
+void SIMIX_communication_destroy(smx_comm_t comm)
+{
+  comm->refcount--;
+
+  /* Somebody else is still using the request: keep everything alive */
+  if (comm->refcount != 0)
+    return;
+
+  if (comm->act != NULL)
+    SIMIX_action_destroy(comm->act);
+
+  if (comm->data != NULL)
+    xbt_free(comm->data);
+
+  /* The condition was obtained from SIMIX_cond_init(); release it with the
+     matching destructor rather than a bare xbt_free(), which would skip the
+     condition's own cleanup. */
+  SIMIX_cond_destroy(comm->cond);
+  xbt_free(comm);
+}
+
+/**
+ * \brief Register one more user of a communication request
+ *
+ * A communication request can be used by more than one process. The counter
+ * incremented here is the one SIMIX_communication_destroy() decrements to
+ * decide when the request can really be freed, so that nothing still in use
+ * by another process is released.
+ *
+ * \param comm The communication request
+ */
+static inline void SIMIX_communication_use(smx_comm_t comm)
+{
+  comm->refcount += 1;
+}
+
+/**
+ * \brief Start the simulation of a communication request
+ *
+ * Creates the communication action between comm->src_host and comm->dst_host
+ * with size comm->data_size and rate comm->rate, and stores it in comm->act.
+ * The action is then registered to the condition of the request, the caller
+ * waits on that condition with __SIMIX_cond_wait(), and the action is
+ * unregistered from the condition once the wait returns.
+ *
+ * \param comm The communication request
+ */
+static inline void SIMIX_communication_start(smx_comm_t comm)
+{
+ comm->act = SIMIX_action_communicate(comm->src_host, comm->dst_host, NULL,
+ comm->data_size, comm->rate);
+
+ SIMIX_register_action_to_condition(comm->act, comm->cond);
+ __SIMIX_cond_wait(comm->cond);
+ SIMIX_unregister_action_to_condition(comm->act, comm->cond);
+}
+
+/******************************************************************************/
+/* Synchronous Communication */
+/******************************************************************************/
+
+/**
+ * \brief Synchronously send a message through a rendez-vous point
+ *
+ * The message is copied into a newly allocated buffer attached to the
+ * communication request. If a receive request is found at the rendez-vous
+ * point, it is completed with the sender's information and the communication
+ * is started. Otherwise a new send request is queued and the caller waits on
+ * its condition.
+ *
+ * Throws host_error when the state of the destination host is 0, and
+ * network_error when the state of the communication action is
+ * SURF_ACTION_FAILED. The caller's reference on the request is released
+ * before throwing.
+ *
+ * \param rdv The rendez-vous point used for the communication
+ * \param data The message to send
+ * \param size The size of the message (amount copied with memcpy)
+ * \param timeout Currently unused (see the FIXME notes in the body)
+ * \param rate The rate stored in the request and given to the action
+ */
+void SIMIX_network_send(smx_rvpoint_t rdv, void *data, size_t size, double timeout, double rate)
+{
+  /*double start_time = SIMIX_get_clock();*/
+  void *smx_net_data;
+  smx_host_t my_host = SIMIX_host_self();
+  smx_host_t dst_host;
+  smx_comm_t comm;
+  smx_mutex_t rvp_comm_mutex = SIMIX_rvpoint_get_comm_mutex(rdv);
+  int host_failed;
+  int link_failed = 0;
+
+  /* Lock the rendez-vous point */
+  SIMIX_mutex_lock(rvp_comm_mutex);
+  /* Copy the message to the network */
+  /*FIXME here the MC should allocate the space from the network storage area */
+  smx_net_data = xbt_malloc(size);
+  memcpy(smx_net_data, data, size);
+
+  /* Check if there is already a receive waiting in the rendez-vous point */
+  if((comm = SIMIX_rvpoint_get_receiver(rdv)) != NULL){
+    comm->src_host = my_host;
+    comm->data = smx_net_data;
+    comm->data_size = size;
+    comm->rate = rate;
+    SIMIX_communication_use(comm);
+
+    /* Unlock the rendez-vous point and start the communication action.*/
+    /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
+    SIMIX_mutex_unlock(rvp_comm_mutex);
+    SIMIX_communication_start(comm);
+
+  /* Nobody is at the rendez-vous point, so push the comm action into it */
+  }else{
+    comm = SIMIX_communication_new(my_host, NULL, rdv);
+    comm->data = smx_net_data;
+    comm->data_size = size;
+    comm->rate = rate;
+    SIMIX_communication_use(comm);
+    SIMIX_rvpoint_push(rdv, comm);
+
+    /* Wait for communication completion */
+    /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
+    /* FIXME: add timeout checking stuff */
+    SIMIX_mutex_unlock (rvp_comm_mutex);
+    __SIMIX_cond_wait(comm->cond);
+  }
+
+  /* Evaluate the outcome while our reference on the request is still held.
+     The action state is only queried when the destination host is fine, as
+     before. */
+  dst_host = comm->dst_host;
+  host_failed = (SIMIX_host_get_state(dst_host) == 0);
+  if (!host_failed)
+    link_failed = (SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED);
+
+  /* Drop our reference before any THROW: throwing first would leave the
+     request with a user count that can never reach zero. */
+  SIMIX_communication_destroy(comm);
+
+  /* Report errors */
+  if (host_failed){
+    THROW1(host_error, 0, "Destination host %s failed", dst_host->name);
+  }else if(link_failed){
+    THROW0(network_error, 0, "Link failure");
+  }
+
+  return;
+}
+
+/**
+ * \brief Synchronously receive a message through a rendez-vous point
+ *
+ * If a send request is found at the rendez-vous point, it is completed with
+ * the receiver's information and the communication is started. Otherwise a
+ * new receive request is queued and the caller waits on its condition. On
+ * success the message is copied into the buffer pointed to by *data.
+ *
+ * Throws host_error when the state of the source host is 0, and
+ * network_error when the state of the communication action is
+ * SURF_ACTION_FAILED. The caller's reference on the request is released
+ * before throwing, and nothing is copied in that case.
+ *
+ * \param rdv The rendez-vous point used for the communication
+ * \param data Address of the pointer to the receiver's buffer
+ * \param size On entry the maximum size to copy; on successful return the
+ *        size actually copied (the smaller of the entry value and the size
+ *        of the message)
+ * \param timeout Currently unused (see the FIXME notes in the body)
+ */
+void SIMIX_network_recv(smx_rvpoint_t rdv, void **data, size_t *size, double timeout)
+{
+  /*double start_time = SIMIX_get_clock();*/
+  smx_comm_t comm;
+  smx_host_t my_host = SIMIX_host_self();
+  smx_host_t src_host;
+  smx_mutex_t rvp_comm_mutex = SIMIX_rvpoint_get_comm_mutex(rdv);
+  int host_failed;
+  int link_failed = 0;
+
+  /* Lock the rendez-vous point */
+  SIMIX_mutex_lock(rvp_comm_mutex);
+
+  /* Check if there is already a send waiting in the rendez-vous point */
+  if((comm = SIMIX_rvpoint_get_sender(rdv)) != NULL){
+    comm->dst_host = my_host;
+    comm->dest_buff = data;
+    SIMIX_communication_use(comm);
+
+    /* Unlock the rendez-vous point and start the communication action.*/
+    /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
+    SIMIX_mutex_unlock(rvp_comm_mutex);
+    SIMIX_communication_start(comm);
+
+  /* Nobody is at the rendez-vous point, so push the comm action into it */
+  }else{
+    comm = SIMIX_communication_new(NULL, my_host, rdv);
+    comm->dest_buff = data;
+    SIMIX_communication_use(comm);
+    SIMIX_rvpoint_push(rdv, comm);
+
+    /* Wait for communication completion */
+    /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
+    /* FIXME: add timeout checking stuff*/
+    SIMIX_mutex_unlock (rvp_comm_mutex);
+    __SIMIX_cond_wait(comm->cond);
+  }
+
+  /* Evaluate the outcome while our reference on the request is still held.
+     The action state is only queried when the source host is fine, as
+     before. */
+  src_host = comm->src_host;
+  host_failed = (SIMIX_host_get_state(src_host) == 0);
+  if (!host_failed)
+    link_failed = (SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED);
+
+  /* We are OK, let's copy the message to receiver's buffer */
+  if (!host_failed && !link_failed){
+    *size = *size < comm->data_size ? *size : comm->data_size;
+    memcpy(*data, comm->data, *size);
+  }
+
+  /* Drop our reference before any THROW: throwing first would leave the
+     request with a user count that can never reach zero. */
+  SIMIX_communication_destroy(comm);
+
+  /* Report errors */
+  if (host_failed){
+    THROW1(host_error, 0, "Source host %s failed", src_host->name);
+  }else if(link_failed){
+    THROW0(network_error, 0, "Link failure");
+  }
+
+  return;
+}
+
+/******************************************************************************/
+/* Asynchronous Communication */
+/******************************************************************************/
+
+/*
+void SIMIX_network_wait(smx_action_t comm, double timeout)
+{
+ if (timeout > 0)
+ SIMIX_cond_wait_timeout(rvp_cond, rvp_comm_mutex, timeout - start_time);
+ else
+ SIMIX_cond_wait(rvp_cond, rvp_comm_mutex);
+
+}
+
+
+XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm)
+{
+ if(SIMIX_action_get_state (comm) == SURF_ACTION_DONE){
+ memcpy(comm->data
+
+ return SIMIX_action_get_state (comm) == SURF_ACTION_DONE ? TRUE : FALSE;
+}*/
+
+
+
+
+
+
+