X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/07b23e956e96e9a73b3acbbf4639b3d464604a44..1d093eb0c576e2f7a1d6c7a707ee55026aca3915:/src/msg_simix/msg_simix_gos.c diff --git a/src/msg_simix/msg_simix_gos.c b/src/msg_simix/msg_simix_gos.c index 439609ae6c..429274a2aa 100644 --- a/src/msg_simix/msg_simix_gos.c +++ b/src/msg_simix/msg_simix_gos.c @@ -61,9 +61,7 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task, MSG_RETURN(MSG_TRANSFER_FAILURE); } } - xbt_assert1(!(h_simdata->sleeping[channel]), - "A process is already blocked on channel %d", - channel); + xbt_assert1(!(h_simdata->sleeping[channel]),"A process is already blocked on channel %d", channel); cond = SIMIX_cond_init(); h_simdata->sleeping[channel] = cond; @@ -88,12 +86,10 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task, *task=t; SIMIX_mutex_lock(t_simdata->mutex); - // DEBUG1("OK, Mutex task locked (%s)", t->name); /* Transfer */ t_simdata->using++; /* create SIMIX action to the communication */ - // DEBUG3("Action (%s), Size (%lf), Rate (%lf)", t->name,t_simdata->message_size, t_simdata->rate); t_simdata->comm = SIMIX_action_communicate(t_simdata->sender->simdata->host->simdata->host, process->simdata->host->simdata->host,t->name, t_simdata->message_size, t_simdata->rate); /* @@ -107,17 +103,17 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task, SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond); SIMIX_register_condition_to_action(t_simdata->comm, t_simdata->cond); SIMIX_cond_wait(t_simdata->cond,t_simdata->mutex); - DEBUG1("TASSK %s",t->name ); + /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */ t->simdata->comm = NULL; t->simdata->compute = NULL; SIMIX_mutex_unlock(t_simdata->mutex); - MSG_task_destroy(t); + //MSG_task_destroy(t); MSG_RETURN(MSG_OK); } - + /** \ingroup msg_gos_functions * \brief Listen on a channel and 
wait for receiving a task.
  *
@@ -195,8 +191,15 @@ MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel,
  */
 int MSG_task_Iprobe(m_channel_t channel)
 {
-  xbt_die("not implemented yet");
-  return 0;
+  m_host_t h = NULL;
+
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
+  CHECK_HOST();
+
+  h = MSG_host_self(); /* resolve the host first: the log call below reads h->name */
+
+  DEBUG2("Probing on channel %d (%s)", channel,h->name);
+  return(xbt_fifo_get_first_item(h->simdata->mbox[channel])!=NULL); /* non-zero iff the channel's mailbox has a first item */
 }
 
 /** \ingroup msg_gos_functions
@@ -210,8 +213,22 @@ int MSG_task_Iprobe(m_channel_t channel)
  */
 int MSG_task_probe_from(m_channel_t channel)
 {
-  xbt_die("not implemented yet");
-  return 0;
+  m_host_t h = NULL;
+  xbt_fifo_item_t item;
+  m_task_t t;
+
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
+  CHECK_HOST();
+
+  h = MSG_host_self();
+
+  DEBUG2("Probing on channel %d (%s)", channel,h->name);
+
+  item = xbt_fifo_get_first_item(h->simdata->mbox[channel]); /* peek only: the item is not removed here */
+  if ( (!item) || (!(t = xbt_fifo_get_item_content(item))) )
+    return -1; /* empty mailbox, or an item that carries no task */
+
+  return MSG_process_get_PID(t->simdata->sender); /* PID of the process recorded as the task's sender */
 }
 
 /** \ingroup msg_gos_functions
@@ -231,10 +248,61 @@ int MSG_task_probe_from(m_channel_t channel)
    and #MSG_OK otherwise.
*/
 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
