Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update the VM example for the new VM support
authorTakahiro Hirofuchi <t.hirofuchi+sg@aist.go.jp>
Thu, 28 Feb 2013 11:26:17 +0000 (12:26 +0100)
committerTakahiro Hirofuchi <t.hirofuchi+sg@aist.go.jp>
Thu, 28 Feb 2013 11:26:17 +0000 (12:26 +0100)
examples/msg/cloud/masterslave_virtual_machines.c

index e1527eb..dd77241 100644 (file)
@@ -4,7 +4,7 @@
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 #include <stdio.h>
-#include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
+#include "msg/msg.h"
 #include "xbt/sysdep.h"         /* calloc, printf */
 
 /* Create a log channel to have nice outputs. */
@@ -15,217 +15,235 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
 
 /** @addtogroup MSG_examples
  * 
- *  - <b>cloud/masterslave_virtual_machines.c: Master/slaves
- *    example, à la cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
+ *  - <b>cloud/masterslave_virtual_machines.c: Master/workers
+ *    example on a cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
  */
 
 double task_comp_size = 10000000;
 double task_comm_size = 10000000;
 
 
-int master(int argc, char *argv[]);
-int slave_fun(int argc, char *argv[]);
+int master_fun(int argc, char *argv[]);
+int worker_fun(int argc, char *argv[]);
 
