Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
f9d1ed1eae4aebee8b694baa54ef1af29d462ec8
[simgrid.git] / examples / msg / cloud-masterworker / cloud-masterworker.c
1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "simgrid/msg.h"
7 #include "simgrid/plugins/live_migration.h"
8
9 #include <stdio.h> /* snprintf */
10
11 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test, "Messages specific for this msg example");
12
13 #define MAXMBOXLEN 64
14
15 /** @addtogroup MSG_examples
16  *
17  *  - <b>cloud/master_worker_vm.c: Master/workers
18  *    example on a cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
19  */
20
21 const double task_comp_size = 10000000;
22 const double task_comm_size = 10000000;
23
24 static void send_tasks(int nb_workers)
25 {
26   for (int i = 0; i < nb_workers; i++) {
27     char *tname = bprintf("Task%02d", i);
28     char *mbox  = bprintf("MBOX:WRK%02d", i);
29
30     msg_task_t task = MSG_task_create(tname, task_comp_size, task_comm_size, NULL);
31
32     XBT_INFO("Send task(%s) to mailbox(%s)", tname, mbox);
33     MSG_task_send(task, mbox);
34
35     free(tname);
36     free(mbox);
37   }
38 }
39
40 static int worker_fun(int argc, char *argv[])
41 {
42   const char *pr_name = MSG_process_get_name(MSG_process_self());
43   char mbox[MAXMBOXLEN];
44   snprintf(mbox, MAXMBOXLEN, "MBOX:%s", pr_name);
45
46   XBT_INFO("%s is listening on mailbox(%s)", pr_name, mbox);
47
48   for (;;) {
49     msg_task_t task = NULL;
50
51     msg_error_t res = MSG_task_receive(&task, mbox);
52     xbt_assert(res == MSG_OK, "MSG_task_get failed");
53
54     XBT_INFO("%s received task(%s) from mailbox(%s)", pr_name, MSG_task_get_name(task), mbox);
55
56     if (strcmp(MSG_task_get_name(task), "finalize") == 0) {
57       MSG_task_destroy(task);
58       break;
59     }
60
61     MSG_task_execute(task);
62     XBT_INFO("%s executed task(%s)", pr_name, MSG_task_get_name(task));
63     MSG_task_destroy(task);
64   }
65   return 0;
66 }
67
68 static int master_fun(int argc, char *argv[])
69 {
70   msg_vm_t vm;
71   unsigned int i;
72
73   xbt_dynar_t worker_pms = MSG_process_get_data(MSG_process_self());
74   int nb_workers = xbt_dynar_length(worker_pms);
75
76   xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);
77
78   /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
79   XBT_INFO("# Launch %d VMs", nb_workers);
80   for (int i = 0; i< nb_workers; i++) {
81     char *vm_name = bprintf("VM%02d", i);
82     char *pr_name = bprintf("WRK%02d", i);
83
84     msg_host_t pm = xbt_dynar_get_as(worker_pms, i, msg_host_t);
85
86     XBT_INFO("create %s on PM(%s)", vm_name, MSG_host_get_name(pm));
87     msg_vm_t vm = MSG_vm_create_core(pm, vm_name);
88
89     MSG_vm_set_ramsize(vm, 1L * 1024 * 1024 * 1024); // 1GiB
90
91     MSG_vm_start(vm);
92     xbt_dynar_push(vms, &vm);
93
94     XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
95     MSG_process_create(pr_name, worker_fun, NULL, (msg_host_t)vm);
96
97     xbt_free(vm_name);
98     xbt_free(pr_name);
99   }
100
101   /* Send a bunch of work to every one */
102   XBT_INFO("# Send a task to %d worker process", nb_workers);
103   send_tasks(nb_workers);
104
105   XBT_INFO("# Suspend all VMs");
106   xbt_dynar_foreach(vms, i, vm) {
107     XBT_INFO("suspend %s", MSG_vm_get_name(vm));
108     MSG_vm_suspend(vm);
109   }
110
111   XBT_INFO("# Wait a while");
112   MSG_process_sleep(2);
113
114   XBT_INFO("# Resume all VMs");
115   xbt_dynar_foreach(vms, i, vm) {
116     MSG_vm_resume(vm);
117   }
118
119   XBT_INFO("# Sleep long enough for everyone to be done with previous batch of work");
120   MSG_process_sleep(10 - MSG_get_clock());
121
122   XBT_INFO("# Add one more process on each VM");
123   xbt_dynar_foreach(vms, i, vm) {
124     unsigned int index = i + xbt_dynar_length(vms);
125     char* vm_name      = bprintf("VM%02u", i);
126     char* pr_name      = bprintf("WRK%02u", index);
127
128     XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
129     MSG_process_create(pr_name, worker_fun, NULL, (msg_host_t)vm);
130
131     xbt_free(vm_name);
132     xbt_free(pr_name);
133   }
134
135   XBT_INFO("# Send a task to %d worker process", nb_workers * 2);
136   send_tasks(nb_workers * 2);
137
138   msg_host_t worker_pm0 = xbt_dynar_get_as(worker_pms, 0, msg_host_t);
139   msg_host_t worker_pm1 = xbt_dynar_get_as(worker_pms, 1, msg_host_t);
140
141   XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm0));
142   xbt_dynar_foreach(vms, i, vm) {
143     MSG_vm_migrate(vm, worker_pm0);
144   }
145
146   XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm1));
147   xbt_dynar_foreach(vms, i, vm) {
148     MSG_vm_migrate(vm, worker_pm1);
149   }
150
151   XBT_INFO("# Shutdown the half of worker processes gracefully. The remaining half will be forcibly killed.");
152   for (i = 0; i < nb_workers; i++) {
153     char mbox[MAXMBOXLEN];
154     snprintf(mbox, MAXMBOXLEN, "MBOX:WRK%02u", i);
155     msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
156     MSG_task_send(finalize, mbox);
157   }
158
159   XBT_INFO("# Wait a while before effective shutdown.");
160   MSG_process_sleep(2);
161
162   XBT_INFO("# Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
163   xbt_dynar_foreach(vms, i, vm) {
164     XBT_INFO("shutdown %s", MSG_vm_get_name(vm));
165     MSG_vm_shutdown(vm);
166     XBT_INFO("destroy %s", MSG_vm_get_name(vm));
167     MSG_vm_destroy(vm);
168   }
169
170   XBT_INFO("# Goodbye now!");
171   xbt_dynar_free(&vms);
172   return 0;
173 }
174
175 /** Receiver function  */
176 int main(int argc, char *argv[])
177 {
178   const int nb_workers = 2;
179
180   MSG_init(&argc, argv);
181   MSG_vm_live_migration_plugin_init();
182
183   xbt_assert(argc >1,"Usage: %s example/platforms/cluster_backbone.xml\n", argv[0]);
184
185   /* Load the platform file */
186   MSG_create_environment(argv[1]);
187
188   /* Retrieve hosts from the platform file */
189   xbt_dynar_t pms = MSG_hosts_as_dynar();
190
191   /* we need a master node and worker nodes */
192   xbt_assert(xbt_dynar_length(pms) > nb_workers,"need %d hosts", nb_workers + 1);
193
194   /* the first pm is the master, the others are workers */
195   msg_host_t master_pm = xbt_dynar_get_as(pms, 0, msg_host_t);
196
197   xbt_dynar_t worker_pms = xbt_dynar_new(sizeof(msg_host_t), NULL);
198   for (int i = 1; i < nb_workers + 1; i++) {
199     msg_host_t pm = xbt_dynar_get_as(pms, i, msg_host_t);
200     xbt_dynar_push(worker_pms, &pm);
201   }
202   xbt_dynar_free(&pms);
203
204   /* Start the master process on the master pm. */
205   MSG_process_create("master", master_fun, worker_pms, master_pm);
206
207   msg_error_t res = MSG_main();
208   XBT_INFO("Bye (simulation time %g)", MSG_get_clock());
209
210   xbt_dynar_free(&worker_pms);
211
212   return !(res == MSG_OK);
213 }