Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Taking into account last SURF modifications (suspend, resume and is_suspended moved...
[simgrid.git] / src / msg / gos.c
index 3e1a6e6..77fa324 100644 (file)
@@ -61,7 +61,7 @@ MSG_error_t MSG_task_get(m_task_t * task,
     xbt_assert0(!(h_simdata->sleeping[channel]),
                "A process is already blocked on this channel");
     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
-    MSG_process_suspend(process);
+    __MSG_process_block();
     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
        == SURF_CPU_OFF)
       MSG_RETURN(MSG_HOST_FAILURE);
@@ -75,13 +75,15 @@ MSG_error_t MSG_task_get(m_task_t * task,
 
   /* Transfer */
   t_simdata->using++;
+
   t_simdata->comm = surf_workstation_resource->extension_public->
     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
-               h->simdata->host, t_simdata->message_size);
+               h->simdata->host, t_simdata->message_size,t_simdata->rate);
+  
   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
 
-  if(MSG_process_isSuspended(t_simdata->sender)) 
-    MSG_process_resume(t_simdata->sender);
+  if(__MSG_process_isBlocked(t_simdata->sender)) 
+    __MSG_process_unblock(t_simdata->sender);
 
   do {
     __MSG_task_wait_event(process, t);
@@ -115,6 +117,33 @@ int MSG_task_Iprobe(m_channel_t channel)
   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
 }
 
+/** \ingroup msg_gos_functions
+ * \brief Test whether there is a pending communication on a channel, and who sent it.
+ *
+ * Only the first item of the mailbox associated with \a channel on the
+ * host returned by MSG_host_self() is examined.
+ *
+ * \param channel the channel on which the agent should be
+   listening. This value has to be >=0 and lower than the maximal
+   number of channels fixed with MSG_set_channel_number().
+ * \return -1 if there is no pending communication (empty mailbox, or a
+   first item that carries no task or no task simdata), and the PID of
+   the process that sent the first pending task otherwise
+ */
+int MSG_task_probe_from(m_channel_t channel)
+{
+  m_host_t h = NULL;
+  simdata_host_t h_simdata = NULL;
+  xbt_fifo_item_t item = NULL;
+  m_task_t t = NULL;
+
+  CHECK_HOST();
+  h = MSG_host_self();
+  h_simdata = h->simdata;
+
+  item = xbt_fifo_getFirstItem(h_simdata->mbox[channel]);
+  if (!item)
+    return -1;
+
+  t = xbt_fifo_get_item_content(item);
+  /* The sender is stored in the task's simdata: without it there is
+     nobody to report, and dereferencing it would be invalid. */
+  if (!t || !t->simdata)
+    return -1;
+
+  return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
+}
+
 /** \ingroup msg_gos_functions
  * \brief Put a task on a channel of an host and waits for the end of the
  * transmission.
@@ -161,15 +190,15 @@ MSG_error_t MSG_task_put(m_task_t task,
                mbox[channel], task);
     
   if(remote_host->simdata->sleeping[channel]) 
-    MSG_process_resume(remote_host->simdata->sleeping[channel]);
-  else {
+    __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
+/*   else { */
     process->simdata->put_host = dest;
     process->simdata->put_channel = channel;
     while(!(task_simdata->comm)) 
-      MSG_process_suspend(process);
+      __MSG_process_block();
     process->simdata->put_host = NULL;
     process->simdata->put_channel = -1;
-  }
+/*   } */
 
   do {
     __MSG_task_wait_event(process, task);
@@ -185,6 +214,14 @@ MSG_error_t MSG_task_put(m_task_t task,
   else MSG_RETURN(MSG_TRANSFER_FAILURE);
 }
 
+/** \ingroup msg_gos_functions
+ * \brief Record a rate on a task, then put it on a channel of an host
+ * exactly as MSG_task_put() does.
+ *
+ * The value of \a max_rate is written to the rate field of the task's
+ * simdata before the call to MSG_task_put(). MSG_task_get() hands that
+ * field to the workstation resource's communicate() call; presumably it
+ * acts as an upper bound on the transfer rate (to be confirmed against
+ * the SURF documentation).
+ *
+ * \param task the task to send; its simdata is modified
+ * \param dest forwarded unchanged to MSG_task_put()
+ * \param channel forwarded unchanged to MSG_task_put()
+ * \param max_rate the value stored in the task's rate field
+ * \return the value returned by MSG_task_put()
+ */
+MSG_error_t MSG_task_put_bounded(m_task_t task,
+                                m_host_t dest, m_channel_t channel,
+                                long double max_rate)
+{
+  MSG_error_t status;
+
+  task->simdata->rate = max_rate;
+  status = MSG_task_put(task, dest, channel);
+
+  return status;
+}
+
 /** \ingroup msg_gos_functions
  * \brief Executes a task and waits for its termination.
  *
@@ -275,8 +312,8 @@ MSG_error_t MSG_process_sleep(long double nb_sec)
        == SURF_CPU_OFF)
       MSG_RETURN(MSG_HOST_FAILURE);
 
-    if(MSG_process_isSuspended(process)) {
-      MSG_process_suspend(MSG_process_self());
+    if(__MSG_process_isBlocked(process)) {
+      __MSG_process_unblock(MSG_process_self());
     }
     if(surf_workstation_resource->extension_public->
        get_state(MSG_process_get_host(process)->simdata->host)