Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add a test in MSG_task_execute to stop whenever a task is being executed on two diffe...
[simgrid.git] / src / msg / gos.c
index 17f0326..a223cec 100644 (file)
@@ -5,48 +5,33 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
-#include"private.h"
-#include"xbt/sysdep.h"
-#include "xbt/error.h"
+#include "private.h"
+#include "xbt/sysdep.h"
+#include "xbt/log.h"
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
                                "Logging specific to MSG (gos)");
 
-/** \ingroup msg_gos_functions
- * \brief This function is now deprecated and useless. Please stop using it.
+/** \defgroup msg_gos_functions MSG Operating System Functions
+ *  \brief This section describes the functions that can be used
+ *  by an agent for handling some task.
  */
-MSG_error_t MSG_process_start(m_process_t process)
-{
-  xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
-  
-  return MSG_OK;
-}
 
-/** \ingroup msg_gos_functions
- * \brief Listen on a channel and wait for receiving a task.
- *
- * It takes two parameter.
- * \param task a memory location for storing a #m_task_t. It will
-   hold a task when this function will return. Thus \a task should not
-   be equal to \c NULL and \a *task should be equal to \c NULL. If one of
-   those two condition does not hold, there will be a warning message.
- * \param channel the channel on which the agent should be
-   listening. This value has to be >=0 and < than the maximal
-   number of channels fixed with MSG_set_channel_number().
- * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
- * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
- */
-MSG_error_t MSG_task_get(m_task_t * task,
-                        m_channel_t channel)
+static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
+                                                       m_channel_t channel,
+                                                       double max_duration,
+                                                       m_host_t host)
 {
   m_process_t process = MSG_process_self();
   m_task_t t = NULL;
   m_host_t h = NULL;
   simdata_task_t t_simdata = NULL;
   simdata_host_t h_simdata = NULL;
-  int warning = 0;
+  int first_time = 1;
   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
-  
+  xbt_fifo_item_t item = NULL;
+
   CHECK_HOST();
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
   /* Sanity check */
   xbt_assert0(task,"Null pointer for the task\n");
 
@@ -56,20 +41,50 @@ MSG_error_t MSG_task_get(m_task_t * task,
   /* Get the task */
   h = MSG_host_self();
   h_simdata = h->simdata;
-  while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
+
+  DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
+
+  while (1) {
+    if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
+      if(!host) {
+       t = xbt_fifo_shift(h_simdata->mbox[channel]);
+       break;
+      } else {
+       xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
+         if(t->simdata->source==host) break;
+       }
+       if(item) {
+         xbt_fifo_remove_item(h->simdata->mbox[channel],item);
+         break;
+       } 
+      }
+    }
+                                                      
+    if(max_duration>0) {
+      if(!first_time) {
+       MSG_RETURN(MSG_OK);
+      }
+    }
     xbt_assert2(!(h_simdata->sleeping[channel]),
                "A process (%s(%d)) is already blocked on this channel",
                h_simdata->sleeping[channel]->name,
                h_simdata->sleeping[channel]->simdata->PID);
     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
-    __MSG_process_block();
+    if(max_duration>0) {
+      __MSG_process_block(max_duration);
+    } else {
+      __MSG_process_block(-1);
+    }
+    h_simdata->sleeping[channel] = NULL;
+    first_time = 0;
     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
        == SURF_CPU_OFF)
       MSG_RETURN(MSG_HOST_FAILURE);
-    h_simdata->sleeping[channel] = NULL;
     /* OK, we should both be ready now. Are you there ? */
   }
 
+  DEBUG1("OK, got a task (%s)", t->name);
+
   t_simdata = t->simdata;
   /*   *task = __MSG_task_copy(t); */
   *task=t;
