Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Additional checks for Henri ;)
[simgrid.git] / src / msg / gos.c
index 37d82bb..ae67680 100644 (file)
@@ -5,9 +5,9 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
-#include"private.h"
-#include"xbt/sysdep.h"
-#include "xbt/error.h"
+#include "private.h"
+#include "xbt/sysdep.h"
+#include "xbt/log.h"
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
                                "Logging specific to MSG (gos)");
 
@@ -16,16 +16,6 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
  *  by an agent for handling some task.
  */
 
-/** \ingroup msg_gos_functions
- * \brief This function is now deprecated and useless. Please stop using it.
- */
-MSG_error_t MSG_process_start(m_process_t process)
-{
-  xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
-  
-  return MSG_OK;
-}
-
 /** \ingroup msg_gos_functions
  * \brief Listen on a channel and wait for receiving a task.
  *
@@ -57,7 +47,7 @@ MSG_error_t MSG_task_get(m_task_t * task,
  * \param channel the channel on which the agent should be
    listening. This value has to be >=0 and < than the maximal
    number of channels fixed with MSG_set_channel_number().
- * \param timeout the maximum time to wait for a task before giving
+ * \param max_duration the maximum time to wait for a task before giving
     up. In such a case, \a task will not be modified and will still be
     equal to \c NULL when returning.
  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
@@ -73,11 +63,11 @@ MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
   m_host_t h = NULL;
   simdata_task_t t_simdata = NULL;
   simdata_host_t h_simdata = NULL;
-  int warning = 0;
   int first_time = 1;
   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
   
   CHECK_HOST();
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
   /* Sanity check */
   xbt_assert0(task,"Null pointer for the task\n");
 
@@ -175,11 +165,12 @@ int MSG_task_Iprobe(m_channel_t channel)
   m_host_t h = NULL;
   simdata_host_t h_simdata = NULL;
 
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
   DEBUG2("Probing on channel %d (%s)", channel,h->name);
   CHECK_HOST();
   h = MSG_host_self();
   h_simdata = h->simdata;
