Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add MSG_task_cancel and MSG_task_get_computation_remaining
authoralegrand <alegrand@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 2 Aug 2005 00:04:07 +0000 (00:04 +0000)
committeralegrand <alegrand@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 2 Aug 2005 00:04:07 +0000 (00:04 +0000)
Add comments for parallel tasks

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1576 48e7efb5-ca39-0410-a469-dd3cf9ba447f

include/msg/datatypes.h
include/msg/msg.h
src/msg/global.c
src/msg/gos.c
src/msg/task.c

index 0112cbd..9a49c4a 100644 (file)
@@ -118,6 +118,8 @@ typedef enum {
   MSG_HOST_FAILURE, /**< @brief System shutdown. The host on which you are
       running has just been rebooted. Free your datastructures and
       return now !*/
+  MSG_TASK_CANCELLED, /**< @brief Cancelled task. This task has been cancelled 
+                       by somebody!*/
   MSG_FATAL /**< @brief You've done something wrong. You'd better look at it... */
 } MSG_error_t;
 /** @} */
index 94bb434..480d3d8 100644 (file)
@@ -39,6 +39,7 @@ const char *MSG_host_get_name(m_host_t host);
 m_host_t MSG_host_self(void);
 int MSG_get_host_msgload(m_host_t host);
 /* int MSG_get_msgload(void); This function lacks specification; discard it */
+double MSG_get_host_speed(m_host_t h);
 
 void MSG_create_environment(const char *file);
 
@@ -90,7 +91,7 @@ m_task_t MSG_parallel_task_create(const char *name,
 void *MSG_task_get_data(m_task_t task);
 m_process_t MSG_task_get_sender(m_task_t task);
 const char *MSG_task_get_name(m_task_t task);
-
+MSG_error_t MSG_task_cancel(m_task_t task);
 MSG_error_t MSG_task_destroy(m_task_t task);
 
 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel);
@@ -112,6 +113,7 @@ MSG_error_t MSG_process_sleep(double nb_sec);
 MSG_error_t MSG_get_errno(void);
 
 double MSG_task_get_compute_duration(m_task_t task);
+double MSG_task_get_remaining_computation(m_task_t task);
 double MSG_task_get_data_size(m_task_t task);
 
 /************************** Deprecated ***************************************/
index 2848f78..2c19859 100644 (file)
@@ -317,7 +317,7 @@ MSG_error_t MSG_main(void)
   m_process_t process = NULL;
   int nbprocess,i;
   double elapsed_time = 0.0;
-
+  int state_modifications;
   /* Clean IO before the run */
   fflush(stdout);
   fflush(stderr);
@@ -342,19 +342,7 @@ MSG_error_t MSG_main(void)
       xbt_context_schedule(process->simdata->context);
       msg_global->current_process = NULL;
     }
-    DEBUG1("%g : Calling surf_solve",MSG_getClock());
-    elapsed_time = surf_solve();
-    DEBUG1("Elapsed_time %g",elapsed_time);
-
-/*     fprintf(stderr, "====== %g =====\n",Now); */
-/*     if (elapsed_time==0.0) { */
-/*       fprintf(stderr, "No change in time\n"); */
-/*     } */
-    if (elapsed_time<0.0) {
-/*       fprintf(stderr, "We're done %g\n",elapsed_time); */
-      break;
-    }
-
+    
     {
       surf_action_t action = NULL;
       surf_resource_t resource = NULL;
@@ -362,6 +350,24 @@ MSG_error_t MSG_main(void)
 
       void *fun = NULL;
       void *arg = NULL;
+
+      xbt_dynar_foreach(resource_list, i, resource) {
+       if(xbt_swag_size(resource->common_public->states.failed_action_set) ||
+          xbt_swag_size(resource->common_public->states.done_action_set))
+         state_modifications = 1;
+      }
+      
+      if(!state_modifications) {
+       DEBUG1("%g : Calling surf_solve",MSG_getClock());
+       elapsed_time = surf_solve();
+       DEBUG1("Elapsed_time %g",elapsed_time);
+       
+       if (elapsed_time<0.0) {
+         /*       fprintf(stderr, "We're done %g\n",elapsed_time); */
+         break;
+       }
+      }
+
       while (surf_timer_resource->extension_public->get(&fun,(void*)&arg)) {
        DEBUG2("got %p %p", fun, arg);
        if(fun==MSG_process_create_with_arguments) {
@@ -387,8 +393,8 @@ MSG_error_t MSG_main(void)
       
       xbt_dynar_foreach(resource_list, i, resource) {
        while ((action =
-              xbt_swag_extract(resource->common_public->states.
-                               failed_action_set))) {
+               xbt_swag_extract(resource->common_public->states.
+                                failed_action_set))) {
          task = action->data;
          if(task) {
            int _cursor;
@@ -403,8 +409,8 @@ MSG_error_t MSG_main(void)
          }
        }
        while ((action =
-              xbt_swag_extract(resource->common_public->states.
-                               done_action_set))) {
+               xbt_swag_extract(resource->common_public->states.
+                                done_action_set))) {
          task = action->data;
          if(task) {
            int _cursor;
@@ -420,6 +426,7 @@ MSG_error_t MSG_main(void)
        }
       }
     }
+    state_modifications = 0;
   }
 
   if ((nbprocess=xbt_fifo_size(msg_global->process_list)) == 0) {
@@ -482,49 +489,6 @@ MSG_error_t MSG_main(void)
   }
 }
 
