--- /dev/null
+/* Copyright (c) 2007-2014. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <stdio.h>
+#include "msg/msg.h"
+#include "xbt/sysdep.h" /* calloc, printf */
+
+/* Create a log channel to have nice outputs. */
+#include "xbt/log.h"
+#include "xbt/asserts.h"
+XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
+ "Messages specific for this msg example");
+
+/** @addtogroup MSG_examples
+ *
+ * - <b>cloud/masterslave_virtual_machines.c: Master/workers
+ * example on a cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
+ */
+
+const double task_comp_size = 10000000;
+const double task_comm_size = 10000000;
+
+
+int master_fun(int argc, char *argv[]);
+int worker_fun(int argc, char *argv[]);
+
+
+static void send_tasks(int nb_workers)
+{
+ int i;
+ for (i = 0; i < nb_workers; i++) {
+ char *tname = bprintf("Task%02d", i);
+ char *mbox = bprintf("MBOX:WRK%02d", i);
+
+ msg_task_t task = MSG_task_create(tname, task_comp_size, task_comm_size, NULL);
+
+ XBT_INFO("Send task(%s) to mailbox(%s)", tname, mbox);
+ MSG_task_send(task, mbox);
+
+ free(tname);
+ free(mbox);
+ }
+}
+
+int master_fun(int argc, char *argv[])
+{
+ msg_vm_t vm;
+ unsigned int i;
+
+ xbt_dynar_t worker_pms = MSG_process_get_data(MSG_process_self());
+ int nb_workers = xbt_dynar_length(worker_pms);
+
+ xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);
+
+
+ /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
+
+ XBT_INFO("# Launch %d VMs", nb_workers);
+ for (i = 0; i< nb_workers; i++) {
+ char *vm_name = bprintf("VM%02d", i);
+ char *pr_name = bprintf("WRK%02d", i);
+
+ msg_host_t pm = xbt_dynar_get_as(worker_pms, i, msg_host_t);
+
+ XBT_INFO("create %s on PM(%s)", vm_name, MSG_host_get_name(pm));
+ msg_vm_t vm = MSG_vm_create_core(pm, vm_name);
+
+ s_ws_params_t params;
+ memset(¶ms, 0, sizeof(params));
+ params.ramsize = 1L * 1024 * 1024 * 1024; // 1Gbytes
+ MSG_host_set_params(vm, ¶ms);
+
+ MSG_vm_start(vm);
+ xbt_dynar_push(vms, &vm);
+
+ XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
+ MSG_process_create(pr_name, worker_fun, NULL, vm);
+
+ xbt_free(vm_name);
+ xbt_free(pr_name);
+ }
+
+
+ /* Send a bunch of work to every one */
+ XBT_INFO("# Send a task to %d worker process", nb_workers);
+ send_tasks(nb_workers);
+
+ XBT_INFO("# Suspend all VMs");
+ xbt_dynar_foreach(vms, i, vm) {
+ const char *vm_name = MSG_host_get_name(vm);
+ XBT_INFO("suspend %s", vm_name);
+ MSG_vm_suspend(vm);
+ }
+
+ XBT_INFO("# Wait a while");
+ MSG_process_sleep(2);
+
+ XBT_INFO("# Resume all VMs");
+ xbt_dynar_foreach(vms, i, vm) {
+ MSG_vm_resume(vm);
+ }
+
+
+ XBT_INFO("# Sleep long enough for everyone to be done with previous batch of work");
+ MSG_process_sleep(1000 - MSG_get_clock());
+
+ XBT_INFO("# Add one more process on each VM");
+ xbt_dynar_foreach(vms, i, vm) {
+ unsigned int index = i + xbt_dynar_length(vms);
+ char *vm_name = bprintf("VM%02d", i);
+ char *pr_name = bprintf("WRK%02d", index);
+
+ XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
+ MSG_process_create(pr_name, worker_fun, NULL, vm);
+
+ xbt_free(vm_name);
+ xbt_free(pr_name);
+ }
+
+ XBT_INFO("# Send a task to %d worker process", nb_workers * 2);
+ send_tasks(nb_workers * 2);
+
+ msg_host_t worker_pm0 = xbt_dynar_get_as(worker_pms, 0, msg_host_t);
+ msg_host_t worker_pm1 = xbt_dynar_get_as(worker_pms, 1, msg_host_t);
+
+ XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm0));
+ xbt_dynar_foreach(vms, i, vm) {
+ MSG_vm_migrate(vm, worker_pm0);
+ }
+
+ XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm1));
+ xbt_dynar_foreach(vms, i, vm) {
+ MSG_vm_migrate(vm, worker_pm1);
+ }
+
+
+ XBT_INFO("# Shutdown the half of worker processes gracefuly. The remaining half will be forcibly killed.");
+ for (i = 0; i < nb_workers; i++) {
+ char mbox[64];
+ sprintf(mbox, "MBOX:WRK%02d", i);
+ msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
+ MSG_task_send(finalize, mbox);
+ }
+
+ XBT_INFO("# Wait a while before effective shutdown.");
+ MSG_process_sleep(2);
+
+
+ XBT_INFO("# Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
+ xbt_dynar_foreach(vms, i, vm) {
+ XBT_INFO("shutdown %s", MSG_host_get_name(vm));
+ MSG_vm_shutdown(vm);
+ XBT_INFO("destroy %s", MSG_host_get_name(vm));
+ MSG_vm_destroy(vm);
+ }
+
+ XBT_INFO("# Goodbye now!");
+ xbt_dynar_free(&vms);
+
+ return 0;
+}
+
+/** Receiver function */
+int worker_fun(int argc, char *argv[])
+{
+ const char *pr_name = MSG_process_get_name(MSG_process_self());
+ char *mbox = bprintf("MBOX:%s", pr_name);
+
+ XBT_INFO("%s is listenning on mailbox(%s)", pr_name, mbox);
+
+ for (;;) {
+ msg_task_t task = NULL;
+
+ msg_error_t res = MSG_task_receive(&task, mbox);
+ if (res != MSG_OK) {
+ XBT_CRITICAL("MSG_task_get failed");
+ DIE_IMPOSSIBLE;
+ }
+
+ XBT_INFO("%s received task(%s) from mailbox(%s)",
+ pr_name, MSG_task_get_name(task), mbox);
+
+ if (!strcmp(MSG_task_get_name(task), "finalize")) {
+ MSG_task_destroy(task);
+ break;
+ }
+
+ MSG_task_execute(task);
+ XBT_INFO("%s executed task(%s)", pr_name, MSG_task_get_name(task));
+ MSG_task_destroy(task);
+ }
+
+ free(mbox);
+
+ return 0;
+}
+
+
+
+
+int main(int argc, char *argv[])
+{
+ const int nb_workers = 2;
+
+ MSG_init(&argc, argv);
+ if (argc != 2) {
+ printf("Usage: %s example/msg/msg_platform.xml\n", argv[0]);
+ return 1;
+ }
+
+ /* Load the platform file */
+ MSG_create_environment(argv[1]);
+
+ /* Retrieve hosts from the platform file */
+ xbt_dynar_t pms = MSG_hosts_as_dynar();
+
+ /* we need a master node and worker nodes */
+ if (xbt_dynar_length(pms) < nb_workers + 1) {
+ XBT_CRITICAL("need %d hosts", nb_workers + 1);
+ return 1;
+ }
+
+ /* the first pm is the master, the others are workers */
+ msg_host_t master_pm = xbt_dynar_get_as(pms, 0, msg_host_t);
+
+ xbt_dynar_t worker_pms = xbt_dynar_new(sizeof(msg_host_t), NULL);
+ int i;
+ for (i = 1; i < nb_workers + 1; i++) {
+ msg_host_t pm = xbt_dynar_get_as(pms, i, msg_host_t);
+ xbt_dynar_push(worker_pms, &pm);
+ }
+
+
+ /* Start the master process on the master pm. */
+ MSG_process_create("master", master_fun, worker_pms, master_pm);
+
+ msg_error_t res = MSG_main();
+ XBT_INFO("Bye (simulation time %g)", MSG_get_clock());
+
+ xbt_dynar_free(&worker_pms);
+ xbt_dynar_free(&pms);
+
+ return !(res == MSG_OK);
+}
--- /dev/null
+#! ./tesh
+
+p Testing the Cloud API with a simple masterslave
+
+$ $SG_TEST_EXENV ${bindir:=.}/master_worker_vm$EXEEXT --log=no_loc ${srcdir:=.}/../msg_platform.xml
+> [Jacquelin:master:(1) 0.000000] [msg_test/INFO] # Launch 2 VMs
+> [Jacquelin:master:(1) 0.000000] [msg_test/INFO] create VM00 on PM(Intel)
+> [0.000000] [surf_vm_workstation/INFO] Create VM(VM00)@PM(Intel) with 0 mounted disks
+> [Jacquelin:master:(1) 0.000000] [msg_test/INFO] put a process (WRK00) on VM00
+> [VM00:WRK00:(2) 0.000000] [msg_test/INFO] WRK00 is listenning on mailbox(MBOX:WRK00)
+> [Jacquelin:master:(1) 0.000000] [msg_test/INFO] create VM01 on PM(Provost)
+> [0.000000] [surf_vm_workstation/INFO] Create VM(VM01)@PM(Provost) with 0 mounted disks
+> [Jacquelin:master:(1) 0.000000] [msg_test/INFO] put a process (WRK01) on VM01
+> [VM01:WRK01:(3) 0.000000] [msg_test/INFO] WRK01 is listenning on mailbox(MBOX:WRK01)
+> [Jacquelin:master:(1) 0.000000] [msg_test/INFO] # Send a task to 2 worker process
+> [Jacquelin:master:(1) 0.000000] [msg_test/INFO] Send task(Task00) to mailbox(MBOX:WRK00)
+> [VM00:WRK00:(2) 23.846402] [msg_test/INFO] WRK00 received task(Task00) from mailbox(MBOX:WRK00)
+> [Jacquelin:master:(1) 23.846402] [msg_test/INFO] Send task(Task01) to mailbox(MBOX:WRK01)
+> [VM00:WRK00:(2) 23.919218] [msg_test/INFO] WRK00 executed task(Task00)
+> [VM01:WRK01:(3) 48.653760] [msg_test/INFO] WRK01 received task(Task01) from mailbox(MBOX:WRK01)
+> [Jacquelin:master:(1) 48.653760] [msg_test/INFO] # Suspend all VMs
+> [Jacquelin:master:(1) 48.653760] [msg_test/INFO] suspend VM00
+> [Jacquelin:master:(1) 48.653760] [msg_test/INFO] suspend VM01
+> [Jacquelin:master:(1) 48.653760] [msg_test/INFO] # Wait a while
+> [Jacquelin:master:(1) 50.653760] [msg_test/INFO] # Resume all VMs
+> [Jacquelin:master:(1) 50.653760] [msg_test/INFO] # Sleep long enough for everyone to be done with previous batch of work
+> [VM01:WRK01:(3) 50.726576] [msg_test/INFO] WRK01 executed task(Task01)
+> [Jacquelin:master:(1) 1000.000000] [msg_test/INFO] # Add one more process on each VM
+> [Jacquelin:master:(1) 1000.000000] [msg_test/INFO] put a process (WRK02) on VM00
+> [VM00:WRK02:(4) 1000.000000] [msg_test/INFO] WRK02 is listenning on mailbox(MBOX:WRK02)
+> [Jacquelin:master:(1) 1000.000000] [msg_test/INFO] put a process (WRK03) on VM01
+> [VM01:WRK03:(5) 1000.000000] [msg_test/INFO] WRK03 is listenning on mailbox(MBOX:WRK03)
+> [Jacquelin:master:(1) 1000.000000] [msg_test/INFO] # Send a task to 4 worker process
+> [Jacquelin:master:(1) 1000.000000] [msg_test/INFO] Send task(Task00) to mailbox(MBOX:WRK00)
+> [VM00:WRK00:(2) 1023.846402] [msg_test/INFO] WRK00 received task(Task00) from mailbox(MBOX:WRK00)
+> [Jacquelin:master:(1) 1023.846402] [msg_test/INFO] Send task(Task01) to mailbox(MBOX:WRK01)
+> [VM00:WRK00:(2) 1023.919218] [msg_test/INFO] WRK00 executed task(Task00)
+> [VM01:WRK01:(3) 1048.653760] [msg_test/INFO] WRK01 received task(Task01) from mailbox(MBOX:WRK01)
+> [Jacquelin:master:(1) 1048.653760] [msg_test/INFO] Send task(Task02) to mailbox(MBOX:WRK02)
+> [VM01:WRK01:(3) 1048.726576] [msg_test/INFO] WRK01 executed task(Task01)
+> [VM00:WRK02:(4) 1072.500163] [msg_test/INFO] WRK02 received task(Task02) from mailbox(MBOX:WRK02)
+> [Jacquelin:master:(1) 1072.500163] [msg_test/INFO] Send task(Task03) to mailbox(MBOX:WRK03)
+> [VM00:WRK02:(4) 1072.572978] [msg_test/INFO] WRK02 executed task(Task02)
+> [VM01:WRK03:(5) 1097.307521] [msg_test/INFO] WRK03 received task(Task03) from mailbox(MBOX:WRK03)
+> [Jacquelin:master:(1) 1097.307521] [msg_test/INFO] # Migrate all VMs to PM(Intel)
+> [Intel:__pr_mig_tx:VM00(Intel-Intel):(7) 1097.307521] [msg_vm/WARNING] use the default max_downtime value 30ms
+> [Intel:__pr_mig_tx:VM00(Intel-Intel):(7) 1097.307521] [msg_vm/INFO] mig-stage1: remaining_size 1073741824.000000
+> [VM01:WRK03:(5) 1097.380336] [msg_test/INFO] WRK03 executed task(Task03)
+> [Intel:__pr_mig_tx:VM00(Intel-Intel):(7) 1099.463824] [msg_vm/INFO] actual banwdidth 474.886827 (MB/s), threshold 14938647.898422
+> [Intel:__pr_mig_tx:VM00(Intel-Intel):(7) 1099.463824] [msg_vm/INFO] mig-stage 2:0 updated_size 0.000000 computed_during_stage1 0.000000 dp_rate 0.000000 dp_cap 0.000000
+> [Intel:__pr_mig_tx:VM00(Intel-Intel):(7) 1099.463824] [msg_vm/INFO] mig-stage2.0: remaining_size 0.000000 (< threshold 14938647.898422)
+> [Intel:__pr_mig_tx:VM00(Intel-Intel):(7) 1099.463824] [msg_vm/INFO] mig-stage3: remaining_size 0.000000
+> [1099.464019] [surf_vm_workstation/INFO] migrate VM(VM00): set bound (137333000.000000) at Intel
+> [Intel:__pr_mig_rx:VM00(Intel-Intel):(6) 1099.464019] [msg_vm/INFO] set affinity(0x0000@Intel) for VM00
+> [Provost:__pr_mig_tx:VM01(Provost-Intel):(11) 1100.382717] [msg_vm/WARNING] use the default max_downtime value 30ms
+> [Provost:__pr_mig_tx:VM01(Provost-Intel):(11) 1100.382717] [msg_vm/INFO] mig-stage1: remaining_size 1073741824.000000
+> [Provost:__pr_mig_tx:VM01(Provost-Intel):(11) 3564.234309] [msg_vm/INFO] actual banwdidth 0.415609 (MB/s), threshold 13073.942775
+> [Provost:__pr_mig_tx:VM01(Provost-Intel):(11) 3564.234309] [msg_vm/INFO] mig-stage 2:0 updated_size 0.000000 computed_during_stage1 0.000000 dp_rate 0.000000 dp_cap 0.000000
+> [Provost:__pr_mig_tx:VM01(Provost-Intel):(11) 3564.234309] [msg_vm/INFO] mig-stage2.0: remaining_size 0.000000 (< threshold 13073.942775)
+> [Provost:__pr_mig_tx:VM01(Provost-Intel):(11) 3564.234309] [msg_vm/INFO] mig-stage3: remaining_size 0.000000
+> [3566.242426] [surf_vm_workstation/INFO] migrate VM(VM01): set bound (137333000.000000) at Intel
+> [Intel:__pr_mig_rx:VM01(Provost-Intel):(10) 3566.242426] [msg_vm/INFO] set affinity(0x0000@Intel) for VM01
+> [Jacquelin:master:(1) 3567.161124] [msg_test/INFO] # Migrate all VMs to PM(Provost)
+> [Intel:__pr_mig_tx:VM00(Intel-Provost):(15) 3567.161124] [msg_vm/WARNING] use the default max_downtime value 30ms
+> [Intel:__pr_mig_tx:VM00(Intel-Provost):(15) 3567.161124] [msg_vm/INFO] mig-stage1: remaining_size 1073741824.000000
+> [Intel:__pr_mig_tx:VM00(Intel-Provost):(15) 6031.012716] [msg_vm/INFO] actual banwdidth 0.415609 (MB/s), threshold 13073.942775
+> [Intel:__pr_mig_tx:VM00(Intel-Provost):(15) 6031.012716] [msg_vm/INFO] mig-stage 2:0 updated_size 0.000000 computed_during_stage1 0.000000 dp_rate 0.000000 dp_cap 0.000000
+> [Intel:__pr_mig_tx:VM00(Intel-Provost):(15) 6031.012716] [msg_vm/INFO] mig-stage2.0: remaining_size 0.000000 (< threshold 13073.942775)
+> [Intel:__pr_mig_tx:VM00(Intel-Provost):(15) 6031.012716] [msg_vm/INFO] mig-stage3: remaining_size 0.000000
+> [6033.020833] [surf_vm_workstation/INFO] migrate VM(VM00): set bound (137333000.000000) at Provost
+> [Provost:__pr_mig_rx:VM00(Intel-Provost):(14) 6033.020833] [msg_vm/INFO] set affinity(0x0000@Provost) for VM00
+> [Intel:__pr_mig_tx:VM01(Intel-Provost):(19) 6034.900487] [msg_vm/WARNING] use the default max_downtime value 30ms
+> [Intel:__pr_mig_tx:VM01(Intel-Provost):(19) 6034.900487] [msg_vm/INFO] mig-stage1: remaining_size 1073741824.000000
+> [Intel:__pr_mig_tx:VM01(Intel-Provost):(19) 8498.752079] [msg_vm/INFO] actual banwdidth 0.415609 (MB/s), threshold 13073.942775
+> [Intel:__pr_mig_tx:VM01(Intel-Provost):(19) 8498.752079] [msg_vm/INFO] mig-stage 2:0 updated_size 0.000000 computed_during_stage1 0.000000 dp_rate 0.000000 dp_cap 0.000000
+> [Intel:__pr_mig_tx:VM01(Intel-Provost):(19) 8498.752079] [msg_vm/INFO] mig-stage2.0: remaining_size 0.000000 (< threshold 13073.942775)
+> [Intel:__pr_mig_tx:VM01(Intel-Provost):(19) 8498.752079] [msg_vm/INFO] mig-stage3: remaining_size 0.000000
+> [8500.760196] [surf_vm_workstation/INFO] migrate VM(VM01): set bound (137333000.000000) at Provost
+> [Provost:__pr_mig_rx:VM01(Intel-Provost):(18) 8500.760196] [msg_vm/INFO] set affinity(0x0000@Provost) for VM01
+> [Jacquelin:master:(1) 8502.639850] [msg_test/INFO] # Shutdown the half of worker processes gracefuly. The remaining half will be forcibly killed.
+> [VM00:WRK00:(2) 8504.519504] [msg_test/INFO] WRK00 received task(finalize) from mailbox(MBOX:WRK00)
+> [VM01:WRK01:(3) 8506.399157] [msg_test/INFO] WRK01 received task(finalize) from mailbox(MBOX:WRK01)
+> [Jacquelin:master:(1) 8506.399157] [msg_test/INFO] # Wait a while before effective shutdown.
+> [Jacquelin:master:(1) 8508.399157] [msg_test/INFO] # Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.
+> [Jacquelin:master:(1) 8508.399157] [msg_test/INFO] shutdown VM00
+> [Jacquelin:master:(1) 8508.399157] [msg_test/INFO] destroy VM00
+> [Jacquelin:master:(1) 8508.399157] [msg_test/INFO] shutdown VM01
+> [Jacquelin:master:(1) 8508.399157] [msg_test/INFO] destroy VM01
+> [Jacquelin:master:(1) 8508.399157] [msg_test/INFO] # Goodbye now!
+> [8508.399157] [msg_test/INFO] Bye (simulation time 8508.4)