Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Debug the cloud API, it should work as expected now.
authorSamuel Lepetit <samuel.lepetit@inria.fr>
Wed, 13 Jun 2012 16:07:40 +0000 (18:07 +0200)
committerSamuel Lepetit <samuel.lepetit@inria.fr>
Wed, 13 Jun 2012 16:08:18 +0000 (18:08 +0200)
examples/msg/cloud/masterslave_virtual_machines.c
src/msg/msg_private.h
src/msg/msg_process.c
src/msg/msg_vm.c

index 0ed5b90..87d1050 100644 (file)
@@ -98,7 +98,6 @@ int master(int argc, char *argv[]) {
     argv[2] = NULL;
     MSG_vm_bind(vm, MSG_process_create_with_arguments(slavename,slave_fun,NULL,slaves[i],2,argv));
   }
-  work_batch(slaves_count*2);
 
   XBT_INFO("Migrate everyone to the second host.");
   for (i=0;i<xbt_dynar_length(vms);i++)
@@ -111,8 +110,7 @@ int master(int argc, char *argv[]) {
     MSG_vm_migrate(vm,slaves[2]);
     MSG_vm_resume(vm);
   }
-
-
+  work_batch(slaves_count*2);
 
   XBT_INFO("Let's shut down the simulation. 10 first processes will be shut down cleanly while the second half will forcefully get killed");
   for (i = 0; i < slaves_count; i++) {
index 23699dd..cc570f1 100644 (file)
@@ -83,6 +83,7 @@ typedef struct process_arg {
   double kill_time;
 } s_process_arg_t, *process_arg_t;
 
+
 typedef struct msg_comm {
   smx_action_t s_comm;          /* SIMIX communication object encapsulated (the same for both processes) */
   m_task_t task_sent;           /* task sent (NULL for the receiver) */
@@ -103,6 +104,11 @@ typedef struct msg_vm {
   int coreAmount;
 } s_msg_vm_t;
 
+typedef struct s_msg_process_data {
+       void *data;
+       msg_vm_t current_vm;
+} s_msg_process_data_t, *msg_process_data_t;
+
 /************************** Global variables ********************************/
 typedef struct MSG_Global {
   xbt_fifo_t host;
index 50c8fa1..4ec3efa 100644 (file)
@@ -33,25 +33,35 @@ void MSG_process_cleanup_from_SIMIX(smx_process_t smx_proc)
 
   // get the MSG process from the SIMIX process
   if (smx_proc == SIMIX_process_self()) {
-    /* avoid a SIMIX request if this function is called by the process itself */
+       /* avoid a SIMIX request if this function is called by the process itself */
     msg_proc = SIMIX_process_self_get_data(smx_proc);
+
     SIMIX_process_self_set_data(smx_proc, NULL);
   }
   else {
     msg_proc = simcall_process_get_data(smx_proc);
     simcall_process_set_data(smx_proc, NULL);
   }
-
 #ifdef HAVE_TRACING
   TRACE_msg_process_end(smx_proc);
 #endif
 
-  // free the data if a function was provided
-  if (msg_proc->data && msg_global->process_data_cleanup) {
-    msg_global->process_data_cleanup(msg_proc->data);
+  msg_process_data_t process_data = (msg_process_data_t)msg_proc->data;
+  //free the process data
+  if (process_data) {
+       //Remove the process from its vm
+       if (process_data->current_vm) {
+         int pos = xbt_dynar_search(process_data->current_vm->processes,&smx_proc);
+         xbt_dynar_remove_at(process_data->current_vm->processes,pos, NULL);
+       }
+       //Free the data if a function was provided
+    if (process_data->data && msg_global->process_data_cleanup) {
+      msg_global->process_data_cleanup(process_data->data);
+    }
+    xbt_free(process_data);
   }
 
-  // free the MSG process
+  //free the MSG process
   xbt_free(msg_proc);
 }
 
@@ -248,7 +258,10 @@ void* MSG_process_get_data(m_process_t process)
 
   /* get from SIMIX the MSG process data, and then the user data */
   simdata_process_t simdata = simcall_process_get_data(process);
-  return simdata->data;
+  if (!simdata->data) {
+       return NULL;
+  }
+  return ((msg_process_data_t)simdata->data)->data;
 }
 
 /** \ingroup m_process_management
@@ -262,8 +275,10 @@ MSG_error_t MSG_process_set_data(m_process_t process, void *data)
   xbt_assert(process != NULL, "Invalid parameter");
 
   simdata_process_t simdata = simcall_process_get_data(process);
-  simdata->data = data;
-
+  if (!simdata->data) {
+       simdata->data = xbt_new0(s_msg_process_data_t, 1);
+  }
+       ((msg_process_data_t)simdata->data)->data = data;
   return MSG_OK;
 }
 
index b31536f..fd951c2 100644 (file)
@@ -65,7 +65,24 @@ int MSG_vm_is_running(msg_vm_t vm) {
  * @bug for now, if a binded process terminates, every VM functions will segfault. Baaaad.
  */
 void MSG_vm_bind(msg_vm_t vm, m_process_t process) {
-  xbt_dynar_push_as(vm->processes,m_process_t,process);
+  simdata_process_t simdata = simcall_process_get_data(process);
+  if (!simdata->data) {
+       simdata->data = xbt_new0(s_msg_process_data_t,1);
+  }
+  //If if it is already in a vm, get it out of it
+  if ( ((msg_process_data_t)(simdata->data))->current_vm) {
+       msg_vm_t old_vm = ((msg_process_data_t)(simdata->data))->current_vm;
+    int pos = xbt_dynar_search(old_vm->processes,&process);
+    xbt_dynar_remove_at(old_vm->processes,pos, NULL);
+    //If it is on the wrong host, migrate it to the new host
+    if (vm->location != old_vm->location) {
+       MSG_process_migrate(process,vm->location);
+    }
+  }
+
+  ((msg_process_data_t)(simdata->data))->current_vm = vm;
+
+       xbt_dynar_push_as(vm->processes,m_process_t,process);
 }
 /** @brief Removes the given process from the given VM, and kill it
  *  @ingroup msg_VMs
@@ -107,7 +124,7 @@ void MSG_vm_suspend(msg_vm_t vm) {
   unsigned int cpt;
   m_process_t process;
   xbt_dynar_foreach(vm->processes,cpt,process) {
-    XBT_INFO("suspend process %s of host %s",MSG_process_get_name(process),MSG_host_get_name(MSG_process_get_host(process)));
+       XBT_DEBUG("suspend process %s of host %s",MSG_process_get_name(process),MSG_host_get_name(MSG_process_get_host(process)));
     MSG_process_suspend(process);
   }
 }
@@ -123,7 +140,7 @@ void MSG_vm_resume(msg_vm_t vm) {
   unsigned int cpt;
   m_process_t process;
   xbt_dynar_foreach(vm->processes,cpt,process) {
-    XBT_INFO("resume process %s of host %s",MSG_process_get_name(process),MSG_host_get_name(MSG_process_get_host(process)));
+    XBT_DEBUG("resume process %s of host %s",MSG_process_get_name(process),MSG_host_get_name(MSG_process_get_host(process)));
     MSG_process_resume(process);
   }
 }