-/* static void MarkAsFailed(m_task_t t, TBX_HashTable_t failedProcessList)  */
-/* { */
-/*   simdata_task_t simdata = NULL; */
-/*   xbt_fifo_item_t i = NULL; */
-/*   m_process_t p = NULL; */
-  
-/*   xbt_assert0((t!=NULL),"Invalid task"); */
-/*   simdata = t->simdata; */
-
-/* #define KILL(task) if(task) SG_failTask(task) */
-/*   KILL(simdata->compute); */
-/*   KILL(simdata->TCP_comm); */
-/*   KILL(simdata->s[0]); */
-/*   KILL(simdata->s[1]); */
-/*   KILL(simdata->s[2]); */
-/*   KILL(simdata->s[3]); */
-/*   KILL(simdata->sleep); */
-/* #undef KILL   */
-/* /\*   if(simdata->comm) SG_failEndToEndTransfer(simdata->comm); *\/ */
-  
-/*   xbt_fifo_foreach(simdata->sleeping,i,p,m_process_t) { */
-/*     if(!TBX_HashTable_isInList(failedProcessList,p,TBX_basicHash))  */
-/*       TBX_HashTable_insert(failedProcessList,p,TBX_basicHash); */
-/*   } */
-  
-/* } */
-
-/* static xbt_fifo_t MSG_buildFailedHostList(double begin, double end) */
-/* { */
-/*   xbt_fifo_t failedHostList = xbt_fifo_new(); */
-/*   m_host_t host = NULL; */
-/*   xbt_fifo_item_t i; */
-
-/*   xbt_fifo_foreach(msg_global->host,i,host,m_host_t) { */
-/*     SG_Resource r= ((simdata_host_t) (host->simdata))->host; */
-
-/*     if(SG_evaluateFailureTrace(r->failure_trace,begin,end)!=-1.0) */
-/*       xbt_fifo_insert(failedHostList,host); */
-/*   } */
-
-/*   return failedHostList; */
-/* } */
-
 /** \ingroup msg_simulation
  * \brief Kill all running process
 
index 162ffac..ec7e191 100644 (file)
@@ -427,6 +427,7 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
   if(state == SURF_ACTION_DONE) {
     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
       simdata->compute = NULL;
+    simdata->computation_amount = 0.0;
     MSG_RETURN(MSG_OK);
   } else if(surf_workstation_resource->extension_public->
            get_state(MSG_process_get_host(process)->simdata->host) 
@@ -437,10 +438,28 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
   } else {
     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
       simdata->compute = NULL;
-    MSG_RETURN(MSG_TRANSFER_FAILURE);
+    MSG_RETURN(MSG_TASK_CANCELLED);
   }
 }
-
+/** \ingroup m_task_management
+ * \brief Creates a new #m_task_t (a parallel one....).
+ *
+ * A constructor for #m_task_t taking six arguments and returning the 
+   corresponding object.
+ * \param name a name for the object. It is for user-level information
+   and can be NULL.
+ * \param host_nb the number of hosts implied in the parallel task.
+ * \param host_list an array of #host_nb m_host_t.
+ * \param computation_amount an array of #host_nb
+   doubles. computation_amount[i] is the total number of operations
+   that have to be performed on host_list[i].
+ * \param communication_amount an array of #host_nb*#host_nb doubles.
+ * \param data a pointer to any data may want to attach to the new
+   object.  It is for user-level information and can be NULL. It can
+   be retrieved with the function \ref MSG_task_get_data.
+ * \see m_task_t
+ * \return The new corresponding object.
+ */
 m_task_t MSG_parallel_task_create(const char *name, 
                                  int host_nb,
                                  const m_host_t *host_list,
index 1fe3975..d37ab31 100644 (file)
@@ -139,9 +139,31 @@ MSG_error_t MSG_task_destroy(m_task_t task)
   return MSG_OK;
 }
 
+
+/** \ingroup m_task_management
+ * \brief Cancel a #m_task_t.
+ * \param task the taskt to cancel. If it was executed or transfered, it 
+          stops the process that were working on it.
+ */
+MSG_error_t MSG_task_cancel(m_task_t task)
+{
+  xbt_assert0((task != NULL), "Invalid parameter");
+
+  if(task->simdata->compute) {
+    surf_workstation_resource->common_public->action_cancel(task->simdata->compute);
+    return MSG_OK;
+  }
+  if(task->simdata->comm) {
+    surf_workstation_resource->common_public->action_cancel(task->simdata->comm);
+    return MSG_OK;
+  }
+
+  return MSG_FATAL;
+}
+
 /** \ingroup m_task_management
  * \brief Returns the computation amount needed to process a task #m_task_t.
- *
+ *        Once a task has been processed, this amount is thus set to 0...
  */
 double MSG_task_get_compute_duration(m_task_t task)
 {
@@ -152,6 +174,23 @@ double MSG_task_get_compute_duration(m_task_t task)
   return task->simdata->computation_amount;
 }
 
+/** \ingroup m_task_management
+ * \brief Returns the remaining computation amount of a task #m_task_t.
+ *
+ */
+double MSG_task_get_remaining_computation(m_task_t task)
+{
+  simdata_task_t simdata = NULL;
+
+  xbt_assert0((task != NULL) && (task->simdata != NULL), "Invalid parameter");
+
+  if(task->simdata->compute) {
+    return task->simdata->compute->remains;
+  } else {
+    return task->simdata->computation_amount;
+  }
+}
+
 /** \ingroup m_task_management
  * \brief Returns the size of the data attached to a task #m_task_t.
  *