@@ -77,34 +92,134 @@ MSG_error_t MSG_task_get(m_task_t * task,
   /* Transfer */
   t_simdata->using++;
 
+  while(MSG_process_is_suspended(t_simdata->sender)) {
+    DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
+          t_simdata->sender->name);
+    m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
+    if(__MSG_process_isBlocked(t_simdata->sender)) {
+      DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
+      __MSG_process_unblock(t_simdata->sender);
+      task_to_wait_for->simdata->using++;
+      __MSG_task_wait_event(process, task_to_wait_for);
+      MSG_task_destroy(task_to_wait_for);
+    } else {
+      DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
+      task_to_wait_for->simdata->using++;
+      __MSG_task_wait_event(process, task_to_wait_for);
+      MSG_task_destroy(task_to_wait_for);
+      DEBUG0("He's resumed. He should block again. So let's free him.");
+      __MSG_process_unblock(t_simdata->sender);
+      break;
+    }
+  }
+  DEBUG0("Calling SURF for communication creation");
   t_simdata->comm = surf_workstation_resource->extension_public->
     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
                h->simdata->host, t_simdata->message_size,t_simdata->rate);
   
   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
 
-  if(__MSG_process_isBlocked(t_simdata->sender)) 
+  if(__MSG_process_isBlocked(t_simdata->sender)) {
+    DEBUG1("Unblocking %s",t_simdata->sender->name);
     __MSG_process_unblock(t_simdata->sender);
+  }
 
-  PAJE_PROCESS_STATE(process,"C");  
+  PAJE_PROCESS_PUSH_STATE(process,"C");  
 
   do {
+    DEBUG0("Waiting for action termination");
     __MSG_task_wait_event(process, t);
     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
   } while (state==SURF_ACTION_RUNNING);
+  DEBUG0("Action terminated");
 
   if(t->simdata->using>1) {
     xbt_fifo_unshift(msg_global->process_to_run,process);
     xbt_context_yield();
   }
 
+  PAJE_PROCESS_POP_STATE(process);
   PAJE_COMM_STOP(process,t,channel);
 
