1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "simgrid/msg.h"
7 #include "simgrid/plugins/live_migration.h"
/* Default logging category (msg_test) for this example; the XBT_INFO() calls in this file log through it. */
XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test, "Messages specific for this msg example");
/** @addtogroup MSG_examples
 *
 *  - <b>cloud/master_worker_vm.c: Master/workers
 *    example on a cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
 */
/* Size of the buffers that hold mailbox names. The longest name built in this
 * file has the form "MBOX:WRKnn", so 64 bytes leaves ample room. Guarded so
 * that a definition supplied elsewhere takes precedence. */
#ifndef MAXMBOXLEN
#define MAXMBOXLEN 64
#endif

/* Computation amount given to every task created by send_tasks()
 * (second argument of MSG_task_create). */
const double task_comp_size = 10000000;
/* Communication amount given to every task created by send_tasks()
 * (third argument of MSG_task_create). */
const double task_comm_size = 10000000;
22 static void send_tasks(int nb_workers)
24 for (int i = 0; i < nb_workers; i++) {
25 char *tname = bprintf("Task%02d", i);
26 char *mbox = bprintf("MBOX:WRK%02d", i);
28 msg_task_t task = MSG_task_create(tname, task_comp_size, task_comm_size, NULL);
30 XBT_INFO("Send task(%s) to mailbox(%s)", tname, mbox);
31 MSG_task_send(task, mbox);
38 static int worker_fun(int argc, char *argv[])
40 const char *pr_name = MSG_process_get_name(MSG_process_self());
41 char mbox[MAXMBOXLEN];
42 snprintf(mbox, MAXMBOXLEN, "MBOX:%s", pr_name);
44 XBT_INFO("%s is listening on mailbox(%s)", pr_name, mbox);
47 msg_task_t task = NULL;
49 msg_error_t res = MSG_task_receive(&task, mbox);
50 xbt_assert(res == MSG_OK, "MSG_task_get failed");
52 XBT_INFO("%s received task(%s) from mailbox(%s)", pr_name, MSG_task_get_name(task), mbox);
54 if (strcmp(MSG_task_get_name(task), "finalize") == 0) {
55 MSG_task_destroy(task);
59 MSG_task_execute(task);
60 XBT_INFO("%s executed task(%s)", pr_name, MSG_task_get_name(task));
61 MSG_task_destroy(task);
66 static int master_fun(int argc, char *argv[])
71 xbt_dynar_t worker_pms = MSG_process_get_data(MSG_process_self());
72 int nb_workers = xbt_dynar_length(worker_pms);
74 xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);
76 /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
77 XBT_INFO("# Launch %d VMs", nb_workers);
78 for (int i = 0; i< nb_workers; i++) {
79 char *vm_name = bprintf("VM%02d", i);
80 char *pr_name = bprintf("WRK%02d", i);
82 msg_host_t pm = xbt_dynar_get_as(worker_pms, i, msg_host_t);
84 XBT_INFO("create %s on PM(%s)", vm_name, MSG_host_get_name(pm));
85 msg_vm_t vm = MSG_vm_create_core(pm, vm_name);
87 MSG_vm_set_ramsize(vm, 1L * 1024 * 1024 * 1024); // 1GiB
90 xbt_dynar_push(vms, &vm);
92 XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
93 MSG_process_create(pr_name, worker_fun, NULL, (msg_host_t)vm);
99 /* Send a bunch of work to every one */
100 XBT_INFO("# Send a task to %d worker process", nb_workers);
101 send_tasks(nb_workers);
103 XBT_INFO("# Suspend all VMs");
104 xbt_dynar_foreach(vms, i, vm) {
105 XBT_INFO("suspend %s", MSG_vm_get_name(vm));
109 XBT_INFO("# Wait a while");
110 MSG_process_sleep(2);
112 XBT_INFO("# Resume all VMs");
113 xbt_dynar_foreach(vms, i, vm) {
117 XBT_INFO("# Sleep long enough for everyone to be done with previous batch of work");
118 MSG_process_sleep(10 - MSG_get_clock());
120 XBT_INFO("# Add one more process on each VM");
121 xbt_dynar_foreach(vms, i, vm) {
122 unsigned int index = i + xbt_dynar_length(vms);
123 char* vm_name = bprintf("VM%02u", i);
124 char* pr_name = bprintf("WRK%02u", index);
126 XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
127 MSG_process_create(pr_name, worker_fun, NULL, (msg_host_t)vm);
133 XBT_INFO("# Send a task to %d worker process", nb_workers * 2);
134 send_tasks(nb_workers * 2);
136 msg_host_t worker_pm0 = xbt_dynar_get_as(worker_pms, 0, msg_host_t);
137 msg_host_t worker_pm1 = xbt_dynar_get_as(worker_pms, 1, msg_host_t);
139 XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm0));
140 xbt_dynar_foreach(vms, i, vm) {
141 MSG_vm_migrate(vm, worker_pm0);
144 XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm1));
145 xbt_dynar_foreach(vms, i, vm) {
146 MSG_vm_migrate(vm, worker_pm1);
149 XBT_INFO("# Shutdown the half of worker processes gracefully. The remaining half will be forcibly killed.");
150 for (i = 0; i < nb_workers; i++) {
151 char mbox[MAXMBOXLEN];
152 snprintf(mbox, MAXMBOXLEN, "MBOX:WRK%02u", i);
153 msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
154 MSG_task_send(finalize, mbox);
157 XBT_INFO("# Wait a while before effective shutdown.");
158 MSG_process_sleep(2);
160 XBT_INFO("# Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
161 xbt_dynar_foreach(vms, i, vm) {
162 XBT_INFO("shutdown %s", MSG_vm_get_name(vm));
164 XBT_INFO("destroy %s", MSG_vm_get_name(vm));
168 XBT_INFO("# Goodbye now!");
169 xbt_dynar_free(&vms);
173 /** Receiver function */
174 int main(int argc, char *argv[])
176 const int nb_workers = 2;
178 MSG_init(&argc, argv);
179 MSG_vm_live_migration_plugin_init();
181 xbt_assert(argc >1,"Usage: %s example/platforms/cluster.xml\n", argv[0]);
183 /* Load the platform file */
184 MSG_create_environment(argv[1]);
186 /* Retrieve hosts from the platform file */
187 xbt_dynar_t pms = MSG_hosts_as_dynar();
189 /* we need a master node and worker nodes */
190 xbt_assert(xbt_dynar_length(pms) > nb_workers,"need %d hosts", nb_workers + 1);
192 /* the first pm is the master, the others are workers */
193 msg_host_t master_pm = xbt_dynar_get_as(pms, 0, msg_host_t);
195 xbt_dynar_t worker_pms = xbt_dynar_new(sizeof(msg_host_t), NULL);
196 for (int i = 1; i < nb_workers + 1; i++) {
197 msg_host_t pm = xbt_dynar_get_as(pms, i, msg_host_t);
198 xbt_dynar_push(worker_pms, &pm);
200 xbt_dynar_free(&pms);
202 /* Start the master process on the master pm. */
203 MSG_process_create("master", master_fun, worker_pms, master_pm);
205 msg_error_t res = MSG_main();
206 XBT_INFO("Bye (simulation time %g)", MSG_get_clock());
208 xbt_dynar_free(&worker_pms);
210 return !(res == MSG_OK);