From: alegrand Date: Tue, 2 Aug 2005 00:04:07 +0000 (+0000) Subject: Add MSG_task_cancel and MSG_task_get_computation_remaining X-Git-Tag: v3.3~3780 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/193edb1b7e0828fb6c6daa5dfac260468bb41e96 Add MSG_task_cancel and MSG_task_get_computation_remaining Add comments for parallel tasks git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1576 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/include/msg/datatypes.h b/include/msg/datatypes.h index 0112cbdc7d..9a49c4a2cc 100644 --- a/include/msg/datatypes.h +++ b/include/msg/datatypes.h @@ -118,6 +118,8 @@ typedef enum { MSG_HOST_FAILURE, /**< @brief System shutdown. The host on which you are running has just been rebooted. Free your datastructures and return now !*/ + MSG_TASK_CANCELLED, /**< @brief Cancelled task. This task has been cancelled + by somebody!*/ MSG_FATAL /**< @brief You've done something wrong. You'd better look at it... */ } MSG_error_t; /** @} */ diff --git a/include/msg/msg.h b/include/msg/msg.h index 94bb434594..480d3d8cb4 100644 --- a/include/msg/msg.h +++ b/include/msg/msg.h @@ -39,6 +39,7 @@ const char *MSG_host_get_name(m_host_t host); m_host_t MSG_host_self(void); int MSG_get_host_msgload(m_host_t host); /* int MSG_get_msgload(void); This function lacks specification; discard it */ +double MSG_get_host_speed(m_host_t h); void MSG_create_environment(const char *file); @@ -90,7 +91,7 @@ m_task_t MSG_parallel_task_create(const char *name, void *MSG_task_get_data(m_task_t task); m_process_t MSG_task_get_sender(m_task_t task); const char *MSG_task_get_name(m_task_t task); - +MSG_error_t MSG_task_cancel(m_task_t task); MSG_error_t MSG_task_destroy(m_task_t task); MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel); @@ -112,6 +113,7 @@ MSG_error_t MSG_process_sleep(double nb_sec); MSG_error_t MSG_get_errno(void); double MSG_task_get_compute_duration(m_task_t task); +double 
MSG_task_get_remaining_computation(m_task_t task); double MSG_task_get_data_size(m_task_t task); /************************** Deprecated ***************************************/ diff --git a/src/msg/global.c b/src/msg/global.c index 2848f7880e..2c1985900c 100644 --- a/src/msg/global.c +++ b/src/msg/global.c @@ -317,7 +317,7 @@ MSG_error_t MSG_main(void) m_process_t process = NULL; int nbprocess,i; double elapsed_time = 0.0; - + int state_modifications; /* Clean IO before the run */ fflush(stdout); fflush(stderr); @@ -342,19 +342,7 @@ MSG_error_t MSG_main(void) xbt_context_schedule(process->simdata->context); msg_global->current_process = NULL; } - DEBUG1("%g : Calling surf_solve",MSG_getClock()); - elapsed_time = surf_solve(); - DEBUG1("Elapsed_time %g",elapsed_time); - -/* fprintf(stderr, "====== %g =====\n",Now); */ -/* if (elapsed_time==0.0) { */ -/* fprintf(stderr, "No change in time\n"); */ -/* } */ - if (elapsed_time<0.0) { -/* fprintf(stderr, "We're done %g\n",elapsed_time); */ - break; - } - + { surf_action_t action = NULL; surf_resource_t resource = NULL; @@ -362,6 +350,24 @@ MSG_error_t MSG_main(void) void *fun = NULL; void *arg = NULL; + + xbt_dynar_foreach(resource_list, i, resource) { + if(xbt_swag_size(resource->common_public->states.failed_action_set) || + xbt_swag_size(resource->common_public->states.done_action_set)) + state_modifications = 1; + } + + if(!state_modifications) { + DEBUG1("%g : Calling surf_solve",MSG_getClock()); + elapsed_time = surf_solve(); + DEBUG1("Elapsed_time %g",elapsed_time); + + if (elapsed_time<0.0) { + /* fprintf(stderr, "We're done %g\n",elapsed_time); */ + break; + } + } + while (surf_timer_resource->extension_public->get(&fun,(void*)&arg)) { DEBUG2("got %p %p", fun, arg); if(fun==MSG_process_create_with_arguments) { @@ -387,8 +393,8 @@ MSG_error_t MSG_main(void) xbt_dynar_foreach(resource_list, i, resource) { while ((action = - xbt_swag_extract(resource->common_public->states. 
- failed_action_set))) { + xbt_swag_extract(resource->common_public->states. + failed_action_set))) { task = action->data; if(task) { int _cursor; @@ -403,8 +409,8 @@ MSG_error_t MSG_main(void) } } while ((action = - xbt_swag_extract(resource->common_public->states. - done_action_set))) { + xbt_swag_extract(resource->common_public->states. + done_action_set))) { task = action->data; if(task) { int _cursor; @@ -420,6 +426,7 @@ MSG_error_t MSG_main(void) } } } + state_modifications = 0; } if ((nbprocess=xbt_fifo_size(msg_global->process_list)) == 0) { @@ -482,49 +489,6 @@ MSG_error_t MSG_main(void) } } -/* static void MarkAsFailed(m_task_t t, TBX_HashTable_t failedProcessList) */ -/* { */ -/* simdata_task_t simdata = NULL; */ -/* xbt_fifo_item_t i = NULL; */ -/* m_process_t p = NULL; */ - -/* xbt_assert0((t!=NULL),"Invalid task"); */ -/* simdata = t->simdata; */ - -/* #define KILL(task) if(task) SG_failTask(task) */ -/* KILL(simdata->compute); */ -/* KILL(simdata->TCP_comm); */ -/* KILL(simdata->s[0]); */ -/* KILL(simdata->s[1]); */ -/* KILL(simdata->s[2]); */ -/* KILL(simdata->s[3]); */ -/* KILL(simdata->sleep); */ -/* #undef KILL */ -/* /\* if(simdata->comm) SG_failEndToEndTransfer(simdata->comm); *\/ */ - -/* xbt_fifo_foreach(simdata->sleeping,i,p,m_process_t) { */ -/* if(!TBX_HashTable_isInList(failedProcessList,p,TBX_basicHash)) */ -/* TBX_HashTable_insert(failedProcessList,p,TBX_basicHash); */ -/* } */ - -/* } */ - -/* static xbt_fifo_t MSG_buildFailedHostList(double begin, double end) */ -/* { */ -/* xbt_fifo_t failedHostList = xbt_fifo_new(); */ -/* m_host_t host = NULL; */ -/* xbt_fifo_item_t i; */ - -/* xbt_fifo_foreach(msg_global->host,i,host,m_host_t) { */ -/* SG_Resource r= ((simdata_host_t) (host->simdata))->host; */ - -/* if(SG_evaluateFailureTrace(r->failure_trace,begin,end)!=-1.0) */ -/* xbt_fifo_insert(failedHostList,host); */ -/* } */ - -/* return failedHostList; */ -/* } */ - /** \ingroup msg_simulation * \brief Kill all running process diff --git 
a/src/msg/gos.c b/src/msg/gos.c index 162ffac703..ec7e191794 100644 --- a/src/msg/gos.c +++ b/src/msg/gos.c @@ -427,6 +427,7 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task) if(state == SURF_ACTION_DONE) { if(surf_workstation_resource->common_public->action_free(simdata->compute)) simdata->compute = NULL; + simdata->computation_amount = 0.0; MSG_RETURN(MSG_OK); } else if(surf_workstation_resource->extension_public-> get_state(MSG_process_get_host(process)->simdata->host) @@ -437,10 +438,28 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task) } else { if(surf_workstation_resource->common_public->action_free(simdata->compute)) simdata->compute = NULL; - MSG_RETURN(MSG_TRANSFER_FAILURE); + MSG_RETURN(MSG_TASK_CANCELLED); } } - +/** \ingroup m_task_management + * \brief Creates a new #m_task_t (a parallel one). + * + * A constructor for #m_task_t taking six arguments and returning the + corresponding object. + * \param name a name for the object. It is for user-level information + and can be NULL. + * \param host_nb the number of hosts involved in the parallel task. + * \param host_list an array of #host_nb m_host_t. + * \param computation_amount an array of #host_nb + doubles. computation_amount[i] is the total number of operations + that have to be performed on host_list[i]. + * \param communication_amount an array of #host_nb*#host_nb doubles. + * \param data a pointer to any data you may want to attach to the new + object. It is for user-level information and can be NULL. It can + be retrieved with the function \ref MSG_task_get_data. + * \see m_task_t + * \return The new corresponding object. 
+ */ m_task_t MSG_parallel_task_create(const char *name, int host_nb, const m_host_t *host_list, diff --git a/src/msg/task.c b/src/msg/task.c index 1fe39757dc..d37ab3142f 100644 --- a/src/msg/task.c +++ b/src/msg/task.c @@ -139,9 +139,31 @@ MSG_error_t MSG_task_destroy(m_task_t task) return MSG_OK; } + +/** \ingroup m_task_management + * \brief Cancel a #m_task_t. + * \param task the task to cancel. If it was executed or transferred, it + stops the processes that were working on it. + */ +MSG_error_t MSG_task_cancel(m_task_t task) +{ + xbt_assert0((task != NULL), "Invalid parameter"); + + if(task->simdata->compute) { + surf_workstation_resource->common_public->action_cancel(task->simdata->compute); + return MSG_OK; + } + if(task->simdata->comm) { + surf_workstation_resource->common_public->action_cancel(task->simdata->comm); + return MSG_OK; + } + + return MSG_FATAL; +} + /** \ingroup m_task_management * \brief Returns the computation amount needed to process a task #m_task_t. - * + * Once a task has been processed, this amount is thus set to 0... */ double MSG_task_get_compute_duration(m_task_t task) { @@ -152,6 +174,23 @@ double MSG_task_get_compute_duration(m_task_t task) return task->simdata->computation_amount; } +/** \ingroup m_task_management + * \brief Returns the remaining computation amount of a task #m_task_t. + * + */ +double MSG_task_get_remaining_computation(m_task_t task) +{ + simdata_task_t simdata = NULL; + + xbt_assert0((task != NULL) && (task->simdata != NULL), "Invalid parameter"); + + if(task->simdata->compute) { + return task->simdata->compute->remains; + } else { + return task->simdata->computation_amount; + } +} + /** \ingroup m_task_management * \brief Returns the size of the data attached to a task #m_task_t. *