-  if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
-  else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
-         == SURF_CPU_OFF)
+  if(state == SURF_ACTION_DONE) {
+    if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
+      t_simdata->comm = NULL;
+    MSG_RETURN(MSG_OK);
+  } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
+         == SURF_CPU_OFF) {
+    if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
+      t_simdata->comm = NULL;
     MSG_RETURN(MSG_HOST_FAILURE);
-  else MSG_RETURN(MSG_TRANSFER_FAILURE);
+  } else {
+    if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
+      t_simdata->comm = NULL;
+    MSG_RETURN(MSG_TRANSFER_FAILURE);
+  }
+}
+
+/** \ingroup msg_gos_functions
+ * \brief Listen on a channel and wait for receiving a task.
+ *
+ * It takes two parameters.
+ * \param task a memory location for storing a #m_task_t. It will
+   hold a task when this function will return. Thus \a task should not
+   be equal to \c NULL and \a *task should be equal to \c NULL. If one of
+   those two conditions does not hold, there will be a warning message.
+ * \param channel the channel on which the agent should be
+   listening. This value has to be >=0 and < than the maximal
+   number of channels fixed with MSG_set_channel_number().
+ * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
+ * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
+ */
+MSG_error_t MSG_task_get(m_task_t * task,
+                        m_channel_t channel)
+{
+  return MSG_task_get_with_time_out(task, channel, -1);
+}
+
+/** \ingroup msg_gos_functions
+ * \brief Listen on a channel and wait for receiving a task with a timeout.
+ *
+ * It takes three parameters.
+ * \param task a memory location for storing a #m_task_t. It will
+   hold a task when this function will return. Thus \a task should not
+   be equal to \c NULL and \a *task should be equal to \c NULL. If one of
+   those two conditions does not hold, there will be a warning message.
+ * \param channel the channel on which the agent should be
+   listening. This value has to be >=0 and < than the maximal
+   number of channels fixed with MSG_set_channel_number().
+ * \param max_duration the maximum time to wait for a task before giving
+    up. In such a case, \a task will not be modified and will still be
+    equal to \c NULL when returning.
+ * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
+   if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
+ */
+MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
+                                      m_channel_t channel,
+                                      double max_duration)
+{
+  return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
+}
+
+/** \ingroup msg_gos_functions
+ * \brief Listen on \a channel and wait to receive a task from \a host.
+ *
+ * It takes three parameters.
+ * \param task a memory location for storing a #m_task_t. It will
+   hold a task when this function will return. Thus \a task should not
+   be equal to \c NULL and \a *task should be equal to \c NULL. If one of
+   those two conditions does not hold, there will be a warning message.
+ * \param channel the channel on which the agent should be
+   listening. This value has to be >=0 and < than the maximal
+   number of channels fixed with MSG_set_channel_number().
+ * \param host the host that is to be watched.
+ * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
+   if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
+ */
+MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
+                                  m_host_t host)
+{
+  return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
 }
 
 /** \ingroup msg_gos_functions
@@ -121,10 +236,12 @@ int MSG_task_Iprobe(m_channel_t channel)
   m_host_t h = NULL;
   simdata_host_t h_simdata = NULL;
 
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
+  DEBUG2("Probing on channel %d (%s)", channel,h->name);
   CHECK_HOST();
   h = MSG_host_self();
   h_simdata = h->simdata;
-  return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
+  return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
 }
 
 /** \ingroup msg_gos_functions
@@ -143,15 +260,127 @@ int MSG_task_probe_from(m_channel_t channel)
   xbt_fifo_item_t item;
   m_task_t t;
 
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
   CHECK_HOST();
   h = MSG_host_self();
   h_simdata = h->simdata;
+
+  DEBUG2("Probing on channel %d (%s)", channel,h->name);
    
-  item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
-  if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
+  item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
+  if (!item || !(t = xbt_fifo_get_item_content(item)))
     return -1;
    
-  return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
+  return MSG_process_get_PID(t->simdata->sender);
+}
+
+/** \ingroup msg_gos_functions
+ * \brief Wait for at most \a max_duration seconds for a task reception
+   on \a channel. *\a PID is updated with the PID of the first process
+   that triggered this event if any.
+ *
+ * It takes three parameters:
+ * \param channel the channel on which the agent should be
+   listening. This value has to be >=0 and < than the maximal
+   number of channels fixed with MSG_set_channel_number().
+ * \param PID a memory location for storing an int.
+ * \param max_duration the maximum time to wait for a task before
+    giving up. In the case of a reception, *\a PID will be updated
+    with the PID of the first process to send a task.
+ * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
+   and #MSG_OK otherwise.
+ */
+MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
+                                   int *PID)
+{
+  m_host_t h = NULL;
+  simdata_host_t h_simdata = NULL;
+  xbt_fifo_item_t item;
+  m_task_t t;
+  int first_time = 1;
+  m_process_t process = MSG_process_self();
+
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
+  if(PID) {
+    *PID = -1;
+  }
+
+  if(max_duration==0.0) {
+    *PID = MSG_task_probe_from(channel);
+    MSG_RETURN(MSG_OK);
+  } else {
+    CHECK_HOST();
+    h = MSG_host_self();
+    h_simdata = h->simdata;
+    
+    DEBUG2("Probing on channel %d (%s)", channel,h->name);
+    while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
+      if(max_duration>0) {
+       if(!first_time) {
+         MSG_RETURN(MSG_OK);
+       }
+      }
+      xbt_assert2(!(h_simdata->sleeping[channel]),
+                 "A process (%s(%d)) is already blocked on this channel",
+                 h_simdata->sleeping[channel]->name,
+                 h_simdata->sleeping[channel]->simdata->PID);
+      h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
+      if(max_duration>0) {
+       __MSG_process_block(max_duration);
+      } else {
+       __MSG_process_block(-1);
+      }
+      if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
+        == SURF_CPU_OFF) {
+       MSG_RETURN(MSG_HOST_FAILURE);
+      }
+      h_simdata->sleeping[channel] = NULL;
+      first_time = 0;
+    }
+    if (!item || !(t = xbt_fifo_get_item_content(item))) {
+      MSG_RETURN(MSG_OK);
+    }
+    if(PID) {
+      *PID = MSG_process_get_PID(t->simdata->sender);
+    }
+    MSG_RETURN(MSG_OK);
+  }
+}
+
+
+/** \ingroup msg_gos_functions
+
+ * \brief Return the number of tasks waiting to be received on a \a
+   channel and sent by \a host.
+ *
+ * It takes two parameters.
+ * \param channel the channel on which the agent should be
+   listening. This value has to be >=0 and < than the maximal
+   number of channels fixed with MSG_set_channel_number().
+ * \param host the host that is to be watched.
+ * \return the number of tasks waiting to be received on \a channel
+   and sent by \a host.
+ */
+int MSG_task_probe_from_host(int channel, m_host_t host)
+{
+  simdata_host_t h_simdata = NULL;
+  xbt_fifo_item_t item;
+  m_task_t t;
+  int count = 0;
+  m_host_t h = NULL;
+  
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
+  CHECK_HOST();
+  h = MSG_host_self();
+  h_simdata = h->simdata;
+
+  DEBUG2("Probing on channel %d (%s)", channel,h->name);
+   
+  xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
+    if(t->simdata->source==host) count++;
+  }
+   
+  return count;
 }
 
 /** \ingroup msg_gos_functions
@@ -188,45 +417,73 @@ MSG_error_t MSG_task_put(m_task_t task,
 
   CHECK_HOST();
 
+  xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
+
   task_simdata = task->simdata;
   task_simdata->sender = process;
-  xbt_assert0(task_simdata->using==1,"Gargl!");
+  task_simdata->source = MSG_process_get_host(process);
+  xbt_assert0(task_simdata->using==1,
+             "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
   task_simdata->comm = NULL;
   
   local_host = ((simdata_process_t) process->simdata)->host;
   remote_host = dest;
 
+  DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
+        task->simdata->message_size,local_host->name, remote_host->name, channel);
+
   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
                mbox[channel], task);
 
   PAJE_COMM_START(process,task,channel);
     
-  if(remote_host->simdata->sleeping[channel]) 
+  if(remote_host->simdata->sleeping[channel]) {
+    DEBUG0("Somebody is listening. Let's wake him up!");
     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
-/*   else { */
+  }
+
   process->simdata->put_host = dest;
   process->simdata->put_channel = channel;
