Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Now, MSG internals do not use MSG_suspend/resume anymore.
authoralegrand <alegrand@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Wed, 12 Jan 2005 22:38:47 +0000 (22:38 +0000)
committeralegrand <alegrand@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Wed, 12 Jan 2005 22:38:47 +0000 (22:38 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@752 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/msg/gos.c
src/msg/m_process.c
src/msg/private.h

index fd042a4..7e7a349 100644 (file)
@@ -61,7 +61,7 @@ MSG_error_t MSG_task_get(m_task_t * task,
     xbt_assert0(!(h_simdata->sleeping[channel]),
                "A process is already blocked on this channel");
     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
-    MSG_process_suspend(process);
+    __MSG_process_block();
     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
        == SURF_CPU_OFF)
       MSG_RETURN(MSG_HOST_FAILURE);
@@ -80,8 +80,8 @@ MSG_error_t MSG_task_get(m_task_t * task,
                h->simdata->host, t_simdata->message_size);
   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
 
-  if(MSG_process_isSuspended(t_simdata->sender)) 
-    MSG_process_resume(t_simdata->sender);
+  if(__MSG_process_isBlocked(t_simdata->sender)) 
+    __MSG_process_unblock(t_simdata->sender);
 
   do {
     __MSG_task_wait_event(process, t);
@@ -188,12 +188,12 @@ MSG_error_t MSG_task_put(m_task_t task,
                mbox[channel], task);
     
   if(remote_host->simdata->sleeping[channel]) 
-    MSG_process_resume(remote_host->simdata->sleeping[channel]);
+    __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
   else {
     process->simdata->put_host = dest;
     process->simdata->put_channel = channel;
     while(!(task_simdata->comm)) 
-      MSG_process_suspend(process);
+      __MSG_process_block();
     process->simdata->put_host = NULL;
     process->simdata->put_channel = -1;
   }
@@ -302,8 +302,8 @@ MSG_error_t MSG_process_sleep(long double nb_sec)
        == SURF_CPU_OFF)
       MSG_RETURN(MSG_HOST_FAILURE);
 
-    if(MSG_process_isSuspended(process)) {
-      MSG_process_suspend(MSG_process_self());
+    if(__MSG_process_isBlocked(process)) {
+      __MSG_process_unblock(MSG_process_self());
     }
     if(surf_workstation_resource->extension_public->
        get_state(MSG_process_get_host(process)->simdata->host) 
index 920f92b..67d2328 100644 (file)
@@ -280,6 +280,7 @@ MSG_error_t MSG_process_suspend(m_process_t process)
     xbt_assert0(((simdata_task->compute)||(simdata_task->comm))&&
                !((simdata_task->comm)&&(simdata_task->comm)),
                "Got a problem in deciding which action to choose !");
+    simdata->suspended = 1;
     if(simdata_task->compute) 
       surf_workstation_resource->extension_public->suspend(simdata_task->compute);
     else
@@ -288,9 +289,11 @@ MSG_error_t MSG_process_suspend(m_process_t process)
     m_task_t dummy = MSG_TASK_UNINITIALIZED;
     dummy = MSG_task_create("suspended", 0.0, 0, NULL);
 
+    simdata->suspended = 1;
     __MSG_task_execute(process,dummy);
     surf_workstation_resource->extension_public->suspend(dummy->simdata->compute);
     __MSG_wait_for_computation(process,dummy);
+    simdata->suspended = 0;
 
     MSG_task_destroy(dummy);
   }
@@ -313,12 +316,19 @@ MSG_error_t MSG_process_resume(m_process_t process)
   CHECK_HOST();
 
   simdata = process->simdata;
+
+  if(simdata->blocked) {
+    simdata->suspended = 0; /* He'll wake up by itself */
+    MSG_RETURN(MSG_OK);
+  }
+
   if(!(simdata->waiting_task)) {
     xbt_assert0(0,"Process not waiting for anything else. Weird !");
     return MSG_WARNING;
   }
   simdata_task = simdata->waiting_task->simdata;
 
+
   if(simdata_task->compute) 
     surf_workstation_resource->extension_public->resume(simdata_task->compute);
   else 
@@ -334,23 +344,63 @@ MSG_error_t MSG_process_resume(m_process_t process)
  * task on which it was waiting for the completion.
  */
 int MSG_process_isSuspended(m_process_t process)
+{
+  xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
+
+  return (process->simdata->suspended);
+}
+
+
+
+
+
+MSG_error_t __MSG_process_block()
+{
+  m_process_t process = MSG_process_self();
+
+  m_task_t dummy = MSG_TASK_UNINITIALIZED;
+  dummy = MSG_task_create("blocked", 0.0, 0, NULL);
+  
+  process->simdata->blocked=1;
+  __MSG_task_execute(process,dummy);
+  surf_workstation_resource->extension_public->suspend(dummy->simdata->compute);
+  __MSG_wait_for_computation(process,dummy);
+  process->simdata->blocked=0;
+
+  if(process->simdata->suspended)
+    MSG_process_suspend(process);
+  
+  MSG_task_destroy(dummy);
+
+  return MSG_OK;
+}
+
+MSG_error_t __MSG_process_unblock(m_process_t process)
 {
   simdata_process_t simdata = NULL;
   simdata_task_t simdata_task = NULL;
   int i;
-  
+
   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
+  CHECK_HOST();
 
   simdata = process->simdata;
   if(!(simdata->waiting_task)) {
     xbt_assert0(0,"Process not waiting for anything else. Weird !");
-    return 0;
+    return MSG_WARNING;
   }
-
   simdata_task = simdata->waiting_task->simdata;
 
-  if(simdata_task->compute) 
-    return surf_workstation_resource->extension_public->is_suspended(simdata_task->compute);
-  else 
-    return surf_workstation_resource->extension_public->is_suspended(simdata_task->comm);
+  xbt_assert0(simdata->blocked,"Process not blocked");
+
+  surf_workstation_resource->extension_public->resume(simdata_task->compute);
+
+  MSG_RETURN(MSG_OK);
+}
+
+int __MSG_process_isBlocked(m_process_t process)
+{
+  xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
+
+  return (process->simdata->blocked);
 }
index 8c8af4d..f921e9c 100644 (file)
@@ -46,6 +46,8 @@ typedef struct simdata_process {
   int PID;                     /* used for debugging purposes */
   int PPID;                    /* The parent PID */
   m_task_t waiting_task;        
+  int blocked;
+  int suspended;
   m_host_t put_host;           /* used for debugging purposes */
   m_channel_t put_channel;     /* used for debugging purposes */
   int argc;                     /* arguments number if any */
@@ -83,4 +85,8 @@ void __MSG_task_execute(m_process_t process, m_task_t task);
 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task);
 MSG_error_t __MSG_task_wait_event(m_process_t process, m_task_t task);
 
+MSG_error_t __MSG_process_block();
+MSG_error_t __MSG_process_unblock(m_process_t process);
+int __MSG_process_isBlocked(m_process_t process);
+
 #endif