-static void work_batch(int slaves_count) {
+static void work_batch(int workers_count)
+{
   int i;
-  for (i = 0; i < slaves_count; i++) {
-    char taskname_buffer[64];
-    char mailbox_buffer[64];
+  for (i = 0; i < workers_count; i++) {
+    char *tname = bprintf("Task%02d", i);
+    char *mbox =  bprintf("MBOX:WRK%02d", i);
+
+    msg_task_t task = MSG_task_create(tname, task_comp_size, task_comm_size, NULL);
 
-    sprintf(taskname_buffer, "Task_%d", i);
-    sprintf(mailbox_buffer,"Slave_%d",i);
+    XBT_INFO("send task(%s) to mailbox(%s)", tname, mbox);
+    MSG_task_send(task, mbox);
 
-    XBT_INFO("Sending \"%s\" to \"%s\"",taskname_buffer,mailbox_buffer);
-    MSG_task_send(MSG_task_create(taskname_buffer, task_comp_size, task_comm_size,NULL),
-        mailbox_buffer);
+    free(tname);
+    free(mbox);
   }
 }
 
-int master(int argc, char *argv[]) {
-  int slaves_count = 10;
-  msg_host_t *slaves = xbt_new(msg_host_t,10);
-
+int master_fun(int argc, char *argv[])
+{
   msg_vm_t vm;
   unsigned int i;
+  int workers_count = argc - 1;
 
-  /* Retrive the hostnames constituting our playground today */
-  for (i = 1; i < argc; i++) {
-    slaves[i - 1] = MSG_get_host_by_name(argv[i]);
-    xbt_assert(slaves[i - 1] != NULL, "Cannot use inexistent host %s", argv[i]);
-  }
+  msg_host_t *pms = xbt_new(msg_host_t, workers_count);
+  xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);
 
-  /* Launch the sub processes: one VM per host, with one process inside each */
+  /* Retrive the PMs that launch worker processes. */
+  for (i = 1; i < argc; i++)
+    pms[i - 1] = MSG_get_host_by_name(argv[i]);
 
-  for (i=0;i<slaves_count;i++) {
-    char slavename[64];
-    sprintf(slavename,"Slave %d",i);
-    char**argv=xbt_new(char*,3);
-    argv[0] = xbt_strdup(slavename);
-    argv[1] = bprintf("%d",i);
-    argv[2] = NULL;
 
-    char vmName[64];
-    snprintf(vmName, 64, "vm_%d", i);
+  /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
 
-    msg_vm_t vm = MSG_vm_start(slaves[i],vmName,2);
-    MSG_vm_bind(vm, MSG_process_create_with_arguments(slavename,slave_fun,NULL,slaves[i],2,argv));
-  }
+  XBT_INFO("Launch %ld VMs", workers_count);
+  for (i=0; i< workers_count; i++) {
+    char *vm_name = bprintf("VM%02d", i);
+    char *pr_name = bprintf("WRK%02d", i);
+    char *mbox = bprintf("MBOX:WRK%02d", i);
 
+    char **wrk_argv = xbt_new(char*, 3);
+    wrk_argv[0] = pr_name;
+    wrk_argv[1] = mbox;
+    wrk_argv[2] = NULL;
 
-  xbt_dynar_t vms = MSG_vms_as_dynar();
-  XBT_INFO("Launched %ld VMs", xbt_dynar_length(vms));
+    XBT_INFO("create %s", vm_name);
+    msg_vm_t vm = MSG_vm_create_core(pms[i], vm_name);
+    MSG_vm_start(vm);
+    xbt_dynar_push(vms, &vm);
+
+    XBT_INFO("put %s on %s", pr_name, vm_name);
+    MSG_process_create_with_arguments(pr_name, worker_fun, NULL, vm, 2, wrk_argv);
+  }
 
-  /* Send a bunch of work to every one */
-  XBT_INFO("Send a first batch of work to every one");
-  work_batch(slaves_count);
 
-  XBT_INFO("Now suspend all VMs, just for fun");
+  /* Send a bunch of work to every one */
+  XBT_INFO("Send a task to %d worker process", workers_count);
+  work_batch(workers_count);
 
-  xbt_dynar_foreach(vms,i,vm) {
+  XBT_INFO("Suspend all VMs");
+  xbt_dynar_foreach(vms, i, vm) {
+    const char *vm_name = MSG_host_get_name(vm);
+    XBT_INFO("suspend %s", vm_name);
     MSG_vm_suspend(vm);
   }
 
   XBT_INFO("Wait a while");
   MSG_process_sleep(2);
 
-  XBT_INFO("Enough. Let's resume everybody.");
-  xbt_dynar_foreach(vms,i,vm) {
+  XBT_INFO("Resume all VMs");
+  xbt_dynar_foreach(vms, i, vm) {
     MSG_vm_resume(vm);
   }
+
+
   XBT_INFO("Sleep long enough for everyone to be done with previous batch of work");
-  MSG_process_sleep(1000-MSG_get_clock());
+  MSG_process_sleep(1000 - MSG_get_clock());
 
   XBT_INFO("Add one more process per VM");
-  xbt_dynar_foreach(vms,i,vm) {
-    msg_vm_t vm = xbt_dynar_get_as(vms,i,msg_vm_t);
-    char slavename[64];
-    sprintf(slavename,"Slave %ld",i+xbt_dynar_length(vms));
-    char**argv=xbt_new(char*,3);
-    argv[0] = xbt_strdup(slavename);
-    argv[1] = bprintf("%ld",i+xbt_dynar_length(vms));
-    argv[2] = NULL;
-    MSG_vm_bind(vm, MSG_process_create_with_arguments(slavename,slave_fun,NULL,slaves[i],2,argv));
+  xbt_dynar_foreach(vms, i, vm) {
+    unsigned int index = i + xbt_dynar_length(vms);
+    char *vm_name = bprintf("VM%02d", i);
+    char *pr_name = bprintf("WRK%02d", index);
+    char *mbox = bprintf("MBOX:WRK%02d", index);
+
+    char **wrk_argv = xbt_new(char*, 3);
+    wrk_argv[0] = pr_name;
+    wrk_argv[1] = mbox;
+    wrk_argv[2] = NULL;
+
+    XBT_INFO("put %s on %s", pr_name, vm_name);
+    MSG_process_create_with_arguments(pr_name, worker_fun, NULL, vm, 2, wrk_argv);
   }
 
-  XBT_INFO("Reboot all the VMs");
-  xbt_dynar_foreach(vms,i,vm) {
-    MSG_vm_reboot(vm);
-  }
-
-  work_batch(slaves_count*2);
+  XBT_INFO("Send a task to %d worker process", workers_count * 2);
+  work_batch(workers_count * 2);
 
-  XBT_INFO("Migrate everyone to the second host.");
-  xbt_dynar_foreach(vms,i,vm) {
-    MSG_vm_migrate(vm,slaves[1]);
+  XBT_INFO("Migrate all VMs to PM(%s)", MSG_host_get_name(pms[1]));
+  xbt_dynar_foreach(vms, i, vm) {
+    MSG_vm_migrate(vm, pms[1]);
   }
-  XBT_INFO("Suspend everyone, move them to the third host, and resume them.");
-  xbt_dynar_foreach(vms,i,vm) {
-    MSG_vm_suspend(vm);
-    MSG_vm_migrate(vm,slaves[2]);
-    MSG_vm_resume(vm);
+
+  /* FIXME: Do we need to support cold migration? Yes, but how should
+   * parameters of a migration be passed? */
+  XBT_INFO("Migrate all VMs to PM(%s)", MSG_host_get_name(pms[2]));
+  xbt_dynar_foreach(vms, i, vm) {
+    // MSG_vm_suspend(vm);
+    MSG_vm_migrate(vm, pms[2]);
+    // MSG_vm_resume(vm);
   }
 
 
-  XBT_INFO("Let's shut down the simulation. 10 first processes will be shut down cleanly while the second half will forcefully get killed");
-  for (i = 0; i < slaves_count; i++) {
-    char mailbox_buffer[64];
-    sprintf(mailbox_buffer,"Slave_%d",i);
+  XBT_INFO("Shutdown the first 10 worker processes gracefuly. The   the second half will forcefully get killed");
+  for (i = 0; i < workers_count; i++) {
+    char mbox[64];
+    sprintf(mbox, "MBOX:WRK%02d", i);
     msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
-    MSG_task_send(finalize, mailbox_buffer);
+    MSG_task_send(finalize, mbox);
   }
 
   XBT_INFO("Wait a while before effective shutdown.");
   MSG_process_sleep(2);
 
-  xbt_dynar_foreach(vms,i,vm) {
+
+  XBT_INFO("Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
+  xbt_dynar_foreach(vms, i, vm) {
+    XBT_INFO("shutdown %s", MSG_host_get_name(vm));
     MSG_vm_shutdown(vm);
+    XBT_INFO("destroy %s", MSG_host_get_name(vm));
     MSG_vm_destroy(vm);
   }
 
   XBT_INFO("Goodbye now!");
-  free(slaves);
+  free(pms);
   xbt_dynar_free(&vms);
+
   return 0;
-}                               /* end_of_master */
+}
 
 /** Receiver function  */
-int slave_fun(int argc, char *argv[])
+int worker_fun(int argc, char *argv[])
 {
-  char mailbox_name[128];
-  msg_task_t task = NULL;
-  _XBT_GNUC_UNUSED int res;
-  /* since the slaves will move around, use slave_%d as mailbox names instead of hostnames */
-  xbt_assert(argc>=2, "slave processes need to be given their rank as parameter");
-  sprintf(mailbox_name,"Slave_%s",argv[1]);
-  XBT_INFO("Slave listenning on %s",argv[1]);
-  while (1) {
-    res = MSG_task_receive(&(task),mailbox_name);
-    xbt_assert(res == MSG_OK, "MSG_task_get failed");
-
-    XBT_INFO("Received \"%s\" from mailbox %s", MSG_task_get_name(task),mailbox_name);
+  xbt_assert(argc == 2, "need mbox in arguments");
+
+  char *mbox = argv[1];
+  const char *pr_name = MSG_process_get_name(MSG_process_self());
+  XBT_INFO("%s is listenning on mailbox(%s)", pr_name, mbox);
+
+  for (;;) {
+    msg_task_t task = NULL;
+
+    msg_error_t res = MSG_task_receive(&task, mbox);
+    if (res != MSG_OK) {
+      XBT_CRITICAL("MSG_task_get failed");
+      DIE_IMPOSSIBLE;
+    }
+
+    XBT_INFO("%s received task(%s) from mailbox(%s)",
+        pr_name, MSG_task_get_name(task), mbox);
+
     if (!strcmp(MSG_task_get_name(task), "finalize")) {
       MSG_task_destroy(task);
       break;
     }
 
     MSG_task_execute(task);
-    XBT_INFO("\"%s\" done", MSG_task_get_name(task));
+    XBT_INFO("%s executed task(%s)", pr_name, MSG_task_get_name(task));
     MSG_task_destroy(task);
-    task = NULL;
   }
 
   return 0;
-}                               /* end_of_slave */
+}
 
 /** Main function */
 int main(int argc, char *argv[])
 {
-  msg_error_t res = MSG_OK;
-  xbt_dynar_t hosts_dynar;
-  msg_host_t*hosts= xbt_new(msg_host_t,10);
-  char**hostnames= xbt_new(char*,10);
-  char**masterargv=xbt_new(char*,12);
-  int i;
-
-  /* Get the arguments */
   MSG_init(&argc, argv);
-  if (argc < 2) {
-    printf("Usage: %s platform_file\n", argv[0]);
-    printf("example: %s msg_platform.xml\n", argv[0]);
-    exit(1);
-  } if (argc>2) {
-    printf("Usage: %s platform_file\n", argv[0]);
-    printf("Other parameters (such as the deployment file) are ignored.");
+  if (argc != 2) {
+    printf("Usage: %s example/msg/msg_platform.xml\n", argv[0]);
+    return 1;
   }
 
-  /* load the platform file */
+  /* Load the platform file */
   MSG_create_environment(argv[1]);
-  /* Retrieve the 10 first hosts of the platform file */
-  hosts_dynar = MSG_hosts_as_dynar();
-  xbt_assert(xbt_dynar_length(hosts_dynar)>10,
-      "I need at least 10 hosts in the platform file, but %s contains only %ld hosts_dynar.",
-      argv[1],xbt_dynar_length(hosts_dynar));
-  for (i=0;i<10;i++) {
-    hosts[i] = xbt_dynar_get_as(hosts_dynar,i,msg_host_t);
-    hostnames[i] = xbt_strdup(MSG_host_get_name(hosts[i]));
+
+  /* Retrieve the 10 first hosts from the platform file */
+  xbt_dynar_t hosts_dynar = MSG_hosts_as_dynar();
+
+  if (xbt_dynar_length(hosts_dynar) <= 10) {
+    XBT_CRITICAL("need 10 hosts");
+    return 1;
   }
-  masterargv[0]=xbt_strdup("master");
-  for (i=1;i<11;i++) {
-    masterargv[i] = xbt_strdup(MSG_host_get_name(hosts[i-1]));
+
+  msg_host_t master_pm;
+  char **master_argv = xbt_new(char *, 12);
+  master_argv[0] = xbt_strdup("master");
+  master_argv[11] = NULL;
+
+  unsigned int i;
+  msg_host_t host;
+  xbt_dynar_foreach(hosts_dynar, i, host) {
+    if (i == 0) {
+      master_pm = host;
+      continue;
+    }
+
+    master_argv[i] = xbt_strdup(MSG_host_get_name(host));
+
+    if (i == 10)
+      break;
   }
-  masterargv[11]=NULL;
-  MSG_process_create_with_arguments("master",master,NULL,hosts[0],11,masterargv);
-  res = MSG_main();
+
+
+  MSG_process_create_with_arguments("master", master_fun, NULL, master_pm, 11, master_argv);
+
+  msg_error_t res = MSG_main();
   XBT_INFO("Simulation time %g", MSG_get_clock());
 
-  free(hosts);
-  for (i=0;i<10;i++) 
-     free(hostnames[i]);
-  free(hostnames);
   xbt_dynar_free(&hosts_dynar);
 
-  if (res == MSG_OK)
-    return 0;
-  else
-    return 1;
-}                               /* end_of_main */
+  return !(res == MSG_OK);
+}