Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
kill dead code
[simgrid.git] / examples / msg / cloud / master_worker_vm.c
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include "simgrid/msg.h"
9 #include "xbt/sysdep.h"         /* calloc, printf */
10
11 /* Create a log channel to have nice outputs. */
12 #include "xbt/log.h"
13 #include "xbt/asserts.h"
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
15                              "Messages specific for this msg example");
16
17 #define MAXMBOXLEN 64
18
19 /** @addtogroup MSG_examples
20  *
21  *  - <b>cloud/masterslave_virtual_machines.c: Master/workers
22  *    example on a cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
23  */
24
25 const double task_comp_size = 10000000;
26 const double task_comm_size = 10000000;
27
28
29 int master_fun(int argc, char *argv[]);
30 int worker_fun(int argc, char *argv[]);
31
32
33 static void send_tasks(int nb_workers)
34 {
35   int i;
36   for (i = 0; i < nb_workers; i++) {
37     char *tname = bprintf("Task%02d", i);
38     char *mbox  = bprintf("MBOX:WRK%02d", i);
39
40     msg_task_t task = MSG_task_create(tname, task_comp_size, task_comm_size, NULL);
41
42     XBT_INFO("Send task(%s) to mailbox(%s)", tname, mbox);
43     MSG_task_send(task, mbox);
44
45     free(tname);
46     free(mbox);
47   }
48 }
49
50 int master_fun(int argc, char *argv[])
51 {
52   msg_vm_t vm;
53   unsigned int i;
54
55   xbt_dynar_t worker_pms = MSG_process_get_data(MSG_process_self());
56   int nb_workers = xbt_dynar_length(worker_pms);
57
58   xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);
59
60
61   /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
62
63   XBT_INFO("# Launch %d VMs", nb_workers);
64   for (i = 0; i< nb_workers; i++) {
65     char *vm_name = bprintf("VM%02d", i);
66     char *pr_name = bprintf("WRK%02d", i);
67
68     msg_host_t pm = xbt_dynar_get_as(worker_pms, i, msg_host_t);
69
70     XBT_INFO("create %s on PM(%s)", vm_name, MSG_host_get_name(pm));
71     msg_vm_t vm = MSG_vm_create_core(pm, vm_name);
72
73     s_vm_params_t params;
74     memset(&params, 0, sizeof(params));
75     params.ramsize = 1L * 1024 * 1024 * 1024; // 1Gbytes
76     MSG_host_set_params(vm, &params);
77
78     MSG_vm_start(vm);
79     xbt_dynar_push(vms, &vm);
80
81     XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
82     MSG_process_create(pr_name, worker_fun, NULL, vm);
83
84     xbt_free(vm_name);
85     xbt_free(pr_name);
86   }
87
88
89   /* Send a bunch of work to every one */
90   XBT_INFO("# Send a task to %d worker process", nb_workers);
91   send_tasks(nb_workers);
92
93   XBT_INFO("# Suspend all VMs");
94   xbt_dynar_foreach(vms, i, vm) {
95     const char *vm_name = MSG_host_get_name(vm);
96     XBT_INFO("suspend %s", vm_name);
97     MSG_vm_suspend(vm);
98   }
99
100   XBT_INFO("# Wait a while");
101   MSG_process_sleep(2);
102
103   XBT_INFO("# Resume all VMs");
104   xbt_dynar_foreach(vms, i, vm) {
105     MSG_vm_resume(vm);
106   }
107
108
109   XBT_INFO("# Sleep long enough for everyone to be done with previous batch of work");
110   MSG_process_sleep(1000 - MSG_get_clock());
111
112   XBT_INFO("# Add one more process on each VM");
113   xbt_dynar_foreach(vms, i, vm) {
114     unsigned int index = i + xbt_dynar_length(vms);
115     char *vm_name = bprintf("VM%02d", i);
116     char *pr_name = bprintf("WRK%02d", index);
117
118     XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
119     MSG_process_create(pr_name, worker_fun, NULL, vm);
120
121     xbt_free(vm_name);
122     xbt_free(pr_name);
123   }
124
125   XBT_INFO("# Send a task to %d worker process", nb_workers * 2);
126   send_tasks(nb_workers * 2);
127
128   msg_host_t worker_pm0 = xbt_dynar_get_as(worker_pms, 0, msg_host_t);
129   msg_host_t worker_pm1 = xbt_dynar_get_as(worker_pms, 1, msg_host_t);
130
131   XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm0));
132   xbt_dynar_foreach(vms, i, vm) {
133     MSG_vm_migrate(vm, worker_pm0);
134   }
135
136   XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm1));
137   xbt_dynar_foreach(vms, i, vm) {
138     MSG_vm_migrate(vm, worker_pm1);
139   }
140
141
142   XBT_INFO("# Shutdown the half of worker processes gracefuly. The remaining half will be forcibly killed.");
143   for (i = 0; i < nb_workers; i++) {
144     char mbox[MAXMBOXLEN];
145     snprintf(mbox, MAXMBOXLEN, "MBOX:WRK%02d", i);
146     msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
147     MSG_task_send(finalize, mbox);
148   }
149
150   XBT_INFO("# Wait a while before effective shutdown.");
151   MSG_process_sleep(2);
152
153
154   XBT_INFO("# Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
155   xbt_dynar_foreach(vms, i, vm) {
156     XBT_INFO("shutdown %s", MSG_host_get_name(vm));
157     MSG_vm_shutdown(vm);
158     XBT_INFO("destroy %s", MSG_host_get_name(vm));
159     MSG_vm_destroy(vm);
160   }
161
162   XBT_INFO("# Goodbye now!");
163   xbt_dynar_free(&vms);
164
165   return 0;
166 }
167
168 /** Receiver function  */
169 int worker_fun(int argc, char *argv[])
170 {
171   const char *pr_name = MSG_process_get_name(MSG_process_self());
172   char mbox[MAXMBOXLEN];
173   snprintf(mbox, MAXMBOXLEN, "MBOX:%s", pr_name);
174
175   XBT_INFO("%s is listenning on mailbox(%s)", pr_name, mbox);
176
177   for (;;) {
178     msg_task_t task = NULL;
179
180     msg_error_t res = MSG_task_receive(&task, mbox);
181     if (res != MSG_OK) {
182       XBT_CRITICAL("MSG_task_get failed");
183       DIE_IMPOSSIBLE;
184     }
185
186     XBT_INFO("%s received task(%s) from mailbox(%s)",
187         pr_name, MSG_task_get_name(task), mbox);
188
189     if (!strcmp(MSG_task_get_name(task), "finalize")) {
190       MSG_task_destroy(task);
191       break;
192     }
193
194     MSG_task_execute(task);
195     XBT_INFO("%s executed task(%s)", pr_name, MSG_task_get_name(task));
196     MSG_task_destroy(task);
197   }
198
199   return 0;
200 }
201
202
203
204
205 int main(int argc, char *argv[])
206 {
207   const int nb_workers = 2;
208
209   MSG_init(&argc, argv);
210   if (argc != 2) {
211     printf("Usage: %s example/msg/msg_platform.xml\n", argv[0]);
212     return 1;
213   }
214
215   /* Load the platform file */
216   MSG_create_environment(argv[1]);
217
218   /* Retrieve hosts from the platform file */
219   xbt_dynar_t pms = MSG_hosts_as_dynar();
220
221   /* we need a master node and worker nodes */
222   if (xbt_dynar_length(pms) < nb_workers + 1) {
223     XBT_CRITICAL("need %d hosts", nb_workers + 1);
224     return 1;
225   }
226
227   /* the first pm is the master, the others are workers */
228   msg_host_t master_pm = xbt_dynar_get_as(pms, 0, msg_host_t);
229
230   xbt_dynar_t worker_pms = xbt_dynar_new(sizeof(msg_host_t), NULL);
231   int i;
232   for (i = 1; i < nb_workers + 1; i++) {
233     msg_host_t pm = xbt_dynar_get_as(pms, i, msg_host_t);
234     xbt_dynar_push(worker_pms, &pm);
235   }
236
237
238   /* Start the master process on the master pm. */
239   MSG_process_create("master", master_fun, worker_pms, master_pm);
240
241   msg_error_t res = MSG_main();
242   XBT_INFO("Bye (simulation time %g)", MSG_get_clock());
243
244   xbt_dynar_free(&worker_pms);
245   xbt_dynar_free(&pms);
246
247   return !(res == MSG_OK);
248 }