X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/6f04e042bc53b073be4592ad529adb45ac59b777..0039674dc4260ce2203ff4b35a012ea3a1c7a22a:/src/msg/gos.c diff --git a/src/msg/gos.c b/src/msg/gos.c index 73967cdcf2..b4ac1d913a 100644 --- a/src/msg/gos.c +++ b/src/msg/gos.c @@ -42,7 +42,7 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task, && (channel < msg_global->max_channel), "Invalid channel %d", channel); /* Sanity check */ - xbt_assert0(task, "Null pointer for the task\n"); + xbt_assert0(task, "Null pointer for the task storage"); if (*task) CRITICAL0 @@ -85,12 +85,12 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task, cond = SIMIX_cond_init(); h_simdata->sleeping[channel] = cond; - if (max_duration > 0) { + if (max_duration > 0) SIMIX_cond_wait_timeout(cond, h->simdata->mutex, max_duration); - } else + else SIMIX_cond_wait(h_simdata->sleeping[channel], h->simdata->mutex); - if (SIMIX_host_get_state(h_simdata->s_host) == 0) + if (SIMIX_host_get_state(h_simdata->smx_host) == 0) MSG_RETURN(MSG_HOST_FAILURE); first_time = 0; @@ -111,12 +111,11 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task, SIMIX_mutex_lock(t_simdata->mutex); /* Transfer */ - t_simdata->using++; /* create SIMIX action to the communication */ t_simdata->comm = SIMIX_action_communicate(t_simdata->sender->simdata->m_host-> - simdata->s_host, - process->simdata->m_host->simdata->s_host, + simdata->smx_host, + process->simdata->m_host->simdata->smx_host, t->name, t_simdata->message_size, t_simdata->rate); /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */ @@ -126,7 +125,12 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task, } process->simdata->waiting_task = t; SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond); - SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex); + while (1) { + SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex); + if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING) + break; + } + SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond); process->simdata->waiting_task = NULL; /* the task has already finished and the pointer must be null */ @@ -145,16 +149,19 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task, //t_simdata->comm = NULL; SIMIX_action_destroy(t_simdata->comm); t_simdata->comm = NULL; + t_simdata->using--; MSG_RETURN(MSG_OK); - } else if (SIMIX_host_get_state(h_simdata->s_host) == 0) { + } else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) { //t_simdata->comm = NULL; SIMIX_action_destroy(t_simdata->comm); t_simdata->comm = NULL; + t_simdata->using--; MSG_RETURN(MSG_HOST_FAILURE); } else { //t_simdata->comm = NULL; SIMIX_action_destroy(t_simdata->comm); t_simdata->comm = NULL; + t_simdata->using--; MSG_RETURN(MSG_TRANSFER_FAILURE); } @@ -342,7 +349,7 @@ MSG_error_t MSG_channel_select_from(m_channel_t channel, } SIMIX_cond_destroy(cond); SIMIX_mutex_unlock(h_simdata->mutex); - if (SIMIX_host_get_state(h_simdata->s_host) == 0) { + if (SIMIX_host_get_state(h_simdata->smx_host) == 0) { MSG_RETURN(MSG_HOST_FAILURE); } h_simdata->sleeping[channel] = NULL; @@ -447,6 +454,7 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, "This task is still being used somewhere else. You cannot send it now. Go fix your code!"); task_simdata->comm = NULL; + task_simdata->using++; local_host = ((simdata_process_t) process->simdata)->m_host; remote_host = dest; @@ -472,27 +480,48 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, process->simdata->waiting_task = task; if (max_duration > 0) { - SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex, - max_duration); - /* verify if the timeout happened and the communication didn't started yet */ - if (task->simdata->comm == NULL) { - task->simdata->using--; - process->simdata->waiting_task = NULL; - xbt_fifo_remove(((simdata_host_t) remote_host->simdata)-> - mbox[channel], task); - if (task->simdata->receiver) { - task->simdata->receiver->simdata->waiting_task = NULL; + xbt_ex_t e; + double time; + double time_elapsed; + time = SIMIX_get_clock(); + TRY { + /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */ + while (1) { + time_elapsed = SIMIX_get_clock() - time; + SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex, + max_duration-time_elapsed); + if ((task->simdata->comm != NULL) && + (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)) + break; + } + } CATCH(e) { + if(e.category==timeout_error) { + xbt_ex_free(e); + /* verify if the timeout happened and the communication didn't started yet */ + if (task->simdata->comm == NULL) { + process->simdata->waiting_task = NULL; + xbt_fifo_remove(((simdata_host_t) remote_host->simdata)-> + mbox[channel], task); + if (task->simdata->receiver) { + task->simdata->receiver->simdata->waiting_task = NULL; + } + task->simdata->sender = NULL; + SIMIX_mutex_unlock(task->simdata->mutex); + MSG_RETURN(MSG_TRANSFER_FAILURE); + } + } else { + RETHROW; } - task->simdata->sender = NULL; - SIMIX_mutex_unlock(task->simdata->mutex); - MSG_RETURN(MSG_TRANSFER_FAILURE); } } else { - SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex); + while (1) { + SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex); + if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING) + break; + } } DEBUG1("Action terminated %s", task->name); - task->simdata->using--; process->simdata->waiting_task = NULL; /* the task has already finished and the pointer must be null */ if (task->simdata->receiver) { @@ -506,7 +535,7 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) { MSG_RETURN(MSG_OK); - } else if (SIMIX_host_get_state(local_host->simdata->s_host) == 0) { + } else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) { MSG_RETURN(MSG_HOST_FAILURE); } else { MSG_RETURN(MSG_TRANSFER_FAILURE); @@ -591,6 +620,7 @@ MSG_error_t MSG_task_execute(m_task_t task) self->simdata->waiting_task = task; SIMIX_register_action_to_condition(simdata->compute, simdata->cond); SIMIX_cond_wait(simdata->cond, simdata->mutex); + SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond); self->simdata->waiting_task = NULL; SIMIX_mutex_unlock(simdata->mutex); @@ -667,12 +697,12 @@ m_task_t MSG_parallel_task_create(const char *name, simdata->source = NULL; simdata->host_nb = host_nb; - simdata->host_list = xbt_new0(void *, host_nb); + simdata->host_list = xbt_new0(smx_host_t, host_nb); simdata->comp_amount = computation_amount; simdata->comm_amount = communication_amount; for (i = 0; i < host_nb; i++) - simdata->host_list[i] = host_list[i]->simdata->s_host; + simdata->host_list[i] = host_list[i]->simdata->smx_host; return task; @@ -704,6 +734,7 @@ MSG_error_t MSG_parallel_task_execute(m_task_t task) self->simdata->waiting_task = task; SIMIX_register_action_to_condition(simdata->compute, simdata->cond); SIMIX_cond_wait(simdata->cond, simdata->mutex); + SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond); self->simdata->waiting_task = NULL; @@ -759,6 +790,7 @@ MSG_error_t MSG_process_sleep(double nb_sec) SIMIX_register_action_to_condition(act_sleep, cond); SIMIX_cond_wait(cond, mutex); + SIMIX_unregister_action_to_condition(act_sleep, cond); SIMIX_mutex_unlock(mutex); /* remove variables */