-  while(!(task_simdata->comm)) 
-    __MSG_process_block();
+  while(!(task_simdata->comm)) {
+    DEBUG0("Communication not initiated yet. Let's block!");
+    __MSG_process_block(-1);
+  }
+  DEBUG0("Registering to this communication");
+  surf_workstation_resource->common_public->action_use(task_simdata->comm);
   process->simdata->put_host = NULL;
   process->simdata->put_channel = -1;
-/*   } */
 
-  PAJE_PROCESS_STATE(process,"C");  
+
+  PAJE_PROCESS_PUSH_STATE(process,"C");  
 
   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
   while (state==SURF_ACTION_RUNNING) {
+    DEBUG0("Waiting for action termination");
     __MSG_task_wait_event(process, task);
     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
   }
-    
-  MSG_task_destroy(task);
+  DEBUG0("Action terminated");
+  task->simdata->rate=-1.0; /* Sets the rate back to default */
 
-  if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
-  else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
-         == SURF_CPU_OFF)
+  PAJE_PROCESS_POP_STATE(process);  
+
+  if(state == SURF_ACTION_DONE) {
+    if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
+      task_simdata->comm = NULL;
+    MSG_task_destroy(task);
+    MSG_RETURN(MSG_OK);
+  } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
+           == SURF_CPU_OFF) {
+    if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
+      task_simdata->comm = NULL;
+    MSG_task_destroy(task);
     MSG_RETURN(MSG_HOST_FAILURE);
-  else MSG_RETURN(MSG_TRANSFER_FAILURE);
+  } else { 
+    if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
+      task_simdata->comm = NULL;
+    MSG_task_destroy(task);
+    MSG_RETURN(MSG_TRANSFER_FAILURE);
+  }
 }
 
 /** \ingroup msg_gos_functions
@@ -237,11 +494,12 @@ MSG_error_t MSG_task_put(m_task_t task,
  */
 MSG_error_t MSG_task_put_bounded(m_task_t task,
                                 m_host_t dest, m_channel_t channel,
-                                long double max_rate)
+                                double max_rate)
 {
+  MSG_error_t res = MSG_OK;
   task->simdata->rate=max_rate;
-  return(MSG_task_put(task, dest, channel));
-  task->simdata->rate=-1.0;
+  res = MSG_task_put(task, dest, channel);
+  return(res);
 }
 
 /** \ingroup msg_gos_functions
@@ -257,10 +515,16 @@ MSG_error_t MSG_task_put_bounded(m_task_t task,
 MSG_error_t MSG_task_execute(m_task_t task)
 {
   m_process_t process = MSG_process_self();
+  MSG_error_t res;
+
+  DEBUG1("Computing on %s", process->simdata->host->name);
 
   __MSG_task_execute(process, task);
-  PAJE_PROCESS_STATE(process,"E");  
-  return __MSG_wait_for_computation(process,task);
+
+  PAJE_PROCESS_PUSH_STATE(process,"E");  
+  res = __MSG_wait_for_computation(process,task);
+  PAJE_PROCESS_POP_STATE(process);
+  return res;
 }
 
 void __MSG_task_execute(m_process_t process, m_task_t task)
@@ -270,10 +534,14 @@ void __MSG_task_execute(m_process_t process, m_task_t task)
   CHECK_HOST();
 
   simdata = task->simdata;
+  xbt_assert0(!simdata->compute,"This taks is executed somewhere else. Go fix your code!");
 
   simdata->compute = surf_workstation_resource->extension_public->
     execute(MSG_process_get_host(process)->simdata->host,
            simdata->computation_amount);
+  surf_workstation_resource->common_public->
+    set_priority(simdata->compute, simdata->priority);
+
   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
 }
 
@@ -282,6 +550,7 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
   simdata_task_t simdata = task->simdata;
 
+  XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
   simdata->using++;
   do {
     __MSG_task_wait_event(process, task);
@@ -290,14 +559,119 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
   simdata->using--;
     
 
-  if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
-  else if(surf_workstation_resource->extension_public->
-         get_state(MSG_process_get_host(process)->simdata->host) 
-         == SURF_CPU_OFF)
+  if(state == SURF_ACTION_DONE) {
+    if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+      simdata->compute = NULL;
+    simdata->computation_amount = 0.0;
+    XBT_OUT;
+    MSG_RETURN(MSG_OK);
+  } else if(surf_workstation_resource->extension_public->
+           get_state(MSG_process_get_host(process)->simdata->host) 
+           == SURF_CPU_OFF) {
+    if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+      simdata->compute = NULL;
+    XBT_OUT;
     MSG_RETURN(MSG_HOST_FAILURE);
-  else MSG_RETURN(MSG_TRANSFER_FAILURE);
+  } else {
+    if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+      simdata->compute = NULL;
+    XBT_OUT;
+    MSG_RETURN(MSG_TASK_CANCELLED);
+  }
+}
+/** \ingroup m_task_management
+ * \brief Creates a new #m_task_t (a parallel one....).
+ *
+ * A constructor for #m_task_t taking six arguments and returning the 
+   corresponding object.
+ * \param name a name for the object. It is for user-level information
+   and can be NULL.
+ * \param host_nb the number of hosts implied in the parallel task.
+ * \param host_list an array of \p host_nb m_host_t.
+ * \param computation_amount an array of \p host_nb
+   doubles. computation_amount[i] is the total number of operations
+   that have to be performed on host_list[i].
+ * \param communication_amount an array of \p host_nb* \p host_nb doubles.
+ * \param data a pointer to any data the user may want to attach to the new
+   object.  It is for user-level information and can be NULL. It can
+   be retrieved with the function \ref MSG_task_get_data.
+ * \see m_task_t
+ * \return The new corresponding object.
+ */
+m_task_t MSG_parallel_task_create(const char *name, 
+                                 int host_nb,
+                                 const m_host_t *host_list,
+                                 double *computation_amount,
+                                 double *communication_amount,
+                                 void *data)
+{
+  simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
+  m_task_t task = xbt_new0(s_m_task_t,1);
+  int i;
+
+  /* Task structure */
+  task->name = xbt_strdup(name);
+  task->simdata = simdata;
+  task->data = data;
+
+  /* Simulator Data */
+  simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
+  simdata->rate = -1.0;
+  simdata->using = 1;
+  simdata->sender = NULL;
+  simdata->source = NULL;
+  simdata->host_nb = host_nb;
+  
+  simdata->host_list = xbt_new0(void *, host_nb);
+  simdata->comp_amount = computation_amount;
+  simdata->comm_amount = communication_amount;
+
+  for(i=0;i<host_nb;i++)
+    simdata->host_list[i] = host_list[i]->simdata->host;
+
+  return task;
+}
+
+
+static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
+{
+  simdata_task_t simdata = NULL;
+
+  CHECK_HOST();
+
+  simdata = task->simdata;
+
+  xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
+
+  simdata->compute = surf_workstation_resource->extension_public->
+  execute_parallel_task(task->simdata->host_nb,
+                       task->simdata->host_list,
+                       task->simdata->comp_amount,
+                       task->simdata->comm_amount,
+                       1.0,
+                       -1.0);
+  if(simdata->compute)
+    surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
+}
+
+MSG_error_t MSG_parallel_task_execute(m_task_t task)
+{
+  m_process_t process = MSG_process_self();
+  MSG_error_t res;
+
+  DEBUG0("Computing on a tons of guys");
+  
+  __MSG_parallel_task_execute(process, task);
+
+  if(task->simdata->compute)
+    res = __MSG_wait_for_computation(process,task);
+  else 
+    res = MSG_OK;
+
+  return res;  
 }
 
