X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/42530ed329ac001f1de9b9303549d5ea1972c630..76a93840692d8668298e016732d899d26a6b5b12:/src/msg/gos.c diff --git a/src/msg/gos.c b/src/msg/gos.c index ed5e70291f..ba0d5be5a1 100644 --- a/src/msg/gos.c +++ b/src/msg/gos.c @@ -21,7 +21,7 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg, * \brief Return the last value returned by a MSG function (except * MSG_get_errno...). */ -MSG_error_t +MSG_error_t MSG_get_errno(void) { return PROCESS_GET_ERRNO(); @@ -37,33 +37,37 @@ MSG_get_errno(void) * \return #MSG_FATAL if \a task is not properly initialized and * #MSG_OK otherwise. */ -MSG_error_t +MSG_error_t MSG_task_execute(m_task_t task) { simdata_task_t simdata = NULL; m_process_t self = MSG_process_self(); + e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM; CHECK_HOST(); - + simdata = task->simdata; - xbt_assert0((!simdata->compute) && (task->simdata->using == 1),"This task is executed somewhere else. Go fix your code!"); - + xbt_assert0((!simdata->compute) && (task->simdata->refcount == 1),"This task is executed somewhere else. Go fix your code!"); + DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name); - - simdata->using++; + + simdata->refcount ++; SIMIX_mutex_lock(simdata->mutex); simdata->compute = SIMIX_action_execute(SIMIX_host_self(), task->name, simdata->computation_amount); SIMIX_action_set_priority(simdata->compute, simdata->priority); - + self->simdata->waiting_task = task; SIMIX_register_action_to_condition(simdata->compute, simdata->cond); - SIMIX_cond_wait(simdata->cond, simdata->mutex); + do { + SIMIX_cond_wait(simdata->cond, simdata->mutex); + state = SIMIX_action_get_state(simdata->compute); + } while(state==SURF_ACTION_READY || state==SURF_ACTION_RUNNING); SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond); self->simdata->waiting_task = NULL; - + SIMIX_mutex_unlock(simdata->mutex); - simdata->using--; - - if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) + simdata->refcount --; + + if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) { /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */ SIMIX_action_destroy(task->simdata->compute); @@ -71,16 +75,16 @@ MSG_task_execute(m_task_t task) simdata->comm = NULL; simdata->compute = NULL; MSG_RETURN(MSG_OK); - } - else if(SIMIX_host_get_state(SIMIX_host_self()) == 0) + } + else if(SIMIX_host_get_state(SIMIX_host_self()) == 0) { /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */ SIMIX_action_destroy(task->simdata->compute); simdata->comm = NULL; simdata->compute = NULL; MSG_RETURN(MSG_HOST_FAILURE); - } - else + } + else { /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */ SIMIX_action_destroy(task->simdata->compute); @@ -93,7 +97,7 @@ MSG_task_execute(m_task_t task) /** \ingroup m_task_management * \brief Creates a new #m_task_t (a parallel one....). * - * A constructor for #m_task_t taking six arguments and returning the + * A constructor for #m_task_t taking six arguments and returning the corresponding object. * \param name a name for the object. It is for user-level information and can be NULL. @@ -109,18 +113,18 @@ MSG_task_execute(m_task_t task) * \see m_task_t * \return The new corresponding object. */ -m_task_t +m_task_t MSG_parallel_task_create(const char *name,int host_nb, const m_host_t * host_list, double *computation_amount, double *communication_amount, void *data) { int i; simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1); m_task_t task = xbt_new0(s_m_task_t, 1); task->simdata = simdata; - + /* Task structure */ task->name = xbt_strdup(name); task->data = data; - + /* Simulator Data */ simdata->computation_amount = 0; simdata->message_size = 0; @@ -129,54 +133,59 @@ MSG_parallel_task_create(const char *name,int host_nb, const m_host_t * host_lis simdata->compute = NULL; simdata->comm = NULL; simdata->rate = -1.0; - simdata->using = 1; + simdata->refcount = 1; simdata->sender = NULL; simdata->receiver = NULL; simdata->source = NULL; - + simdata->host_nb = host_nb; simdata->host_list = xbt_new0(smx_host_t, host_nb); simdata->comp_amount = computation_amount; simdata->comm_amount = communication_amount; - + for (i = 0; i < host_nb; i++) simdata->host_list[i] = host_list[i]->simdata->smx_host; - + return task; } -MSG_error_t +MSG_error_t MSG_parallel_task_execute(m_task_t task) { simdata_task_t simdata = NULL; m_process_t self = MSG_process_self(); + e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM; CHECK_HOST(); - + simdata = task->simdata; - - xbt_assert0((!simdata->compute) && (task->simdata->using == 1),"This task is executed somewhere else. Go fix your code!"); - + + xbt_assert0((!simdata->compute) && (task->simdata->refcount == 1),"This task is executed somewhere else. Go fix your code!"); + xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell."); - + DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name); - - simdata->using++; - + + simdata->refcount ++; + SIMIX_mutex_lock(simdata->mutex); simdata->compute = SIMIX_action_parallel_execute(task->name, simdata->host_nb, simdata->host_list, simdata->comp_amount, simdata->comm_amount, 1.0, -1.0); - + self->simdata->waiting_task = task; SIMIX_register_action_to_condition(simdata->compute, simdata->cond); - SIMIX_cond_wait(simdata->cond, simdata->mutex); + do { + SIMIX_cond_wait(simdata->cond, simdata->mutex); + state = SIMIX_action_get_state(task->simdata->compute); + } while(state==SURF_ACTION_READY || state==SURF_ACTION_RUNNING); + SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond); self->simdata->waiting_task = NULL; - - + + SIMIX_mutex_unlock(simdata->mutex); - simdata->using--; - - if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) + simdata->refcount --; + + if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) { /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */ SIMIX_action_destroy(task->simdata->compute); @@ -184,16 +193,16 @@ MSG_parallel_task_execute(m_task_t task) simdata->comm = NULL; simdata->compute = NULL; MSG_RETURN(MSG_OK); - } - else if(SIMIX_host_get_state(SIMIX_host_self()) == 0) + } + else if(SIMIX_host_get_state(SIMIX_host_self()) == 0) { /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */ SIMIX_action_destroy(task->simdata->compute); simdata->comm = NULL; simdata->compute = NULL; MSG_RETURN(MSG_HOST_FAILURE); - } - else + } + else { /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */ SIMIX_action_destroy(task->simdata->compute); @@ -212,46 +221,51 @@ MSG_parallel_task_execute(m_task_t task) * * \param nb_sec a number of second */ -MSG_error_t +MSG_error_t MSG_process_sleep(double nb_sec) { smx_action_t act_sleep; m_process_t proc = MSG_process_self(); + e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM; smx_mutex_t mutex; smx_cond_t cond; - + /* create action to sleep */ act_sleep = SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process), nb_sec); - + mutex = SIMIX_mutex_init(); SIMIX_mutex_lock(mutex); - + /* create conditional and register action to it */ cond = SIMIX_cond_init(); - + SIMIX_register_action_to_condition(act_sleep, cond); - SIMIX_cond_wait(cond, mutex); + do { + SIMIX_cond_wait(cond, mutex); + state = SIMIX_action_get_state(act_sleep); + } while(state==SURF_ACTION_READY || state==SURF_ACTION_RUNNING); SIMIX_unregister_action_to_condition(act_sleep, cond); SIMIX_mutex_unlock(mutex); - + /* remove variables */ SIMIX_cond_destroy(cond); SIMIX_mutex_destroy(mutex); - - if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) + + if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) { - if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_CPU_OFF) + if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_CPU_OFF) { SIMIX_action_destroy(act_sleep); MSG_RETURN(MSG_HOST_FAILURE); } - } - else + } + else { SIMIX_action_destroy(act_sleep); MSG_RETURN(MSG_HOST_FAILURE); } - + + SIMIX_action_destroy(act_sleep); MSG_RETURN(MSG_OK); } @@ -259,7 +273,7 @@ MSG_process_sleep(double nb_sec) * \brief Return the number of MSG tasks currently running on * the host of the current running process. */ -static int +static int MSG_get_msgload(void) { xbt_die("not implemented yet"); @@ -283,7 +297,7 @@ MSG_get_msgload(void) * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING if \a *task is not equal to \c NULL, and #MSG_OK otherwise. */ -MSG_error_t +MSG_error_t MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host) { return MSG_task_get_ext(task, channel, -1, host); @@ -303,7 +317,7 @@ MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host) * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING * if \a *task is not equal to \c NULL, and #MSG_OK otherwise. */ -MSG_error_t +MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel) { return MSG_task_get_with_timeout(task, channel, -1); @@ -321,13 +335,13 @@ MSG_task_get(m_task_t * task, m_channel_t channel) listening. This value has to be >=0 and < than the maximal number of channels fixed with MSG_set_channel_number(). * \param max_duration the maximum time to wait for a task before giving - up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task + up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task will not be modified and will still be - equal to \c NULL when returning. + equal to \c NULL when returning. * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING if \a *task is not equal to \c NULL, and #MSG_OK otherwise. */ -MSG_error_t +MSG_error_t MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel, double max_duration) { return MSG_task_get_ext(task, channel, max_duration, NULL); @@ -338,33 +352,33 @@ MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel, double max_durat * by an agent for handling some task. */ -MSG_error_t +MSG_error_t MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,m_host_t host) { xbt_assert1((channel >= 0) && (channel < msg_global->max_channel), "Invalid channel %d",channel); - + return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel(MSG_host_self(), channel), task, host, timeout); } -MSG_error_t +MSG_error_t MSG_task_receive_from_host(m_task_t * task, const char* alias, m_host_t host) { return MSG_task_receive_ext(task, alias, -1, host); } -MSG_error_t +MSG_error_t MSG_task_receive(m_task_t * task, const char* alias) { - return MSG_task_receive_with_timeout(task, alias, -1); + return MSG_task_receive_with_timeout(task, alias, -1); } -MSG_error_t +MSG_error_t MSG_task_receive_with_timeout(m_task_t * task, const char* alias, double timeout) { - return MSG_task_receive_ext(task, alias, timeout, NULL); + return MSG_task_receive_ext(task, alias, timeout, NULL); } -MSG_error_t +MSG_error_t MSG_task_receive_ext(m_task_t* task, const char* alias, double timeout, m_host_t host) { return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host, timeout); @@ -397,14 +411,14 @@ MSG_task_receive_ext(m_task_t* task, const char* alias, double timeout, m_host_t * #MSG_TRANSFER_FAILURE if the transfer could not be properly done * (network failure, dest failure) */ -MSG_error_t +MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel) { return MSG_task_put_with_timeout(task, dest, channel, -1.0); } /** \ingroup msg_gos_functions - * \brief Does exactly the same as MSG_task_put but with a bounded transmition + * \brief Does exactly the same as MSG_task_put but with a bounded transmition * rate. * * \sa MSG_task_put @@ -437,48 +451,48 @@ MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel, double m task. This value has to be >=0 and < than the maximal number of channels fixed with MSG_set_channel_number(). * \param timeout the maximum time to wait for a task before giving - up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task - will not be modified + up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task + will not be modified * \return #MSG_FATAL if \a task is not properly initialized and #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which this function was called was shut down. Returns #MSG_TRANSFER_FAILURE if the transfer could not be properly done (network failure, dest failure, timeout...) */ -MSG_error_t +MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, m_channel_t channel, double timeout) { xbt_assert1((channel >= 0) && (channel < msg_global->max_channel), "Invalid channel %d", channel); - + return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel(dest, channel), task, timeout); } -MSG_error_t +MSG_error_t MSG_task_send(m_task_t task,const char* alias) { return MSG_task_send_with_timeout(task, alias, -1); } -MSG_error_t +MSG_error_t MSG_task_send_bounded(m_task_t task, const char* alias, double maxrate) { task->simdata->rate = maxrate; return MSG_task_send(task, alias); } - -MSG_error_t + +MSG_error_t MSG_task_send_with_timeout(m_task_t task, const char* alias, double timeout) { return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias), task, timeout); } -int +int MSG_task_listen(const char* alias) { CHECK_HOST(); - + return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias)); } @@ -491,13 +505,13 @@ MSG_task_listen(const char* alias) number of channels fixed with MSG_set_channel_number(). * \return 1 if there is a pending communication and 0 otherwise */ -int +int MSG_task_Iprobe(m_channel_t channel) { xbt_assert1((channel >= 0) && (channel < msg_global->max_channel), "Invalid channel %d", channel); - + CHECK_HOST(); - + return !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel(MSG_host_self(), channel)); } @@ -514,23 +528,23 @@ MSG_task_Iprobe(m_channel_t channel) * \return the number of tasks waiting to be received on \a channel and sent by \a host. */ -int +int MSG_task_probe_from_host(int channel, m_host_t host) { xbt_assert1((channel >= 0) && (channel < msg_global->max_channel), "Invalid channel %d", channel); - + CHECK_HOST(); - - return MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel(MSG_host_self(), channel),host); - + + return MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel(MSG_host_self(), channel),host); + } -int +int MSG_task_listen_from_host(const char* alias, m_host_t host) { CHECK_HOST(); - - return MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),host); + + return MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),host); } /** \ingroup msg_gos_functions @@ -542,13 +556,13 @@ MSG_task_listen_from_host(const char* alias, m_host_t host) number of channels fixed with MSG_set_channel_number(). * \return -1 if there is no pending communication and the PID of the process who sent it otherwise */ -int +int MSG_task_probe_from(m_channel_t channel) { m_task_t task; CHECK_HOST(); - + xbt_assert1((channel >= 0) && (channel < msg_global->max_channel), "Invalid channel %d", channel); if(NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_channel(MSG_host_self(), channel)))) @@ -572,8 +586,8 @@ MSG_task_listen_from(const char* alias) /** \ingroup msg_gos_functions * \brief Wait for at most \a max_duration second for a task reception - on \a channel. - + on \a channel. + * \a PID is updated with the PID of the first process that triggered this event if any. * * It takes three parameters: @@ -587,7 +601,7 @@ MSG_task_listen_from(const char* alias) * \return #MSG_HOST_FAILURE if the host is shut down in the meantime and #MSG_OK otherwise. */ -MSG_error_t +MSG_error_t MSG_channel_select_from(m_channel_t channel,double timeout, int *PID) { m_host_t h = NULL; @@ -596,81 +610,81 @@ MSG_channel_select_from(m_channel_t channel,double timeout, int *PID) int first_time = 1; smx_cond_t cond; msg_mailbox_t mailbox; - + xbt_assert1((channel >= 0) && (channel < msg_global->max_channel), "Invalid channel %d",channel); - - if(PID) + + if(PID) { *PID = -1; } - - if (timeout == 0.0) + + if (timeout == 0.0) { *PID = MSG_task_probe_from(channel); MSG_RETURN(MSG_OK); - } - else + } + else { CHECK_HOST(); h = MSG_host_self(); h_simdata = h->simdata; - - mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel); - + + mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel); + while(MSG_mailbox_is_empty(mailbox)) { - if(timeout > 0) + if(timeout > 0) { - if (!first_time) + if (!first_time) { MSG_RETURN(MSG_OK); } } - + SIMIX_mutex_lock(h_simdata->mutex); - + xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on this channel %d",channel); - + cond = SIMIX_cond_init(); - + MSG_mailbox_set_cond(mailbox, cond); - - if (timeout > 0) + + if (timeout > 0) { SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout); - } - else + } + else { SIMIX_cond_wait(cond, h_simdata->mutex); } - + SIMIX_cond_destroy(cond); SIMIX_mutex_unlock(h_simdata->mutex); - - if (SIMIX_host_get_state(h_simdata->smx_host) == 0) + + if (SIMIX_host_get_state(h_simdata->smx_host) == 0) { MSG_RETURN(MSG_HOST_FAILURE); } - + MSG_mailbox_set_cond(mailbox,NULL); first_time = 0; } - + if(NULL == (t = MSG_mailbox_get_head(mailbox))) MSG_RETURN(MSG_OK); - - - if (PID) + + + if (PID) { *PID = MSG_process_get_PID(t->simdata->sender); } - + MSG_RETURN(MSG_OK); } } -MSG_error_t +MSG_error_t MSG_alias_select_from(const char* alias, double timeout, int* PID) { m_host_t h = NULL; @@ -679,75 +693,75 @@ MSG_alias_select_from(const char* alias, double timeout, int* PID) int first_time = 1; smx_cond_t cond; msg_mailbox_t mailbox; - - if (PID) + + if (PID) { *PID = -1; } - - if(timeout == 0.0) + + if(timeout == 0.0) { *PID = MSG_task_listen_from(alias); MSG_RETURN(MSG_OK); - } - else + } + else { CHECK_HOST(); h = MSG_host_self(); h_simdata = h->simdata; - + DEBUG2("Probing on alias %s (%s)", alias, h->name); - - mailbox = MSG_mailbox_get_by_alias(alias); - + + mailbox = MSG_mailbox_get_by_alias(alias); + while(MSG_mailbox_is_empty(mailbox)) { - if(timeout > 0) + if(timeout > 0) { - if (!first_time) + if (!first_time) { MSG_RETURN(MSG_OK); } } - + SIMIX_mutex_lock(h_simdata->mutex); - + xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on this alias %s",alias); - + cond = SIMIX_cond_init(); - + MSG_mailbox_set_cond(mailbox, cond); - - if (timeout > 0) + + if (timeout > 0) { SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout); - } - else + } + else { SIMIX_cond_wait(cond, h_simdata->mutex); } - + SIMIX_cond_destroy(cond); SIMIX_mutex_unlock(h_simdata->mutex); - - if (SIMIX_host_get_state(h_simdata->smx_host) == 0) + + if (SIMIX_host_get_state(h_simdata->smx_host) == 0) { MSG_RETURN(MSG_HOST_FAILURE); } - + MSG_mailbox_set_cond(mailbox,NULL); first_time = 0; } - + if(NULL == (t = MSG_mailbox_get_head(mailbox))) MSG_RETURN(MSG_OK); - - - if (PID) + + + if (PID) { *PID = MSG_process_get_PID(t->simdata->sender); } - + MSG_RETURN(MSG_OK); } } @@ -755,4 +769,4 @@ MSG_alias_select_from(const char* alias, double timeout, int* PID) - +