XBT_PUBLIC(smx_rdv_t) SIMIX_rdv_create(const char *name);
XBT_PUBLIC(void) SIMIX_rdv_destroy(smx_rdv_t rvp);
XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
- double timeout, void *data, size_t data_size,
- int (filter)(smx_comm_t, void *), void *arg);
+ double timeout, void *data, size_t data_size);
XBT_PUBLIC(void) SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data,
- size_t *data_size, int (filter)(smx_comm_t, void *), void *arg);
+ size_t *data_size);
XBT_PUBLIC(void) SIMIX_network_wait(smx_action_t comm);
XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm);
XBT_PUBLIC(int) SIMIX_communication_isSend(smx_comm_t comm);
XBT_PUBLIC(int) SIMIX_communication_isRecv(smx_comm_t comm);
-/* FIXME: Filter function */
-int comm_filter_get(smx_comm_t comm, void *arg);
-int comm_filter_put(smx_comm_t comm, void *arg);
-
/* These should be private to SIMIX */
-smx_comm_t SIMIX_communication_new(smx_comm_type_t type, smx_rdv_t rdv);
+smx_comm_t SIMIX_communication_new(smx_comm_type_t type);
void SIMIX_communication_destroy(smx_comm_t comm);
static inline void SIMIX_communication_use(smx_comm_t comm);
static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm, double timeout);
-smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, int (filter)(smx_comm_t, void *), void *arg);
+smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, smx_comm_type_t type);
static inline void SIMIX_rdv_push(smx_rdv_t rdv, smx_comm_t comm);
static inline void SIMIX_rdv_remove(smx_rdv_t rdv, smx_comm_t comm);
static inline smx_cond_t SIMIX_rdv_get_cond(smx_rdv_t rdv);
static inline void SIMIX_rdv_push(smx_rdv_t rdv, smx_comm_t comm)
{
xbt_fifo_push(rdv->comm_fifo, comm);
+ comm->rdv = rdv; /* back-reference: the request records the rendez-vous it was pushed to */
}
/**
static inline void SIMIX_rdv_remove(smx_rdv_t rdv, smx_comm_t comm)
{
xbt_fifo_remove(rdv->comm_fifo, comm);
+ comm->rdv = NULL; /* clear the back-reference set by SIMIX_rdv_push */
}
-
/**
* \brief Checks if there is a communication request queued in a rendez-vous matching our needs
- * \param rdv The rendez-vous with the queue
- * \param look_for_src boolean. True: we are receiver looking for sender; False: other way round
- * \return The communication request if found, or a newly created one otherwise.
+ * \param type The type of communication we are looking for (comm_send, comm_recv)
+ * \return The communication request if found, NULL otherwise.
*/
-smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, int (filter)(smx_comm_t, void*), void *arg) {
- smx_comm_t comm;
- xbt_fifo_item_t item;
-
- /* Traverse the rendez-vous queue looking for a comm request matching the
- filter conditions. If found return it and remove it from the list. */
- xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_comm_t) {
- if(filter(comm, arg)){
- SIMIX_communication_use(comm);
- xbt_fifo_remove_item(rdv->comm_fifo, item);
- DEBUG1("Communication request found! %p", comm);
- return comm;
- }
+smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rdv, smx_comm_type_t type)
+{
+ smx_comm_t comm = (smx_comm_t)xbt_fifo_get_item_content(
+ xbt_fifo_get_first_item(rdv->comm_fifo));
+
+ if(comm && comm->type == type){
+ DEBUG0("Communication request found!");
+ xbt_fifo_shift(rdv->comm_fifo);
+ SIMIX_communication_use(comm);
+ return comm;
}
/* no relevant request found. Return NULL */
/**
* \brief Creates a new communication request
- * \param sender The process starting the communication (by send)
- * \param receiver The process receiving the communication (by recv)
- * \return the communication request
+ * \param type The type of communication (comm_send, comm_recv)
+ * \return The new communication request
*/
-smx_comm_t SIMIX_communication_new(smx_comm_type_t type, smx_rdv_t rdv)
+smx_comm_t SIMIX_communication_new(smx_comm_type_t type)
{
/* alloc structures */
smx_comm_t comm = xbt_new0(s_smx_comm_t, 1);
comm->type = type;
comm->cond = SIMIX_cond_init();
- comm->rdv = rdv;
comm->refcount = 1;
return comm;
static inline void SIMIX_communication_start(smx_comm_t comm)
{
/* If both the sender and the receiver are already there, start the communication */
- if(comm->src_host != NULL && comm->dst_host != NULL){
+ if(comm->src_proc && comm->dst_proc){
DEBUG1("Starting communication %p", comm);
- comm->act = SIMIX_action_communicate(comm->src_host, comm->dst_host, NULL,
+ comm->act = SIMIX_action_communicate(comm->src_proc->smx_host,
+ comm->dst_proc->smx_host, NULL,
comm->task_size, comm->rate);
+ /* If either process is suspended, create the action but stop its execution;
+ it will be restarted when the sender process resumes */
+ if(SIMIX_process_is_suspended(comm->src_proc) ||
+ SIMIX_process_is_suspended(comm->dst_proc)) {
+ SIMIX_action_set_priority(comm->act, 0);
+ }
+
/* Add the communication as user data of the action */
comm->act->data = comm;
memcpy(comm->dst_buff, comm->src_buff, dst_buff_size);
DEBUG4("Copying comm %p data from %s -> %s (%zu bytes)",
- comm, comm->src_host->name, comm->dst_host->name, dst_buff_size);
-}
-
-/* FIXME: move to some other place */
-int comm_filter_get(smx_comm_t comm, void *arg)
-{
- if(comm->type == comm_send){
- if(arg && comm->src_host != (smx_host_t)arg)
- return FALSE;
- else
- return TRUE;
- }else{
- return FALSE;
- }
+ comm, comm->src_proc->smx_host->name, comm->dst_proc->smx_host->name,
+ dst_buff_size);
}
-int comm_filter_put(smx_comm_t comm, void *arg)
-{
- return comm->type == comm_recv ? TRUE : FALSE;
-}
/******************************************************************************/
/* Synchronous Communication */
/******************************************************************************/
* - network_error if network failed or peer issued a timeout
*/
void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
- double timeout, void *data, size_t data_size,
- int (filter)(smx_comm_t, void *), void *arg)
+ double timeout, void *data, size_t data_size)
{
smx_comm_t comm;
/* Look for communication request matching our needs.
If it is not found then create it and push it into the rendez-vous point */
- comm = SIMIX_rdv_get_request(rdv, filter, arg);
+ comm = SIMIX_rdv_get_request(rdv, comm_recv);
- if(comm == NULL){
- comm = SIMIX_communication_new(comm_send, rdv);
+ if(!comm){
+ comm = SIMIX_communication_new(comm_send);
SIMIX_rdv_push(rdv, comm);
}
/* Setup the communication request */
- comm->src_host = SIMIX_host_self();
+ comm->src_proc = SIMIX_process_self();
comm->task_size = task_size;
comm->rate = rate;
comm->src_buff = data;
SIMIX_communication_start(comm);
/* Wait for communication completion */
- /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
SIMIX_communication_wait_for_completion(comm, timeout);
SIMIX_communication_destroy(comm);
* - timeout_error if communication reached the timeout specified
* - network_error if network failed or peer issued a timeout
*/
-void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data,
- size_t *data_size, int (filter)(smx_comm_t, void *), void *arg)
+void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data, size_t *data_size)
{
smx_comm_t comm;
/* Look for communication request matching our needs.
If it is not found then create it and push it into the rendez-vous point */
- comm = SIMIX_rdv_get_request(rdv, filter, arg);
+ comm = SIMIX_rdv_get_request(rdv, comm_send);
- if(comm == NULL){
- comm = SIMIX_communication_new(comm_recv, rdv);
+ if(!comm){
+ comm = SIMIX_communication_new(comm_recv);
SIMIX_rdv_push(rdv, comm);
}
/* Setup communication request */
- comm->dst_host = SIMIX_host_self();
+ comm->dst_proc = SIMIX_process_self();
comm->dst_buff = data;
comm->dst_buff_size = data_size;
SIMIX_communication_start(comm);
/* Wait for communication completion */
- /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
SIMIX_communication_wait_for_completion(comm, timeout);
SIMIX_communication_destroy(comm);
/*
void SIMIX_network_wait(smx_action_t comm, double timeout)
{
- if (timeout > 0)
- SIMIX_cond_wait_timeout(rdv_cond, rdv_comm_mutex, timeout - start_time);
- else
- SIMIX_cond_wait(rdv_cond, rdv_comm_mutex);
-
+ TO BE IMPLEMENTED
}
-
XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm)
{
- if(SIMIX_action_get_state (comm) == SURF_ACTION_DONE){
- memcpy(comm->data
-
- return SIMIX_action_get_state (comm) == SURF_ACTION_DONE ? TRUE : FALSE;
+ TO BE IMPLEMENTED
}*/