XBT_PUBLIC(smx_rdv_t) SIMIX_rdv_create(const char *name);
XBT_PUBLIC(void) SIMIX_rdv_destroy(smx_rdv_t rvp);
XBT_PUBLIC(void) SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
- double timeout, void *data, size_t data_size);
-XBT_PUBLIC(void) SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data,
- size_t *data_size);
+ double timeout, void *src_buff,
+ size_t src_buff_size, smx_comm_t *comm, void *data);
+XBT_PUBLIC(void) SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *dst_buff,
+ size_t *dst_buff_size, smx_comm_t *comm);
XBT_PUBLIC(void) SIMIX_network_wait(smx_action_t comm);
XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm);
-XBT_PUBLIC(int) SIMIX_communication_isSend(smx_comm_t comm);
-XBT_PUBLIC(int) SIMIX_communication_isRecv(smx_comm_t comm);
+XBT_PUBLIC(void) SIMIX_communication_cancel(smx_comm_t comm);
+XBT_PUBLIC(double) SIMIX_communication_get_remains(smx_comm_t comm);
+XBT_PUBLIC(int) SIMIX_rdv_get_count_waiting_comm(smx_rdv_t rdv, smx_host_t host);
+XBT_PUBLIC(smx_comm_t) SIMIX_rdv_get_head(smx_rdv_t rdv);
+XBT_PUBLIC(void *) SIMIX_communication_get_data(smx_comm_t comm);
/* These should be private to SIMIX */
smx_comm_t SIMIX_communication_new(smx_comm_type_t type);
process->name);
mailbox = MSG_mailbox_new(alias);
- MSG_mailbox_set_hostname(mailbox,
- process->simdata->m_host->simdata->smx_host->name);
-
-
+
}
JNIEXPORT void JNICALL
/* the key of the mailbox (in this case) is build from the name of the host and the channel number */
simdata->mailboxes[i] = MSG_mailbox_create(alias);
- MSG_mailbox_set_hostname(simdata->mailboxes[i], name);
memset(alias, 0, MAX_ALIAS_NAME + 1);
}
DEBUG3("Killing %s(%d) on %s",
process->name, p_simdata->PID, p_simdata->m_host->name);
- if (p_simdata->waiting_action) {
- DEBUG1("Canceling waiting task %s",
- SIMIX_action_get_name(p_simdata->waiting_action));
- SIMIX_action_cancel(p_simdata->waiting_action);
+ if (p_simdata->waiting_task) {
+ DEBUG1("Canceling waiting task %s", p_simdata->waiting_task->name);
+ if (p_simdata->waiting_task->simdata->compute) {
+ SIMIX_action_cancel(p_simdata->waiting_task->simdata->compute);
+ } else if (p_simdata->waiting_task->simdata->comm) {
+ SIMIX_communication_cancel(p_simdata->waiting_task->simdata->comm);
+ }
}
xbt_fifo_remove(msg_global->process_list, process);
*/
XBT_PUBLIC(void) MSG_mailbox_set_cond(msg_mailbox_t mailbox, smx_cond_t cond);
-/*! \brief MSG_mailbox_get_hostname - get the name of the host owned a mailbox.
- *
- * The function MSG_mailbox_get_hostname returns name of the host
- * owned the mailbox specified by the parameter mailbox.
- *
- * \param mailbox The mailbox to get the name of the host.
- *
- * \return The name of the host owned the mailbox specified by the parameter mailbox.
- */
-XBT_PUBLIC(const char *) MSG_mailbox_get_hostname(msg_mailbox_t mailbox);
-
-/*! \brief MSG_mailbox_set_hostname - set the name of the host owned a mailbox.
- *
- * The function MSG_mailbox_set_hostname sets the name of the host
- * owned the mailbox specified by the parameter mailbox.
- *
- * \param mailbox The mailbox to set the name of the host.
- * \param hostname The name of the owner of the mailbox.
- *
- */
-XBT_PUBLIC(void)
-MSG_mailbox_set_hostname(msg_mailbox_t mailbox, const char *hostname);
-
-
/*! \brief MSG_mailbox_is_empty - test if a mailbox is empty.
*
* The function MSG_mailbox_is_empty tests if a mailbox is empty
*/
XBT_PUBLIC(int) MSG_mailbox_is_empty(msg_mailbox_t mailbox);
-/*! \brief MSG_mailbox_remove - remove a task from a mailbox.
- *
- * The MSG_mailbox_remove removes a task from a specified mailbox.
- *
- * \param mailbox The mailbox concerned by this operation.
- * \param task The task to remove from the mailbox.
- */
-XBT_PUBLIC(void) MSG_mailbox_remove(msg_mailbox_t mailbox, m_task_t task);
-
/*! \brief MSG_mailbox_get_head - get the task at the head of a mailbox.
*
* The MSG_mailbox_get_head returns the task at the head of the mailbox.
XBT_PUBLIC(m_task_t)
MSG_mailbox_get_head(msg_mailbox_t mailbox);
-/*! \brief MSG_mailbox_pop_head - get the task at the head of a mailbox
- * and remove it from it.
- *
- * The MSG_mailbox_pop_head returns the task at the head of the mailbox
- * and remove it from it.
- *
- * \param mailbox The mailbox concerned by the operation.
- *
- * \return The task at the head of the mailbox.
- */
-XBT_PUBLIC(m_task_t)
- MSG_mailbox_pop_head(msg_mailbox_t mailbox);
-
-/*! \brief MSG_mailbox_get_first_host_task - get the first msg task
- * of a specified mailbox, sended by a process of a specified host.
- *
- * \param mailbox The mailbox concerned by the operation.
- * \param host The msg host of the process that has sended the
- * task.
- *
- * \return The first task in the mailbox specified by the
- * parameter mailbox and sended by a process located
- * on the host specified by the parameter host.
- */
-XBT_PUBLIC(m_task_t)
- MSG_mailbox_get_first_host_task(msg_mailbox_t mailbox, m_host_t host);
-
/*! \brief MSG_mailbox_get_count_host_waiting_tasks - Return the number of tasks
waiting to be received in a mailbox and sent by a host.
*
{
msg_mailbox_t mailbox = xbt_new0(s_msg_mailbox_t, 1);
- mailbox->tasks = xbt_fifo_new();
mailbox->cond = NULL;
mailbox->alias = alias ? xbt_strdup(alias) : NULL;
- mailbox->hostname = NULL;
mailbox->rdv = SIMIX_rdv_create(alias);
return mailbox;
{
msg_mailbox_t _mailbox = (msg_mailbox_t) mailbox;
- if (_mailbox->hostname)
- free(_mailbox->hostname);
-
- xbt_fifo_free(_mailbox->tasks);
free(_mailbox->alias);
SIMIX_rdv_destroy(_mailbox->rdv);
return mailbox->cond;
}
-void MSG_mailbox_remove(msg_mailbox_t mailbox, m_task_t task)
-{
- xbt_fifo_remove(mailbox->tasks, task);
-}
-
int MSG_mailbox_is_empty(msg_mailbox_t mailbox)
{
- return (NULL == xbt_fifo_get_first_item(mailbox->tasks));
-}
-
-m_task_t MSG_mailbox_pop_head(msg_mailbox_t mailbox)
-{
- return (m_task_t) xbt_fifo_shift(mailbox->tasks);
+ return (NULL == SIMIX_rdv_get_head(mailbox->rdv));
}
m_task_t MSG_mailbox_get_head(msg_mailbox_t mailbox)
{
- xbt_fifo_item_t item;
-
- if (!(item = xbt_fifo_get_first_item(mailbox->tasks)))
- return NULL;
-
- return (m_task_t) xbt_fifo_get_item_content(item);
-}
-
-
-m_task_t MSG_mailbox_get_first_host_task(msg_mailbox_t mailbox, m_host_t host)
-{
- m_task_t task = NULL;
- xbt_fifo_item_t item = NULL;
-
- xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t)
- if (task->simdata->source == host) {
- xbt_fifo_remove_item(mailbox->tasks, item);
- return task;
- }
+ smx_comm_t comm = SIMIX_rdv_get_head(mailbox->rdv);
- return NULL;
+ if(!comm)
+ return NULL;
+
+ return (m_task_t)SIMIX_communication_get_data(comm);
}
int
MSG_mailbox_get_count_host_waiting_tasks(msg_mailbox_t mailbox, m_host_t host)
{
- m_task_t task = NULL;
- xbt_fifo_item_t item = NULL;
- int count = 0;
-
- xbt_fifo_foreach(mailbox->tasks, item, task, m_task_t) {
- if (task->simdata->source == host)
- count++;
- }
-
- return count;
+ return SIMIX_rdv_get_count_waiting_comm (mailbox->rdv, host->simdata->smx_host);
}
void MSG_mailbox_set_cond(msg_mailbox_t mailbox, smx_cond_t cond)
return mailbox->alias;
}
-const char *MSG_mailbox_get_hostname(msg_mailbox_t mailbox)
-{
- return mailbox->hostname;
-}
-
-void MSG_mailbox_set_hostname(msg_mailbox_t mailbox, const char *hostname)
-{
- mailbox->hostname = xbt_strdup(hostname);
-}
-
msg_mailbox_t MSG_mailbox_get_by_alias(const char *alias)
{
msg_mailbox_t mailbox = xbt_dict_get_or_null(msg_mailboxes, alias);
- if (!mailbox) {
+ if (!mailbox)
mailbox = MSG_mailbox_new(alias);
- MSG_mailbox_set_hostname(mailbox, MSG_host_self()->name);
- }
return mailbox;
}
}
MSG_error_t
-MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t *task,
- m_host_t host, double timeout)
+MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t *task, m_host_t host,
+ double timeout)
{
xbt_ex_t e;
- MSG_error_t ret;
- smx_host_t smx_host;
- size_t task_size = sizeof(void*);
+ MSG_error_t ret = MSG_OK;
+ size_t buff_size = 0;
+ smx_comm_t comm;
CHECK_HOST();
+ /* Kept for compatibility with older implementation */
+ xbt_assert1(!MSG_mailbox_get_cond(mailbox),
+ "A process is already blocked on this channel %s",
+ MSG_mailbox_get_alias(mailbox));
+
/* Sanity check */
xbt_assert0(task, "Null pointer for the task storage");
CRITICAL0
("MSG_task_get() was asked to write in a non empty task struct.");
- smx_host = host ? host->simdata->smx_host : NULL;
-
+ /* We no longer support getting a task from a specific host */
+ if(host)
+ THROW_UNIMPLEMENTED;
+
+ /* Try to receive it by calling SIMIX network layer */
TRY{
- SIMIX_network_recv(mailbox->rdv, timeout, task, &task_size);
+ SIMIX_network_recv(mailbox->rdv, timeout, NULL, &buff_size, &comm);
}
CATCH(e){
switch(e.category){
ret = MSG_TRANSFER_FAILURE;
break;
default:
- ret = MSG_OK;
- RETHROW;
- break;
- /*xbt_die("Unhandled SIMIX network exception");*/
+ xbt_die("Unhandled SIMIX network exception");
}
- xbt_ex_free(e);
- MSG_RETURN(ret);
+ xbt_ex_free(e);
}
-
- MSG_RETURN (MSG_OK);
+
+ *task = SIMIX_communication_get_data(comm);
+
+ /* If the sender hasn't decremented the refcount so far then do it */
+ if (*task && (*task)->simdata->refcount > 1)
+ (*task)->simdata->refcount--;
+
+ MSG_RETURN(ret);
}
MSG_error_t
double timeout)
{
xbt_ex_t e;
- MSG_error_t ret;
- m_process_t process = MSG_process_self();
- const char *hostname;
+ MSG_error_t ret = MSG_OK;
simdata_task_t t_simdata = NULL;
- m_host_t local_host = NULL;
- m_host_t remote_host = NULL;
-
+ m_process_t process = MSG_process_self();
+
CHECK_HOST();
+ /* Prepare the task to send */
t_simdata = task->simdata;
t_simdata->sender = process;
- t_simdata->source = MSG_process_get_host(process);
+ t_simdata->source = MSG_host_self();
xbt_assert0(t_simdata->refcount == 1,
"This task is still being used somewhere else. You cannot send it now. Go fix your code!");
- t_simdata->comm = NULL;
-
- /*t_simdata->refcount++;*/
- local_host = ((simdata_process_t) process->simdata)->m_host;
+ t_simdata->refcount++;
msg_global->sent_msg++;
- /* get the host name containing the mailbox */
- hostname = MSG_mailbox_get_hostname(mailbox);
-
- remote_host = MSG_get_host_by_name(hostname);
-
- if (!remote_host)
- THROW1(not_found_error, 0, "Host %s not fount", hostname);
-
- DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel %s",
- t_simdata->message_size / 1000, local_host->name,
- remote_host->name, MSG_mailbox_get_alias(mailbox));
+ process->simdata->waiting_task = task;
+ /* Try to send it by calling SIMIX network layer */
TRY{
+ /* Kept for semantical compatibility with older implementation */
+ if(mailbox->cond)
+ SIMIX_cond_signal(mailbox->cond);
+
SIMIX_network_send(mailbox->rdv, t_simdata->message_size, t_simdata->rate,
- timeout, &task, sizeof(void *));
+ timeout, NULL, 0, &t_simdata->comm, task);
}
CATCH(e){
ret = MSG_TRANSFER_FAILURE;
break;
default:
- ret = MSG_OK;
- RETHROW;
- break;
xbt_die("Unhandled SIMIX network exception");
}
xbt_ex_free(e);
- MSG_RETURN(ret);
+ /* If the receiver end hasn't decremented the refcount so far then do it */
+ if (t_simdata->refcount > 1)
+ t_simdata->refcount--;
}
- /*t_simdata->refcount--;*/
- MSG_RETURN (MSG_OK);
+ process->simdata->waiting_task = NULL;
+
+ /* If the receiver end hasn't decremented the refcount so far then do it */
+ if (t_simdata->refcount > 1)
+ t_simdata->refcount--;
+
+ MSG_RETURN(ret);
}
/* this structure represents a mailbox */
typedef struct s_msg_mailbox {
char *alias; /* the key of the mailbox in the global dictionary */
- xbt_fifo_t tasks; /* the list of the tasks in the mailbox */
smx_cond_t cond; /* the condition on the mailbox */
- char *hostname; /* the name of the host containing the mailbox */
smx_rdv_t rdv; /* SIMIX rendez-vous point */
} s_msg_mailbox_t;
typedef struct simdata_task {
smx_action_t compute; /* SURF modeling of computation */
- smx_action_t comm; /* SURF modeling of communication */
+ smx_comm_t comm; /* SIMIX communication */
double message_size; /* Data size */
double computation_amount; /* Computation size */
smx_cond_t cond;
action = task->simdata->compute;
if (action)
SIMIX_action_destroy(action);
- action = task->simdata->comm;
- if (action)
- SIMIX_action_destroy(action);
+
/* parallel tasks only */
if (task->simdata->host_list)
xbt_free(task->simdata->host_list);
return MSG_OK;
}
if (task->simdata->comm) {
- SIMIX_action_cancel(task->simdata->comm);
+ SIMIX_communication_cancel(task->simdata->comm);
return MSG_OK;
}
xbt_assert0((task != NULL)
&& (task->simdata != NULL), "Invalid parameter");
- return SIMIX_action_get_remains(task->simdata->comm);
+ return SIMIX_communication_get_remains(task->simdata->comm);
}
/** \ingroup m_task_management
size_t src_buff_size;
void *dst_buff;
size_t *dst_buff_size;
+ void *data; /* User data associated to communication */
} s_smx_comm_t;
/********************************* Action *************************************/
return NULL;
}
+/**
+ * \brief Counts the communication requests pending on a rendez-vous point
+ *        whose sending process is located on a given host
+ * \param rdv The rendez-vous point
+ * \param host The host whose requests are counted
+ * \return The number of matching comm requests pending in the rdv
+ */
+int
+SIMIX_rdv_get_count_waiting_comm(smx_rdv_t rdv, smx_host_t host)
+{
+  smx_comm_t comm = NULL;
+  xbt_fifo_item_t item = NULL;
+  int count = 0;
+
+  xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_comm_t) {
+    /* Requests queued by a receiver only have dst_proc filled in, so their
+     * src_proc is presumably still NULL (assumes comms are zero-initialized
+     * at creation -- TODO confirm). Skip them rather than dereference it. */
+    if (comm->src_proc && comm->src_proc->smx_host == host)
+      count++;
+  }
+
+  return count;
+}
+
+/**
+ * \brief Peeks at the communication queued first on a rendez-vous point
+ *        without removing it
+ * \param rdv The rendez-vous point
+ * \return The content of the first fifo item, as a communication. Callers
+ *         treat a NULL result as "rendez-vous is empty"; this relies on
+ *         xbt_fifo_get_item_content() accepting a NULL item (to be confirmed
+ *         against the xbt_fifo implementation).
+ */
+smx_comm_t SIMIX_rdv_get_head(smx_rdv_t rdv)
+{
+  xbt_fifo_item_t first = xbt_fifo_get_first_item(rdv->comm_fifo);
+
+  return (smx_comm_t) xbt_fifo_get_item_content(first);
+}
+
+
/******************************************************************************/
/* Communication Requests */
/******************************************************************************/
if(e.category == timeout_error){
DEBUG1("Communication timeout! %p", comm);
if(comm->act && SIMIX_action_get_state(comm->act) == SURF_ACTION_RUNNING)
- SIMIX_action_cancel(comm->act);
+ SIMIX_communication_cancel(comm);
else
SIMIX_rdv_remove(comm->rdv, comm);
SIMIX_unregister_action_to_condition(comm->act, comm->cond);
}
+/**
+ * \brief Cancels the action backing a communication, if it has one
+ * \param comm The communication to cancel
+ */
+void SIMIX_communication_cancel(smx_comm_t comm)
+{
+  /* comm->act can be NULL (the timeout handling path tests it before use),
+   * in which case there is no action to cancel. */
+  if (comm->act)
+    SIMIX_action_cancel(comm->act);
+}
+
+/**
+ * \brief Reports what remains of the action backing a communication
+ * \param comm The communication
+ * \return The value given by SIMIX_action_get_remains() for comm->act
+ */
+double SIMIX_communication_get_remains(smx_comm_t comm)
+{
+  double remains = SIMIX_action_get_remains(comm->act);
+
+  return remains;
+}
+
/**
* \brief Copy the communication data from the sender's buffer to the receiver's one
* \param comm The communication
{
size_t src_buff_size = comm->src_buff_size;
size_t dst_buff_size = *comm->dst_buff_size;
-
+
/* Copy at most dst_buff_size bytes of the message to receiver's buffer */
dst_buff_size = MIN(dst_buff_size, src_buff_size);
-
+
/* Update the receiver's buffer size to the copied amount */
*comm->dst_buff_size = dst_buff_size;
+ if(dst_buff_size == 0)
+ return;
+
memcpy(comm->dst_buff, comm->src_buff, dst_buff_size);
DEBUG4("Copying comm %p data from %s -> %s (%zu bytes)",
dst_buff_size);
}
+/**
+ * \brief Gives back the opaque user pointer stored in a communication
+ * \param comm The communication
+ * \return The content of the communication's data field
+ */
+void *SIMIX_communication_get_data(smx_comm_t comm)
+{
+  void *user_data = comm->data;
+
+  return user_data;
+}
+
/******************************************************************************/
/* Synchronous Communication */
/******************************************************************************/
* - network_error if network failed or peer issued a timeout
*/
void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate,
- double timeout, void *data, size_t data_size)
+ double timeout, void *src_buff, size_t src_buff_size,
+ smx_comm_t *comm_ref, void *data)
{
smx_comm_t comm;
SIMIX_rdv_push(rdv, comm);
}
+ /* Update the communication reference with the comm to be used */
+ *comm_ref = comm;
+
/* Setup the communication request */
comm->src_proc = SIMIX_process_self();
comm->task_size = task_size;
comm->rate = rate;
- comm->src_buff = data;
- comm->src_buff_size = data_size;
+ comm->src_buff = src_buff;
+ comm->src_buff_size = src_buff_size;
+ comm->data = data;
SIMIX_communication_start(comm);
* - timeout_error if communication reached the timeout specified
* - network_error if network failed or peer issued a timeout
*/
-void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data, size_t *data_size)
+void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *dst_buff,
+ size_t *dst_buff_size, smx_comm_t *comm_ref)
{
smx_comm_t comm;
SIMIX_rdv_push(rdv, comm);
}
+ /* Update the communication reference with the comm to be used */
+ *comm_ref = comm;
+
/* Setup communication request */
comm->dst_proc = SIMIX_process_self();
- comm->dst_buff = data;
- comm->dst_buff_size = data_size;
+ comm->dst_buff = dst_buff;
+ comm->dst_buff_size = dst_buff_size;
SIMIX_communication_start(comm);