1 /* Copyright (c) 2007-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
9 #include "xbt/sysdep.h" /* calloc, printf */
11 /* Create a log channel to have nice outputs. */
13 #include "xbt/asserts.h"
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
15 "Messages specific for this msg example");
/** @addtogroup MSG_examples
 *
 *  - <b>cloud/masterslave_virtual_machines.c: Master/workers
 *    example on a cloud</b>. The classic master/workers example, revisited to demonstrate the use of virtual machines.
 */
/** Computation size given to every task created by work_batch(). */
const double task_comp_size = 1e7;
/** Communication size given to every task created by work_batch(). */
const double task_comm_size = 1e7;

/* Entry points of the simulated processes; both are defined later in this file. */
int master_fun(int argc, char *argv[]);
int worker_fun(int argc, char *argv[]);
31 static void work_batch(int workers_count)
34 for (i = 0; i < workers_count; i++) {
35 char *tname = bprintf("Task%02d", i);
36 char *mbox = bprintf("MBOX:WRK%02d", i);
38 msg_task_t task = MSG_task_create(tname, task_comp_size, task_comm_size, NULL);
40 XBT_INFO("send task(%s) to mailbox(%s)", tname, mbox);
41 MSG_task_send(task, mbox);
48 int master_fun(int argc, char *argv[])
52 int workers_count = argc - 1;
54 msg_host_t *pms = xbt_new(msg_host_t, workers_count);
55 xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);
57 /* Retrieve the PMs that will launch worker processes. */
58 for (i = 1; i < argc; i++)
59 pms[i - 1] = MSG_get_host_by_name(argv[i]);
62 /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
64 XBT_INFO("Launch %d VMs", workers_count);
65 for (i=0; i< workers_count; i++) {
66 char *vm_name = bprintf("VM%02d", i);
67 char *pr_name = bprintf("WRK%02d", i);
68 char *mbox = bprintf("MBOX:WRK%02d", i);
70 char **wrk_argv = xbt_new(char*, 3);
71 wrk_argv[0] = pr_name;
75 XBT_INFO("create %s", vm_name);
76 msg_vm_t vm = MSG_vm_create_core(pms[i], vm_name);
79 memset(¶ms, 0, sizeof(params));
80 params.ramsize = 1L * 1024 * 1024 * 1024; // 1Gbytes
81 MSG_host_set_params(vm, ¶ms);
84 xbt_dynar_push(vms, &vm);
86 XBT_INFO("put %s on %s", pr_name, vm_name);
87 MSG_process_create_with_arguments(pr_name, worker_fun, NULL, vm, 2, wrk_argv);
92 /* Send a bunch of work to every one */
93 XBT_INFO("Send a task to %d worker process", workers_count);
94 work_batch(workers_count);
96 XBT_INFO("Suspend all VMs");
97 xbt_dynar_foreach(vms, i, vm) {
98 const char *vm_name = MSG_host_get_name(vm);
99 XBT_INFO("suspend %s", vm_name);
103 XBT_INFO("Wait a while");
104 MSG_process_sleep(2);
106 XBT_INFO("Resume all VMs");
107 xbt_dynar_foreach(vms, i, vm) {
112 XBT_INFO("Sleep long enough for everyone to be done with previous batch of work");
113 MSG_process_sleep(1000 - MSG_get_clock());
115 XBT_INFO("Add one more process per VM");
116 xbt_dynar_foreach(vms, i, vm) {
117 unsigned int index = i + xbt_dynar_length(vms);
118 char *vm_name = bprintf("VM%02d", i);
119 char *pr_name = bprintf("WRK%02d", index);
120 char *mbox = bprintf("MBOX:WRK%02d", index);
122 char **wrk_argv = xbt_new(char*, 3);
123 wrk_argv[0] = pr_name;
127 XBT_INFO("put %s on %s", pr_name, vm_name);
128 MSG_process_create_with_arguments(pr_name, worker_fun, NULL, vm, 2, wrk_argv);
132 XBT_INFO("Send a task to %d worker process", workers_count * 2);
133 work_batch(workers_count * 2);
135 XBT_INFO("Migrate all VMs to PM(%s)", MSG_host_get_name(pms[1]));
136 xbt_dynar_foreach(vms, i, vm) {
137 MSG_vm_migrate(vm, pms[1]);
140 /* Migration with default policy is called (i.e. live migration with pre-copy strategy) */
141 /* If you want to use other policy such as post-copy or cold migration, you should add a third parameter that defines the policy */
142 XBT_INFO("Migrate all VMs to PM(%s)", MSG_host_get_name(pms[2]));
143 xbt_dynar_foreach(vms, i, vm) {
144 // MSG_vm_suspend(vm);
145 MSG_vm_migrate(vm, pms[2]);
146 // MSG_vm_resume(vm);
150 XBT_INFO("Shutdown the half of worker processes gracefuly. The remaining half will be forcibly killed");
151 for (i = 0; i < workers_count; i++) {
153 sprintf(mbox, "MBOX:WRK%02d", i);
154 msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
155 MSG_task_send(finalize, mbox);
158 XBT_INFO("Wait a while before effective shutdown.");
159 MSG_process_sleep(2);
162 XBT_INFO("Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
163 xbt_dynar_foreach(vms, i, vm) {
164 XBT_INFO("shutdown %s", MSG_host_get_name(vm));
166 XBT_INFO("destroy %s", MSG_host_get_name(vm));
170 XBT_INFO("Goodbye now!");
172 xbt_dynar_free(&vms);
177 /** Receiver function */
178 int worker_fun(int argc, char *argv[])
180 xbt_assert(argc == 2, "need mbox in arguments");
182 char *mbox = argv[1];
183 const char *pr_name = MSG_process_get_name(MSG_process_self());
184 XBT_INFO("%s is listenning on mailbox(%s)", pr_name, mbox);
187 msg_task_t task = NULL;
189 msg_error_t res = MSG_task_receive(&task, mbox);
191 XBT_CRITICAL("MSG_task_get failed");
195 XBT_INFO("%s received task(%s) from mailbox(%s)",
196 pr_name, MSG_task_get_name(task), mbox);
198 if (!strcmp(MSG_task_get_name(task), "finalize")) {
199 MSG_task_destroy(task);
203 MSG_task_execute(task);
204 XBT_INFO("%s executed task(%s)", pr_name, MSG_task_get_name(task));
205 MSG_task_destroy(task);
213 int main(int argc, char *argv[])
215 const int nb_hosts = 3;
217 MSG_init(&argc, argv);
219 printf("Usage: %s example/msg/msg_platform.xml\n", argv[0]);
223 /* Load the platform file */
224 MSG_create_environment(argv[1]);
226 /* Retrieve hosts from the platform file */
227 xbt_dynar_t hosts_dynar = MSG_hosts_as_dynar();
229 if (xbt_dynar_length(hosts_dynar) <= nb_hosts) {
230 XBT_CRITICAL("need %d hosts", nb_hosts);
234 msg_host_t master_pm = NULL;
235 char **master_argv = xbt_new(char *, 12);
236 master_argv[0] = xbt_strdup("master");
237 master_argv[11] = NULL;
241 xbt_dynar_foreach(hosts_dynar, i, host) {
247 master_argv[i] = xbt_strdup(MSG_host_get_name(host));
254 if (master_pm!=NULL){
255 MSG_process_create_with_arguments("master", master_fun, NULL, master_pm, nb_hosts + 1, master_argv);
258 XBT_INFO("Bye (simulation time %g)", MSG_get_clock());
260 xbt_dynar_free(&hosts_dynar);
262 return !(res == MSG_OK);