From 1d093eb0c576e2f7a1d6c7a707ee55026aca3915 Mon Sep 17 00:00:00 2001 From: donassbr Date: Fri, 30 Mar 2007 10:42:08 +0000 Subject: [PATCH 1/1] msg_simix alpha. All functions implemented. Some memory leaks solved. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3366 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/include/simix/datatypes.h | 2 +- src/msg_simix/msg_simix_environment.c | 1 + src/msg_simix/msg_simix_global.c | 16 ++-- src/msg_simix/msg_simix_gos.c | 114 +++++++++++++++++++------- src/msg_simix/msg_simix_private.h | 23 ++---- src/msg_simix/msg_simix_process.c | 33 +++----- src/simix/smx_action.c | 4 +- src/simix/smx_deployment.c | 1 + src/simix/smx_global.c | 3 - src/simix/smx_process.c | 1 + src/simix/smx_synchro.c | 9 +- 11 files changed, 124 insertions(+), 83 deletions(-) diff --git a/src/include/simix/datatypes.h b/src/include/simix/datatypes.h index d6a59cfe43..3d69b6fbfd 100644 --- a/src/include/simix/datatypes.h +++ b/src/include/simix/datatypes.h @@ -67,7 +67,7 @@ typedef struct s_smx_simdata_process *smx_simdata_process_t; /** @brief Process datatype @ingroup m_datatypes_management_details @{ */ typedef struct s_smx_process { - char *name; /**< @brief process name if any */ + char *name; /**< @brief process name if any */ smx_simdata_process_t simdata; /**< @brief simulator data */ s_xbt_swag_hookup_t process_hookup; s_xbt_swag_hookup_t synchro_hookup; diff --git a/src/msg_simix/msg_simix_environment.c b/src/msg_simix/msg_simix_environment.c index e2abf68fe9..69e2a354a2 100644 --- a/src/msg_simix/msg_simix_environment.c +++ b/src/msg_simix/msg_simix_environment.c @@ -59,6 +59,7 @@ void MSG_create_environment(const char *file) for (i=0; i< SIMIX_host_get_number();i++) { __MSG_host_create(workstation[i], NULL); } + xbt_free(workstation); return; } diff --git a/src/msg_simix/msg_simix_global.c b/src/msg_simix/msg_simix_global.c index c549968174..0296d16eed 100644 --- a/src/msg_simix/msg_simix_global.c +++ b/src/msg_simix/msg_simix_global.c @@ -3,7 +3,7 @@ #include "xbt/log.h" #include "xbt/ex.h" /* ex_backtrace_display */ -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_kernel, msg, "Logging specific to MSG (kernel)"); +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_kernel, msg, "Logging specific to MSG (kernel)"); MSG_Global_t msg_global = NULL; @@ -137,29 +137,27 @@ MSG_error_t MSG_main(void) while ( (smx_action = xbt_fifo_pop(actions_failed)) ) { - xbt_fifo_item_t _cursor; DEBUG1("** %s failed **",smx_action->name); - xbt_fifo_foreach(smx_action->cond_list,_cursor,cond,smx_cond_t) { + while ( (cond = xbt_fifo_pop(smx_action->cond_list)) ) { SIMIX_cond_broadcast(cond); - /* remove conditional from action */ - xbt_fifo_remove(smx_action->cond_list,cond); } + /* action finished, destroy it */ SIMIX_action_destroy(smx_action); } while ( (smx_action = xbt_fifo_pop(actions_done)) ) { - xbt_fifo_item_t _cursor; DEBUG1("** %s done **",smx_action->name); - xbt_fifo_foreach(smx_action->cond_list,_cursor,cond,smx_cond_t) { + while ( (cond = xbt_fifo_pop(smx_action->cond_list)) ) { SIMIX_cond_broadcast(cond); - /* remove conditional from action */ - xbt_fifo_remove(smx_action->cond_list,cond); } + /* action finished, destroy it */ SIMIX_action_destroy(smx_action); } } + xbt_fifo_free(actions_failed); + xbt_fifo_free(actions_done); return MSG_OK; } diff --git a/src/msg_simix/msg_simix_gos.c b/src/msg_simix/msg_simix_gos.c index 439609ae6c..429274a2aa 100644 --- a/src/msg_simix/msg_simix_gos.c +++ b/src/msg_simix/msg_simix_gos.c @@ -61,9 +61,7 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task, MSG_RETURN(MSG_TRANSFER_FAILURE); } } - xbt_assert1(!(h_simdata->sleeping[channel]), - "A process is already blocked on channel %d", - channel); + xbt_assert1(!(h_simdata->sleeping[channel]),"A process is already blocked on channel %d", channel); cond = SIMIX_cond_init(); h_simdata->sleeping[channel] = cond; @@ -88,12 +86,10 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task, *task=t; SIMIX_mutex_lock(t_simdata->mutex); - // DEBUG1("OK, Mutex task locked (%s)", t->name); /* Transfer */ t_simdata->using++; /* create SIMIX action to the communication */ - // DEBUG3("Action (%s), Size (%lf), Rate (%lf)", t->name,t_simdata->message_size, t_simdata->rate); t_simdata->comm = SIMIX_action_communicate(t_simdata->sender->simdata->host->simdata->host, process->simdata->host->simdata->host,t->name, t_simdata->message_size, t_simdata->rate); /* @@ -107,17 +103,17 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task, SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond); SIMIX_register_condition_to_action(t_simdata->comm, t_simdata->cond); SIMIX_cond_wait(t_simdata->cond,t_simdata->mutex); - DEBUG1("TASSK %s",t->name ); + /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */ t->simdata->comm = NULL; t->simdata->compute = NULL; SIMIX_mutex_unlock(t_simdata->mutex); - MSG_task_destroy(t); + //MSG_task_destroy(t); MSG_RETURN(MSG_OK); } - + /** \ingroup msg_gos_functions * \brief Listen on a channel and wait for receiving a task. * @@ -195,8 +191,15 @@ MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, */ int MSG_task_Iprobe(m_channel_t channel) { - xbt_die("not implemented yet"); - return 0; + m_host_t h = NULL; + + xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel); + CHECK_HOST(); + + DEBUG2("Probing on channel %d (%s)", channel,h->name); + + h = MSG_host_self(); + return(xbt_fifo_get_first_item(h->simdata->mbox[channel])!=NULL); } /** \ingroup msg_gos_functions @@ -210,8 +213,22 @@ int MSG_task_Iprobe(m_channel_t channel) */ int MSG_task_probe_from(m_channel_t channel) { - xbt_die("not implemented yet"); - return 0; + m_host_t h = NULL; + xbt_fifo_item_t item; + m_task_t t; + + xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel); + CHECK_HOST(); + + h = MSG_host_self(); + + DEBUG2("Probing on channel %d (%s)", channel,h->name); + + item = xbt_fifo_get_first_item(h->simdata->mbox[channel]); + if ( (!item) || (!(t = xbt_fifo_get_item_content(item))) ) + return -1; + + return MSG_process_get_PID(t->simdata->sender); } /** \ingroup msg_gos_functions @@ -231,10 +248,61 @@ int MSG_task_probe_from(m_channel_t channel) and #MSG_OK otherwise. */ MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration, - int *PID) + int *PID) { - xbt_die("not implemented yet"); - MSG_RETURN(MSG_OK); + m_host_t h = NULL; + simdata_host_t h_simdata = NULL; + xbt_fifo_item_t item; + m_task_t t; + int first_time = 1; + smx_cond_t cond; + + xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel); + if(PID) { + *PID = -1; + } + + if(max_duration==0.0) { + *PID = MSG_task_probe_from(channel); + MSG_RETURN(MSG_OK); + } else { + CHECK_HOST(); + h = MSG_host_self(); + h_simdata = h->simdata; + + DEBUG2("Probing on channel %d (%s)", channel,h->name); + while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) { + if(max_duration>0) { + if(!first_time) { + MSG_RETURN(MSG_OK); + } + } + SIMIX_mutex_lock(h_simdata->mutex); + xbt_assert1(!(h_simdata->sleeping[channel]), + "A process is already blocked on this channel %d", channel); + cond = SIMIX_cond_init(); + h_simdata->sleeping[channel] = cond; /* I'm waiting. Wake me up when you're ready */ + if(max_duration>0) { + SIMIX_cond_wait_timeout(cond,h_simdata->mutex, max_duration); + } else { + SIMIX_cond_wait(cond,h_simdata->mutex); + } + SIMIX_cond_destroy(cond); + SIMIX_mutex_unlock(h_simdata->mutex); + if(SIMIX_host_get_state(h_simdata->host)==0) { + MSG_RETURN(MSG_HOST_FAILURE); + } + h_simdata->sleeping[channel] = NULL; + first_time = 0; + } + if (!item || !(t = xbt_fifo_get_item_content(item))) { + MSG_RETURN(MSG_OK); + } + if(PID) { + *PID = MSG_process_get_PID(t->simdata->sender); + } + MSG_RETURN(MSG_OK); + } } @@ -305,7 +373,6 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, xbt_assert0(task_simdata->using==1, "This taks is still being used somewhere else. You cannot send it now. Go fix your code!"); task_simdata->comm = NULL; - task_simdata->cond = SIMIX_cond_init(); local_host = ((simdata_process_t) process->simdata)->host; remote_host = dest; @@ -320,7 +387,6 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, if(remote_host->simdata->sleeping[channel]) { DEBUG0("Somebody is listening. Let's wake him up!"); - //__MSG_process_unblock(remote_host->simdata->sleeping[channel]); SIMIX_cond_signal(remote_host->simdata->sleeping[channel]); } SIMIX_mutex_unlock(remote_host->simdata->mutex); @@ -337,6 +403,7 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, SIMIX_cond_wait(task->simdata->cond,task->simdata->mutex); } DEBUG1("Action terminated %s",task->name); + task->simdata->using--; SIMIX_mutex_unlock(task->simdata->mutex); @@ -427,20 +494,10 @@ MSG_error_t MSG_task_execute(m_task_t task) SIMIX_mutex_unlock(simdata->mutex); simdata->using--; - -// MSG_RETURN(MSG_OK); - return MSG_OK; + MSG_RETURN(MSG_OK); } -void __MSG_task_execute(m_process_t process, m_task_t task) -{ -} - -MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task) -{ - MSG_RETURN(MSG_OK); -} /** \ingroup m_task_management * \brief Creates a new #m_task_t (a parallel one....). * @@ -513,7 +570,6 @@ MSG_error_t MSG_process_sleep(double nb_sec) SIMIX_mutex_unlock(mutex); /* remove variables */ - SIMIX_action_destroy(act_sleep); SIMIX_cond_destroy(cond); SIMIX_mutex_destroy(mutex); diff --git a/src/msg_simix/msg_simix_private.h b/src/msg_simix/msg_simix_private.h index dbc1a6414c..811c1f9820 100644 --- a/src/msg_simix/msg_simix_private.h +++ b/src/msg_simix/msg_simix_private.h @@ -23,9 +23,8 @@ typedef struct simdata_host { smx_host_t host; /* SURF modeling */ xbt_fifo_t *mbox; /* array of FIFOs used as a mailboxes */ - smx_cond_t *sleeping; /* array of process used to know whether a local process is - waiting for a communication on a channel */ - smx_mutex_t mutex; + smx_cond_t *sleeping; /* array of conditions on which the processes sleep if they are waiting for a communication on a channel */ + smx_mutex_t mutex; /* mutex to access the host */ } s_simdata_host_t; /********************************* Task **************************************/ @@ -35,8 +34,8 @@ typedef struct simdata_task { smx_action_t comm; /* SURF modeling of communication */ double message_size; /* Data size */ double computation_amount; /* Computation size */ - smx_cond_t cond; - smx_mutex_t mutex; + smx_cond_t cond; + smx_mutex_t mutex; /* Task mutex */ m_process_t sender; m_host_t source; double priority; @@ -56,9 +55,6 @@ typedef struct simdata_process { smx_process_t smx_process; int PID; /* used for debugging purposes */ int PPID; /* The parent PID */ - //m_task_t waiting_task; - int blocked; - int suspended; m_host_t put_host; /* used for debugging purposes */ m_channel_t put_channel; /* used for debugging purposes */ int argc; /* arguments number if any */ @@ -91,22 +87,15 @@ extern MSG_Global_t msg_global; #define PROCESS_SET_ERRNO(val) (MSG_process_self()->simdata->last_errno=val) #define PROCESS_GET_ERRNO() (MSG_process_self()->simdata->last_errno) -//#define MSG_RETURN(val) do {PROCESS_SET_ERRNO(val);return(val);} while(0) -#define MSG_RETURN(val) do {return(val);} while(0) +#define MSG_RETURN(val) do {PROCESS_SET_ERRNO(val);return(val);} while(0) /* #define CHECK_ERRNO() ASSERT((PROCESS_GET_ERRNO()!=MSG_HOST_FAILURE),"Host failed, you cannot call this function.") */ #define CHECK_HOST() xbt_assert0(SIMIX_host_get_state(SIMIX_host_self())==1,\ "Host failed, you cannot call this function.") m_host_t __MSG_host_create(smx_host_t workstation, void *data); -void __MSG_host_destroy(m_host_t host); -void __MSG_task_execute(m_process_t process, m_task_t task); -MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task); -MSG_error_t __MSG_task_wait_event(m_process_t process, m_task_t task); -int __MSG_process_block(double max_duration, const char *info); -MSG_error_t __MSG_process_unblock(m_process_t process); -int __MSG_process_isBlocked(m_process_t process); +void __MSG_host_destroy(m_host_t host); void __MSG_display_process_status(void); diff --git a/src/msg_simix/msg_simix_process.c b/src/msg_simix/msg_simix_process.c index b2af5acc0d..a40a81acae 100644 --- a/src/msg_simix/msg_simix_process.c +++ b/src/msg_simix/msg_simix_process.c @@ -325,8 +325,11 @@ m_process_t MSG_process_self(void) */ MSG_error_t MSG_process_suspend(m_process_t process) { - xbt_die("not implemented yet"); - return MSG_OK; + xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters"); + CHECK_HOST(); + + SIMIX_process_suspend(process->simdata->smx_process); + MSG_RETURN(MSG_OK); } /** \ingroup m_process_management @@ -337,8 +340,12 @@ MSG_error_t MSG_process_suspend(m_process_t process) */ MSG_error_t MSG_process_resume(m_process_t process) { - xbt_die("not implemented yet"); - MSG_RETURN(MSG_OK); + + xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters"); + CHECK_HOST(); + + SIMIX_process_resume(process->simdata->smx_process); + MSG_RETURN(MSG_OK); } /** \ingroup m_process_management @@ -349,21 +356,7 @@ MSG_error_t MSG_process_resume(m_process_t process) */ int MSG_process_is_suspended(m_process_t process) { - xbt_die("not implemented yet"); - return 0; -} - -int __MSG_process_block(double max_duration, const char *info) -{ - return 1; -} - -MSG_error_t __MSG_process_unblock(m_process_t process) -{ - MSG_RETURN(MSG_OK); + xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters"); + return SIMIX_process_is_suspended(process->simdata->smx_process); } -int __MSG_process_isBlocked(m_process_t process) -{ - return 0; -} diff --git a/src/simix/smx_action.c b/src/simix/smx_action.c index 98e015a970..0bcd505c7e 100644 --- a/src/simix/smx_action.c +++ b/src/simix/smx_action.c @@ -125,13 +125,15 @@ void SIMIX_action_destroy(smx_action_t action) xbt_assert1((xbt_fifo_size(action->cond_list)==0), "Conditional list not empty %d. There is a problem. Cannot destroy it now!", xbt_fifo_size(action->cond_list)); - if(action->name) free(action->name); + if(action->name) xbt_free(action->name); xbt_fifo_free(action->cond_list); if(action->simdata->surf_action) action->simdata->surf_action->resource_type->common_public->action_free(action->simdata->surf_action); + xbt_free(action->simdata); + xbt_free(action); return; } diff --git a/src/simix/smx_deployment.c b/src/simix/smx_deployment.c index 03181ceb81..c109c89c5b 100644 --- a/src/simix/smx_deployment.c +++ b/src/simix/smx_deployment.c @@ -79,6 +79,7 @@ static void parse_process_finalize(void) else surf_timer_resource->extension_public->set(kill_time, (void*) &SIMIX_process_kill, (void*) process); } + xbt_free(parse_host); } } diff --git a/src/simix/smx_global.c b/src/simix/smx_global.c index 55d01e6c4b..3a46a495c5 100644 --- a/src/simix/smx_global.c +++ b/src/simix/smx_global.c @@ -261,9 +261,6 @@ double SIMIX_solve(xbt_fifo_t actions_done, xbt_fifo_t actions_failed) xbt_dynar_foreach(resource_list, i, resource) { if(xbt_swag_size(resource->common_public->states.failed_action_set) || xbt_swag_size(resource->common_public->states.done_action_set)) { - DEBUG2("SWAG SIZES %d %d\n",xbt_swag_size(resource->common_public->states.failed_action_set), - xbt_swag_size(resource->common_public->states.done_action_set)); - state_modifications = 1; } } diff --git a/src/simix/smx_process.c b/src/simix/smx_process.c index 9d5949a76b..18eb439fe4 100644 --- a/src/simix/smx_process.c +++ b/src/simix/smx_process.c @@ -44,6 +44,7 @@ void SIMIX_process_cleanup(void *arg) xbt_swag_remove(arg, ((smx_process_t) arg)->simdata->host->simdata->process_list); free(((smx_process_t) arg)->name); ((smx_process_t) arg)->name = NULL; + free(((smx_process_t) arg)->simdata); ((smx_process_t) arg)->simdata = NULL; free(arg); diff --git a/src/simix/smx_synchro.c b/src/simix/smx_synchro.c index dd084b1f3e..218c6057bb 100644 --- a/src/simix/smx_synchro.c +++ b/src/simix/smx_synchro.c @@ -135,8 +135,13 @@ void SIMIX_cond_wait(smx_cond_t cond,smx_mutex_t mutex) act_sleep = SIMIX_action_sleep(SIMIX_host_self(), -1); SIMIX_register_action_to_condition(act_sleep,cond); SIMIX_register_condition_to_action(act_sleep,cond); + __SIMIX_cond_wait(cond); + xbt_fifo_pop(act_sleep->cond_list); + SIMIX_action_destroy(act_sleep); + } + else { + __SIMIX_cond_wait(cond); } - __SIMIX_cond_wait(cond); /* get the mutex again */ self->simdata->mutex = cond->mutex; SIMIX_mutex_lock(cond->mutex); @@ -201,12 +206,10 @@ void SIMIX_cond_broadcast(smx_cond_t cond) void SIMIX_cond_destroy(smx_cond_t cond) { - if ( cond == NULL ) return ; else { xbt_assert0( xbt_swag_size(cond->sleeping) == 0 , "Cannot destroy conditional"); - xbt_swag_free(cond->sleeping); xbt_fifo_free(cond->actions); xbt_free(cond); -- 2.20.1