-  return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
+  return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
 }
 
 /** \ingroup msg_gos_functions
@@ -198,13 +189,14 @@ int MSG_task_probe_from(m_channel_t channel)
   xbt_fifo_item_t item;
   m_task_t t;
 
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
   CHECK_HOST();
   h = MSG_host_self();
   h_simdata = h->simdata;
 
   DEBUG2("Probing on channel %d (%s)", channel,h->name);
    
-  item = xbt_fifo_getFirstItem(h->simdata->mbox[channel]);
+  item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
   if (!item || !(t = xbt_fifo_get_item_content(item)))
     return -1;
    
@@ -221,6 +213,7 @@ MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
   int first_time = 1;
   m_process_t process = MSG_process_self();
 
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
   if(PID) {
     *PID = -1;
   }
@@ -233,7 +226,7 @@ MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
     h_simdata = h->simdata;
     
     DEBUG2("Probing on channel %d (%s)", channel,h->name);
-    while((item = xbt_fifo_getFirstItem(h->simdata->mbox[channel]))) {
+    while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
       if(max_duration>0) {
        if(!first_time) {
          MSG_RETURN(MSG_OK);
@@ -299,6 +292,8 @@ MSG_error_t MSG_task_put(m_task_t task,
 
   CHECK_HOST();
 
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
+
   task_simdata = task->simdata;
   task_simdata->sender = process;
   xbt_assert0(task_simdata->using==1,"Gargl!");
@@ -307,7 +302,7 @@ MSG_error_t MSG_task_put(m_task_t task,
   local_host = ((simdata_process_t) process->simdata)->host;
   remote_host = dest;
 
-  DEBUG4("Trying to send a task (%lg Mb) from %s to %s on channel %d", 
+  DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
         task->simdata->message_size,local_host->name, remote_host->name, channel);
 
   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
@@ -335,22 +330,24 @@ MSG_error_t MSG_task_put(m_task_t task,
     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
   }
     
-  MSG_task_destroy(task);
 
   PAJE_PROCESS_POP_STATE(process);  
 
   if(state == SURF_ACTION_DONE) {
     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
       task_simdata->comm = NULL;
+    MSG_task_destroy(task);
     MSG_RETURN(MSG_OK);
   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
            == SURF_CPU_OFF) {
     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
       task_simdata->comm = NULL;
+    MSG_task_destroy(task);
     MSG_RETURN(MSG_HOST_FAILURE);
   } else { 
     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
       task_simdata->comm = NULL;
+    MSG_task_destroy(task);
     MSG_RETURN(MSG_TRANSFER_FAILURE);
   }
 }
@@ -365,9 +362,11 @@ MSG_error_t MSG_task_put_bounded(m_task_t task,
                                 m_host_t dest, m_channel_t channel,
                                 double max_rate)
 {
+  MSG_error_t res = MSG_OK;
   task->simdata->rate=max_rate;
-  return(MSG_task_put(task, dest, channel));
+  res = MSG_task_put(task, dest, channel);
   task->simdata->rate=-1.0;
+  return(res);
 }
 
 /** \ingroup msg_gos_functions
@@ -406,6 +405,9 @@ void __MSG_task_execute(m_process_t process, m_task_t task)
   simdata->compute = surf_workstation_resource->extension_public->
     execute(MSG_process_get_host(process)->simdata->host,
            simdata->computation_amount);
+  surf_workstation_resource->common_public->
+    set_priority(simdata->compute, simdata->priority);
+
   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
 }
 
@@ -425,6 +427,7 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
   if(state == SURF_ACTION_DONE) {
     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
       simdata->compute = NULL;
+    simdata->computation_amount = 0.0;
     MSG_RETURN(MSG_OK);
   } else if(surf_workstation_resource->extension_public->
            get_state(MSG_process_get_host(process)->simdata->host) 
@@ -435,9 +438,100 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
   } else {
     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
       simdata->compute = NULL;
-    MSG_RETURN(MSG_TRANSFER_FAILURE);
+    MSG_RETURN(MSG_TASK_CANCELLED);
   }
 }
+/** \ingroup m_task_management
+ * \brief Creates a new parallel #m_task_t.
+ *
+ * A constructor for #m_task_t taking six arguments and returning the
+   corresponding object.
+ * \param name a name for the object. It is for user-level information
+   and can be NULL.
+ * \param host_nb the number of hosts involved in the parallel task.
+ * \param host_list an array of \a host_nb m_host_t.
+ * \param computation_amount an array of \a host_nb
+   doubles. computation_amount[i] is the total number of operations
+   that have to be performed on host_list[i]. Stored by pointer, not copied.
+ * \param communication_amount an array of \a host_nb * \a host_nb doubles. Stored by pointer, not copied.
+ * \param data a pointer to any data the user may want to attach to the new
+   object.  It is for user-level information and can be NULL. It can
+   be retrieved with the function \ref MSG_task_get_data.
+ * \see m_task_t
+ * \return The new corresponding object.
+ */
+m_task_t MSG_parallel_task_create(const char *name, 
+                                 int host_nb,
+                                 const m_host_t *host_list,
+                                 double *computation_amount,
+                                 double *communication_amount,
+                                 void *data)
+{
+  simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
+  m_task_t task = xbt_new0(s_m_task_t,1);
+  int i;
+
+  /* Task structure */
+  task->name = xbt_strdup(name); /* the task gets its own copy of the name */
+  task->simdata = simdata;
+  task->data = data;
+
+  /* Simulator Data */
+  simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
+  simdata->rate = -1.0; /* same value MSG_task_put_bounded() resets the rate to */
+  simdata->using = 1;
+  simdata->sender = NULL;
+  simdata->host_nb = host_nb; /* a zero host_nb is rejected later by __MSG_parallel_task_execute() */
+  
+  simdata->host_list = xbt_new0(void *, host_nb);
+  simdata->comp_amount = computation_amount; /* caller's array is referenced, not duplicated */
+  simdata->comm_amount = communication_amount; /* caller's array is referenced, not duplicated */
+
+  for(i=0;i<host_nb;i++) /* keep each host's simdata->host entry rather than the m_host_t itself */
+    simdata->host_list[i] = host_list[i]->simdata->host;
+
+  return task;
+}
+
+/* Asks the workstation resource to start the parallel action of \a task and stores it in task->simdata->compute. */
+static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
+{
+  simdata_task_t simdata = NULL;
+
+  CHECK_HOST(); /* macro defined elsewhere; `process` is not referenced directly in this body */
+
+  simdata = task->simdata;
+
+  xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell."); /* a zero host_nb is treated as "not a parallel task" */
+
+  simdata->compute = surf_workstation_resource->extension_public->
+  execute_parallel_task(task->simdata->host_nb,
+                       task->simdata->host_list,
+                       task->simdata->comp_amount,
+                       task->simdata->comm_amount,
+                       1.0,   /* NOTE(review): meaning of this argument is not visible here - confirm against execute_parallel_task */
+                       -1.0); /* NOTE(review): same remark; presumably "no rate bound", as for simdata->rate - confirm */
+  if(simdata->compute) /* only attach the task when an action was actually returned */
+    surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
+}
+/* Starts the parallel task \a task from the calling process and returns the resulting MSG_error_t. */
+MSG_error_t MSG_parallel_task_execute(m_task_t task)
+{
+  m_process_t process = MSG_process_self();
+  MSG_error_t res;
+
+  DEBUG0("Computing on a tons of guys");
+  
+  __MSG_parallel_task_execute(process, task); /* sets task->simdata->compute */
+
+  if(task->simdata->compute) /* an action was created: the result comes from __MSG_wait_for_computation() */
+    res = __MSG_wait_for_computation(process,task);
+  else /* no action was created: reported as success */
+    res = MSG_OK;
+
+  return res;  
+}
+
 
 /** \ingroup msg_gos_functions
  * \brief Sleep for the specified number of seconds