-				     int *PID)
+                                    int *PID)
 {
-  xbt_die("not implemented yet");
-  MSG_RETURN(MSG_OK);
+  m_host_t h = NULL;
+  simdata_host_t h_simdata = NULL;
+  xbt_fifo_item_t item;
+  m_task_t t;
+  int first_time = 1; /* cleared after the first wait so that a timed wait happens at most once */
+  smx_cond_t cond;
+
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
+  if(PID) { /* PID is an optional out-parameter; -1 is reported when no sender is found */
+    *PID = -1;
+  }
+
+  if(max_duration==0.0) { /* zero duration: probe once, never block */
+    if(PID) { *PID = MSG_task_probe_from(channel); } /* guarded: PID may be NULL */
+    MSG_RETURN(MSG_OK);
+  } else {
+    CHECK_HOST();
+    h = MSG_host_self();
+    h_simdata = h->simdata;
+
+    DEBUG2("Probing on channel %d (%s)", channel,h->name);
+    while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
+      if(max_duration>0) {
+        if(!first_time) { /* the timed wait already ran once and the mailbox is still empty */
+          MSG_RETURN(MSG_OK);
+        }
+      }
+      SIMIX_mutex_lock(h_simdata->mutex);
+      xbt_assert1(!(h_simdata->sleeping[channel]),
+                  "A process is already blocked on this channel %d", channel);
+      cond = SIMIX_cond_init();
+      h_simdata->sleeping[channel] = cond; /* I'm waiting. Wake me up when you're ready */
+      if(max_duration>0) {
+        SIMIX_cond_wait_timeout(cond,h_simdata->mutex, max_duration);
+      } else { /* negative duration: wait with no timeout */
+        SIMIX_cond_wait(cond,h_simdata->mutex);
+      }
+      h_simdata->sleeping[channel] = NULL; /* clear the slot before destroying cond so no exit path leaves a dangling pointer */
+      SIMIX_cond_destroy(cond);
+      SIMIX_mutex_unlock(h_simdata->mutex);
+      if(SIMIX_host_get_state(h_simdata->host)==0) {
+        MSG_RETURN(MSG_HOST_FAILURE);
+      }
+      first_time = 0;
+    }
+    if (!item || !(t = xbt_fifo_get_item_content(item))) { /* item without a task: leave *PID at -1 */
+      MSG_RETURN(MSG_OK);
+    }
+    if(PID) {
+      *PID = MSG_process_get_PID(t->simdata->sender);
+    }
+    MSG_RETURN(MSG_OK);
+  }
 }
 
 
@@ -305,7 +373,6 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
   xbt_assert0(task_simdata->using==1, "This taks is still being used somewhere else. You cannot send it now. 
Go fix your code!"); task_simdata->comm = NULL; - task_simdata->cond = SIMIX_cond_init(); local_host = ((simdata_process_t) process->simdata)->host; remote_host = dest; @@ -320,7 +387,6 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, if(remote_host->simdata->sleeping[channel]) { DEBUG0("Somebody is listening. Let's wake him up!"); - //__MSG_process_unblock(remote_host->simdata->sleeping[channel]); SIMIX_cond_signal(remote_host->simdata->sleeping[channel]); } SIMIX_mutex_unlock(remote_host->simdata->mutex); @@ -337,6 +403,7 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, SIMIX_cond_wait(task->simdata->cond,task->simdata->mutex); } DEBUG1("Action terminated %s",task->name); + task->simdata->using--; SIMIX_mutex_unlock(task->simdata->mutex); @@ -427,20 +494,10 @@ MSG_error_t MSG_task_execute(m_task_t task) SIMIX_mutex_unlock(simdata->mutex); simdata->using--; - -// MSG_RETURN(MSG_OK); - return MSG_OK; + MSG_RETURN(MSG_OK); } -void __MSG_task_execute(m_process_t process, m_task_t task) -{ -} - -MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task) -{ - MSG_RETURN(MSG_OK); -} /** \ingroup m_task_management * \brief Creates a new #m_task_t (a parallel one....). * @@ -513,7 +570,6 @@ MSG_error_t MSG_process_sleep(double nb_sec) SIMIX_mutex_unlock(mutex); /* remove variables */ - SIMIX_action_destroy(act_sleep); SIMIX_cond_destroy(cond); SIMIX_mutex_destroy(mutex);