- * It takes three parameters:
- * \param channel the channel on which the agent should be
- listening. This value has to be >=0 and < than the maximal.
- number of channels fixed with MSG_set_channel_number().
- * \param PID a memory location for storing an int.
- * \param max_duration the maximum time to wait for a task before
- giving up. In the case of a reception, *\a PID will be updated
- with the PID of the first process to send a task.
- * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
- and #MSG_OK otherwise.
- */
-MSG_error_t MSG_channel_select_from(m_channel_t channel,
- double max_duration, int *PID)
-{
- m_host_t h = NULL;
- simdata_host_t h_simdata = NULL;
- xbt_fifo_item_t item;
- m_task_t t;
- int first_time = 1;
- smx_cond_t cond;
-
- xbt_assert1((channel >= 0)
- && (channel < msg_global->max_channel), "Invalid channel %d",
- channel);
- if (PID) {
- *PID = -1;
- }
-
- if (max_duration == 0.0) {
- *PID = MSG_task_probe_from(channel);
- MSG_RETURN(MSG_OK);
- } else {
- CHECK_HOST();
- h = MSG_host_self();
- h_simdata = h->simdata;
-
- DEBUG2("Probing on channel %d (%s)", channel, h->name);
- while (!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
- if (max_duration > 0) {
- if (!first_time) {
- MSG_RETURN(MSG_OK);
- }
- }
- SIMIX_mutex_lock(h_simdata->mutex);
- xbt_assert1(!(h_simdata->sleeping[channel]),
- "A process is already blocked on this channel %d",
- channel);
- cond = SIMIX_cond_init();
- h_simdata->sleeping[channel] = cond; /* I'm waiting. Wake me up when you're ready */
- if (max_duration > 0) {
- SIMIX_cond_wait_timeout(cond, h_simdata->mutex, max_duration);
- } else {
- SIMIX_cond_wait(cond, h_simdata->mutex);
- }
- SIMIX_cond_destroy(cond);
- SIMIX_mutex_unlock(h_simdata->mutex);
- if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
- MSG_RETURN(MSG_HOST_FAILURE);
- }
- h_simdata->sleeping[channel] = NULL;
- first_time = 0;
- }
- if (!item || !(t = xbt_fifo_get_item_content(item))) {
- MSG_RETURN(MSG_OK);
- }
- if (PID) {
- *PID = MSG_process_get_PID(t->simdata->sender);
- }
- MSG_RETURN(MSG_OK);
- }
-}
-
-
-/** \ingroup msg_gos_functions
-
- * \brief Return the number of tasks waiting to be received on a \a
- channel and sent by \a host.
- *
- * It takes two parameters.
- * \param channel the channel on which the agent should be
- listening. This value has to be >=0 and < than the maximal
- number of channels fixed with MSG_set_channel_number().
- * \param host the host that is to be watched.
- * \return the number of tasks waiting to be received on \a channel
- and sent by \a host.
- */
-int MSG_task_probe_from_host(int channel, m_host_t host)
-{
- xbt_fifo_item_t item;
- m_task_t t;
- int count = 0;
- m_host_t h = NULL;
-
- xbt_assert1((channel >= 0)
- && (channel < msg_global->max_channel), "Invalid channel %d",
- channel);
- CHECK_HOST();
- h = MSG_host_self();
-
- DEBUG2("Probing on channel %d (%s)", channel, h->name);
-
- xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
- if (t->simdata->source == host)
- count++;
- }
-
- return count;
-}
-
-/** \ingroup msg_gos_functions \brief Put a task on a channel of an
- * host (with a timeout on the waiting of the destination host) and
- * waits for the end of the transmission.
- *
- * This function is used for describing the behavior of an agent. It
- * takes four parameter.
- * \param task a #m_task_t to send on another location. This task
- will not be usable anymore when the function will return. There is
- no automatic task duplication and you have to save your parameters
- before calling this function. Tasks are unique and once it has been
- sent to another location, you should not access it anymore. You do
- not need to call MSG_task_destroy() but to avoid using, as an
- effect of inattention, this task anymore, you definitely should
- renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
- can be transfered iff it has been correctly created with
- MSG_task_create().
- * \param dest the destination of the message
- * \param channel the channel on which the agent should put this
- task. This value has to be >=0 and < than the maximal number of
- channels fixed with MSG_set_channel_number().
- * \param max_duration the maximum time to wait for a task before giving
- up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
- will not be modified
- * \return #MSG_FATAL if \a task is not properly initialized and
- #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
- this function was called was shut down. Returns
- #MSG_TRANSFER_FAILURE if the transfer could not be properly done
- (network failure, dest failure, timeout...)
- */
-MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
- m_channel_t channel,
- double max_duration)
-{
-
-
- m_process_t process = MSG_process_self();
- simdata_task_t task_simdata = NULL;
- m_host_t local_host = NULL;
- m_host_t remote_host = NULL;
- CHECK_HOST();
-
- xbt_assert1((channel >= 0)
- && (channel < msg_global->max_channel), "Invalid channel %d",
- channel);
-
- task_simdata = task->simdata;
- task_simdata->sender = process;
- task_simdata->source = MSG_process_get_host(process);
- xbt_assert0(task_simdata->using == 1,
- "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
- task_simdata->comm = NULL;
-
- task_simdata->using++;
- local_host = ((simdata_process_t) process->simdata)->m_host;
- remote_host = dest;
-
- DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d",
- task->simdata->message_size / 1000, local_host->name,
- remote_host->name, channel);
-
- SIMIX_mutex_lock(remote_host->simdata->mutex);
- xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
- mbox[channel], task);
-
-
- if (remote_host->simdata->sleeping[channel]) {
- DEBUG0("Somebody is listening. Let's wake him up!");
- SIMIX_cond_signal(remote_host->simdata->sleeping[channel]);
- }
- SIMIX_mutex_unlock(remote_host->simdata->mutex);
-
- process->simdata->put_host = dest;
- process->simdata->put_channel = channel;
- SIMIX_mutex_lock(task->simdata->mutex);
- // DEBUG4("Task sent (%g kB) from %s to %s on channel %d, waiting...", task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
-
- process->simdata->waiting_task = task;
- if (max_duration > 0) {
- xbt_ex_t e;
- double time;
- double time_elapsed;
- time = SIMIX_get_clock();
- TRY {
- /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */
- while (1) {
- time_elapsed = SIMIX_get_clock() - time;
- SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,
- max_duration-time_elapsed);
- if ((task->simdata->comm != NULL) &&
- (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING))
- break;
- }
- } CATCH(e) {
- if(e.category==timeout_error) {
- xbt_ex_free(e);
- /* verify if the timeout happened and the communication didn't started yet */
- if (task->simdata->comm == NULL) {
- process->simdata->waiting_task = NULL;
- xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->
- mbox[channel], task);
- if (task->simdata->receiver) {
- task->simdata->receiver->simdata->waiting_task = NULL;
- }
- task->simdata->sender = NULL;
- SIMIX_mutex_unlock(task->simdata->mutex);
- MSG_RETURN(MSG_TRANSFER_FAILURE);
- }
- } else {
- RETHROW;
- }
- }
- } else {
- while (1) {
- SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);
- if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)
- break;
- }
- }
-
- DEBUG1("Action terminated %s", task->name);
- process->simdata->waiting_task = NULL;
- /* the task has already finished and the pointer must be null */
- if (task->simdata->receiver) {
- task->simdata->receiver->simdata->waiting_task = NULL;
- /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
- // task->simdata->comm = NULL;
- //task->simdata->compute = NULL;
- }
- task->simdata->sender = NULL;
- SIMIX_mutex_unlock(task->simdata->mutex);
-
- if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) {
- MSG_RETURN(MSG_OK);
- } else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) {
- MSG_RETURN(MSG_HOST_FAILURE);
- } else {
- MSG_RETURN(MSG_TRANSFER_FAILURE);
- }
-}
-
-/** \ingroup msg_gos_functions
- * \brief Put a task on a channel of an host and waits for the end of the
- * transmission.
- *
- * This function is used for describing the behavior of an agent. It
- * takes three parameter.
- * \param task a #m_task_t to send on another location. This task
- will not be usable anymore when the function will return. There is
- no automatic task duplication and you have to save your parameters
- before calling this function. Tasks are unique and once it has been
- sent to another location, you should not access it anymore. You do
- not need to call MSG_task_destroy() but to avoid using, as an
- effect of inattention, this task anymore, you definitely should
- renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
- can be transfered iff it has been correctly created with
- MSG_task_create().
- * \param dest the destination of the message
- * \param channel the channel on which the agent should put this
- task. This value has to be >=0 and < than the maximal number of
- channels fixed with MSG_set_channel_number().
- * \return #MSG_FATAL if \a task is not properly initialized and
- * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
- * this function was called was shut down. Returns
- * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
- * (network failure, dest failure)
- */
-MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
-{
- return MSG_task_put_with_timeout(task, dest, channel, -1.0);
-}
-
-/** \ingroup msg_gos_functions
- * \brief Does exactly the same as MSG_task_put but with a bounded transmition
- * rate.
- *
- * \sa MSG_task_put