+
 /** \ingroup msg_gos_functions
  * \brief Sleep for the specified number of seconds
  *
@@ -305,7 +679,7 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
  *
  * \param nb_sec a number of second
  */
-MSG_error_t MSG_process_sleep(long double nb_sec)
+MSG_error_t MSG_process_sleep(double nb_sec)
 {
   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
   m_process_t process = MSG_process_self();
@@ -332,36 +706,46 @@ MSG_error_t MSG_process_sleep(long double nb_sec)
   if(state == SURF_ACTION_DONE) {
     if(surf_workstation_resource->extension_public->
        get_state(MSG_process_get_host(process)->simdata->host) 
-       == SURF_CPU_OFF)
+       == SURF_CPU_OFF) {
+      if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+       simdata->compute = NULL;
       MSG_RETURN(MSG_HOST_FAILURE);
-
+    }
     if(__MSG_process_isBlocked(process)) {
       __MSG_process_unblock(MSG_process_self());
     }
     if(surf_workstation_resource->extension_public->
        get_state(MSG_process_get_host(process)->simdata->host) 
-       == SURF_CPU_OFF)
+       == SURF_CPU_OFF) {
+      if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+       simdata->compute = NULL;
       MSG_RETURN(MSG_HOST_FAILURE);
+    }
+    if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+      simdata->compute = NULL;
     MSG_task_destroy(dummy);
     MSG_RETURN(MSG_OK);
   } else MSG_RETURN(MSG_HOST_FAILURE);
 }
 
 /** \ingroup msg_gos_functions
- * \brief Return the number of MSG tasks currently running on a
+ * \brief Return the number of MSG tasks currently running on
  * the host of the current running process.
  */
-int MSG_get_msgload(void) 
+static int MSG_get_msgload(void) 
 {
+  m_process_t process;
+   
   CHECK_HOST();
-  xbt_assert0(0,"Not implemented yet!");
   
-  return 1;
+  xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
+  process = MSG_process_self();
+  return xbt_fifo_size(process->simdata->host->simdata->process_list);
 }
 
 /** \ingroup msg_gos_functions
  *
- * \brief Return the the last value returned by a MSG function (except
+ * \brief Return the last value returned by a MSG function (except
  * MSG_get_errno...).
  */
 MSG_error_t MSG_get_errno(void)