1 /* Copyright (c) 2007-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "simgrid/msg.h"
9 #include "xbt/sysdep.h" /* calloc, printf */
11 /* Create a log channel to have nice outputs. */
13 #include "xbt/asserts.h"
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
15 "Messages specific for this msg example");
19 /** @addtogroup MSG_examples
21 * - <b>cloud/masterslave_virtual_machines.c: Master/workers
22 * example on a cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
25 const double task_comp_size = 10000000;
26 const double task_comm_size = 10000000;
29 int master_fun(int argc, char *argv[]);
30 int worker_fun(int argc, char *argv[]);
33 static void send_tasks(int nb_workers)
36 for (i = 0; i < nb_workers; i++) {
37 char *tname = bprintf("Task%02d", i);
38 char *mbox = bprintf("MBOX:WRK%02d", i);
40 msg_task_t task = MSG_task_create(tname, task_comp_size, task_comm_size, NULL);
42 XBT_INFO("Send task(%s) to mailbox(%s)", tname, mbox);
43 MSG_task_send(task, mbox);
50 int master_fun(int argc, char *argv[])
55 xbt_dynar_t worker_pms = MSG_process_get_data(MSG_process_self());
56 int nb_workers = xbt_dynar_length(worker_pms);
58 xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);
61 /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
63 XBT_INFO("# Launch %d VMs", nb_workers);
64 for (i = 0; i< nb_workers; i++) {
65 char *vm_name = bprintf("VM%02d", i);
66 char *pr_name = bprintf("WRK%02d", i);
68 msg_host_t pm = xbt_dynar_get_as(worker_pms, i, msg_host_t);
70 XBT_INFO("create %s on PM(%s)", vm_name, MSG_host_get_name(pm));
71 msg_vm_t vm = MSG_vm_create_core(pm, vm_name);
74 memset(¶ms, 0, sizeof(params));
75 params.ramsize = 1L * 1024 * 1024 * 1024; // 1Gbytes
76 MSG_host_set_params(vm, ¶ms);
79 xbt_dynar_push(vms, &vm);
81 XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
82 MSG_process_create(pr_name, worker_fun, NULL, vm);
89 /* Send a bunch of work to every one */
90 XBT_INFO("# Send a task to %d worker process", nb_workers);
91 send_tasks(nb_workers);
93 XBT_INFO("# Suspend all VMs");
94 xbt_dynar_foreach(vms, i, vm) {
95 const char *vm_name = MSG_host_get_name(vm);
96 XBT_INFO("suspend %s", vm_name);
100 XBT_INFO("# Wait a while");
101 MSG_process_sleep(2);
103 XBT_INFO("# Resume all VMs");
104 xbt_dynar_foreach(vms, i, vm) {
109 XBT_INFO("# Sleep long enough for everyone to be done with previous batch of work");
110 MSG_process_sleep(1000 - MSG_get_clock());
112 XBT_INFO("# Add one more process on each VM");
113 xbt_dynar_foreach(vms, i, vm) {
114 unsigned int index = i + xbt_dynar_length(vms);
115 char *vm_name = bprintf("VM%02d", i);
116 char *pr_name = bprintf("WRK%02d", index);
118 XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
119 MSG_process_create(pr_name, worker_fun, NULL, vm);
125 XBT_INFO("# Send a task to %d worker process", nb_workers * 2);
126 send_tasks(nb_workers * 2);
128 msg_host_t worker_pm0 = xbt_dynar_get_as(worker_pms, 0, msg_host_t);
129 msg_host_t worker_pm1 = xbt_dynar_get_as(worker_pms, 1, msg_host_t);
131 XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm0));
132 xbt_dynar_foreach(vms, i, vm) {
133 MSG_vm_migrate(vm, worker_pm0);
136 XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm1));
137 xbt_dynar_foreach(vms, i, vm) {
138 MSG_vm_migrate(vm, worker_pm1);
142 XBT_INFO("# Shutdown the half of worker processes gracefuly. The remaining half will be forcibly killed.");
143 for (i = 0; i < nb_workers; i++) {
144 char mbox[MAXMBOXLEN];
145 snprintf(mbox, MAXMBOXLEN, "MBOX:WRK%02d", i);
146 msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
147 MSG_task_send(finalize, mbox);
150 XBT_INFO("# Wait a while before effective shutdown.");
151 MSG_process_sleep(2);
154 XBT_INFO("# Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
155 xbt_dynar_foreach(vms, i, vm) {
156 XBT_INFO("shutdown %s", MSG_host_get_name(vm));
158 XBT_INFO("destroy %s", MSG_host_get_name(vm));
162 XBT_INFO("# Goodbye now!");
163 xbt_dynar_free(&vms);
168 /** Receiver function */
169 int worker_fun(int argc, char *argv[])
171 const char *pr_name = MSG_process_get_name(MSG_process_self());
172 char mbox[MAXMBOXLEN];
173 snprintf(mbox, MAXMBOXLEN, "MBOX:%s", pr_name);
175 XBT_INFO("%s is listenning on mailbox(%s)", pr_name, mbox);
178 msg_task_t task = NULL;
180 msg_error_t res = MSG_task_receive(&task, mbox);
182 XBT_CRITICAL("MSG_task_get failed");
186 XBT_INFO("%s received task(%s) from mailbox(%s)",
187 pr_name, MSG_task_get_name(task), mbox);
189 if (!strcmp(MSG_task_get_name(task), "finalize")) {
190 MSG_task_destroy(task);
194 MSG_task_execute(task);
195 XBT_INFO("%s executed task(%s)", pr_name, MSG_task_get_name(task));
196 MSG_task_destroy(task);
205 int main(int argc, char *argv[])
207 const int nb_workers = 2;
209 MSG_init(&argc, argv);
211 printf("Usage: %s example/msg/msg_platform.xml\n", argv[0]);
215 /* Load the platform file */
216 MSG_create_environment(argv[1]);
218 /* Retrieve hosts from the platform file */
219 xbt_dynar_t pms = MSG_hosts_as_dynar();
221 /* we need a master node and worker nodes */
222 if (xbt_dynar_length(pms) < nb_workers + 1) {
223 XBT_CRITICAL("need %d hosts", nb_workers + 1);
227 /* the first pm is the master, the others are workers */
228 msg_host_t master_pm = xbt_dynar_get_as(pms, 0, msg_host_t);
230 xbt_dynar_t worker_pms = xbt_dynar_new(sizeof(msg_host_t), NULL);
232 for (i = 1; i < nb_workers + 1; i++) {
233 msg_host_t pm = xbt_dynar_get_as(pms, i, msg_host_t);
234 xbt_dynar_push(worker_pms, &pm);
238 /* Start the master process on the master pm. */
239 MSG_process_create("master", master_fun, worker_pms, master_pm);
241 msg_error_t res = MSG_main();
242 XBT_INFO("Bye (simulation time %g)", MSG_get_clock());
244 xbt_dynar_free(&worker_pms);
245 xbt_dynar_free(&pms);
247 return !(res == MSG_OK);