m_host_t MSG_host_self(void);
int MSG_get_host_msgload(m_host_t host);
/* int MSG_get_msgload(void); This function lacks specification; discard it */
+double MSG_get_host_speed(m_host_t h);
void MSG_create_environment(const char *file);
void *MSG_task_get_data(m_task_t task);
m_process_t MSG_task_get_sender(m_task_t task);
const char *MSG_task_get_name(m_task_t task);
-
+MSG_error_t MSG_task_cancel(m_task_t task);
MSG_error_t MSG_task_destroy(m_task_t task);
MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel);
MSG_error_t MSG_get_errno(void);
double MSG_task_get_compute_duration(m_task_t task);
+double MSG_task_get_remaining_computation(m_task_t task);
double MSG_task_get_data_size(m_task_t task);
/************************** Deprecated ***************************************/
m_process_t process = NULL;
int nbprocess,i;
double elapsed_time = 0.0;
-
+ int state_modifications;
/* Clean IO before the run */
fflush(stdout);
fflush(stderr);
xbt_context_schedule(process->simdata->context);
msg_global->current_process = NULL;
}
- DEBUG1("%g : Calling surf_solve",MSG_getClock());
- elapsed_time = surf_solve();
- DEBUG1("Elapsed_time %g",elapsed_time);
-
-/* fprintf(stderr, "====== %g =====\n",Now); */
-/* if (elapsed_time==0.0) { */
-/* fprintf(stderr, "No change in time\n"); */
-/* } */
- if (elapsed_time<0.0) {
-/* fprintf(stderr, "We're done %g\n",elapsed_time); */
- break;
- }
-
+
{
surf_action_t action = NULL;
surf_resource_t resource = NULL;
void *fun = NULL;
void *arg = NULL;
+
+ xbt_dynar_foreach(resource_list, i, resource) {
+ if(xbt_swag_size(resource->common_public->states.failed_action_set) ||
+ xbt_swag_size(resource->common_public->states.done_action_set))
+ state_modifications = 1;
+ }
+
+ if(!state_modifications) {
+ DEBUG1("%g : Calling surf_solve",MSG_getClock());
+ elapsed_time = surf_solve();
+ DEBUG1("Elapsed_time %g",elapsed_time);
+
+ if (elapsed_time<0.0) {
+ /* fprintf(stderr, "We're done %g\n",elapsed_time); */
+ break;
+ }
+ }
+
while (surf_timer_resource->extension_public->get(&fun,(void*)&arg)) {
DEBUG2("got %p %p", fun, arg);
if(fun==MSG_process_create_with_arguments) {
xbt_dynar_foreach(resource_list, i, resource) {
while ((action =
- xbt_swag_extract(resource->common_public->states.
- failed_action_set))) {
+ xbt_swag_extract(resource->common_public->states.
+ failed_action_set))) {
task = action->data;
if(task) {
int _cursor;
}
}
while ((action =
- xbt_swag_extract(resource->common_public->states.
- done_action_set))) {
+ xbt_swag_extract(resource->common_public->states.
+ done_action_set))) {
task = action->data;
if(task) {
int _cursor;
}
}
}
+ state_modifications = 0;
}
if ((nbprocess=xbt_fifo_size(msg_global->process_list)) == 0) {
}
}
-/* static void MarkAsFailed(m_task_t t, TBX_HashTable_t failedProcessList) */
-/* { */
-/* simdata_task_t simdata = NULL; */
-/* xbt_fifo_item_t i = NULL; */
-/* m_process_t p = NULL; */
-
-/* xbt_assert0((t!=NULL),"Invalid task"); */
-/* simdata = t->simdata; */
-
-/* #define KILL(task) if(task) SG_failTask(task) */
-/* KILL(simdata->compute); */
-/* KILL(simdata->TCP_comm); */
-/* KILL(simdata->s[0]); */
-/* KILL(simdata->s[1]); */
-/* KILL(simdata->s[2]); */
-/* KILL(simdata->s[3]); */
-/* KILL(simdata->sleep); */
-/* #undef KILL */
-/* /\* if(simdata->comm) SG_failEndToEndTransfer(simdata->comm); *\/ */
-
-/* xbt_fifo_foreach(simdata->sleeping,i,p,m_process_t) { */
-/* if(!TBX_HashTable_isInList(failedProcessList,p,TBX_basicHash)) */
-/* TBX_HashTable_insert(failedProcessList,p,TBX_basicHash); */
-/* } */
-
-/* } */
-
-/* static xbt_fifo_t MSG_buildFailedHostList(double begin, double end) */
-/* { */
-/* xbt_fifo_t failedHostList = xbt_fifo_new(); */
-/* m_host_t host = NULL; */
-/* xbt_fifo_item_t i; */
-
-/* xbt_fifo_foreach(msg_global->host,i,host,m_host_t) { */
-/* SG_Resource r= ((simdata_host_t) (host->simdata))->host; */
-
-/* if(SG_evaluateFailureTrace(r->failure_trace,begin,end)!=-1.0) */
-/* xbt_fifo_insert(failedHostList,host); */
-/* } */
-
-/* return failedHostList; */
-/* } */
-
/** \ingroup msg_simulation
* \brief Kill all running process
if(state == SURF_ACTION_DONE) {
if(surf_workstation_resource->common_public->action_free(simdata->compute))
simdata->compute = NULL;
+ simdata->computation_amount = 0.0;
MSG_RETURN(MSG_OK);
} else if(surf_workstation_resource->extension_public->
get_state(MSG_process_get_host(process)->simdata->host)
} else {
if(surf_workstation_resource->common_public->action_free(simdata->compute))
simdata->compute = NULL;
- MSG_RETURN(MSG_TRANSFER_FAILURE);
+ MSG_RETURN(MSG_TASK_CANCELLED);
}
}
-
+/** \ingroup m_task_management
+ * \brief Creates a new #m_task_t (a parallel one....).
+ *
+ * A constructor for #m_task_t taking six arguments and returning the
+ corresponding object.
+ * \param name a name for the object. It is for user-level information
+ and can be NULL.
+ * \param host_nb the number of hosts involved in the parallel task.
+ * \param host_list an array of #host_nb m_host_t.
+ * \param computation_amount an array of #host_nb
+ doubles. computation_amount[i] is the total number of operations
+ that have to be performed on host_list[i].
+ * \param communication_amount an array of #host_nb*#host_nb doubles.
+ * \param data a pointer to any data the user may want to attach to the new
+ object. It is for user-level information and can be NULL. It can
+ be retrieved with the function \ref MSG_task_get_data.
+ * \see m_task_t
+ * \return The new corresponding object.
+ */
m_task_t MSG_parallel_task_create(const char *name,
int host_nb,
const m_host_t *host_list,
return MSG_OK;
}
+
+/** \ingroup m_task_management
+ * \brief Cancel a #m_task_t.
+ *
+ * If the task is currently being executed or transferred, the underlying
+ * action is cancelled, which is meant to stop the processes that were
+ * working on it. A pending computation is looked at before a pending
+ * communication; only the first one found is cancelled.
+ *
+ * \param task the task to cancel. Neither the task nor its simdata may
+ *  be NULL.
+ * \return #MSG_OK if a computation or a communication attached to the
+ *  task was cancelled, #MSG_FATAL if the task had neither.
+ */
+MSG_error_t MSG_task_cancel(m_task_t task)
+{
+  /* Same precondition as the other task accessors: simdata is
+     dereferenced right below, so check it along with the task. */
+  xbt_assert0((task != NULL) && (task->simdata != NULL), "Invalid parameter");
+
+  if(task->simdata->compute) {
+    surf_workstation_resource->common_public->action_cancel(task->simdata->compute);
+    return MSG_OK;
+  }
+  if(task->simdata->comm) {
+    surf_workstation_resource->common_public->action_cancel(task->simdata->comm);
+    return MSG_OK;
+  }
+
+  /* No action is attached to this task: there is nothing to cancel. */
+  return MSG_FATAL;
+}
+
/** \ingroup m_task_management
* \brief Returns the computation amount needed to process a task #m_task_t.
- *
+ * Once a task has been processed, this amount is thus set to 0...
*/
double MSG_task_get_compute_duration(m_task_t task)
{
return task->simdata->computation_amount;
}
+/** \ingroup m_task_management
+ * \brief Returns the remaining computation amount of a task #m_task_t.
+ *
+ * \param task the task to query. Neither the task nor its simdata may
+ *  be NULL.
+ * \return the \c remains field of the compute action attached to the
+ *  task when there is one, the task's stored computation amount
+ *  otherwise.
+ */
+double MSG_task_get_remaining_computation(m_task_t task)
+{
+  simdata_task_t simdata = NULL;
+
+  xbt_assert0((task != NULL) && (task->simdata != NULL), "Invalid parameter");
+
+  /* Only read simdata once the precondition above has been checked. */
+  simdata = task->simdata;
+
+  if(simdata->compute) {
+    return simdata->compute->remains;
+  }
+
+  return simdata->computation_amount;
+}
+
+
/** \ingroup m_task_management
* \brief Returns the size of the data attached to a task #m_task_t.
*