Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
slave2worker cont'd
[simgrid.git] / examples / msg / cloud / master_worker_vm.c
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "simgrid/msg.h"
8
9 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test, "Messages specific for this msg example");
10
11 #define MAXMBOXLEN 64
12
13 /** @addtogroup MSG_examples
14  *
15  *  - <b>cloud/master_worker_vm.c: Master/workers
16  *    example on a cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
17  */
18
19 const double task_comp_size = 10000000;
20 const double task_comm_size = 10000000;
21
22 static void send_tasks(int nb_workers)
23 {
24   for (int i = 0; i < nb_workers; i++) {
25     char *tname = bprintf("Task%02d", i);
26     char *mbox  = bprintf("MBOX:WRK%02d", i);
27
28     msg_task_t task = MSG_task_create(tname, task_comp_size, task_comm_size, NULL);
29
30     XBT_INFO("Send task(%s) to mailbox(%s)", tname, mbox);
31     MSG_task_send(task, mbox);
32
33     free(tname);
34     free(mbox);
35   }
36 }
37
38 static int worker_fun(int argc, char *argv[])
39 {
40   const char *pr_name = MSG_process_get_name(MSG_process_self());
41   char mbox[MAXMBOXLEN];
42   snprintf(mbox, MAXMBOXLEN, "MBOX:%s", pr_name);
43
44   XBT_INFO("%s is listening on mailbox(%s)", pr_name, mbox);
45
46   for (;;) {
47     msg_task_t task = NULL;
48
49     msg_error_t res = MSG_task_receive(&task, mbox);
50     if (res != MSG_OK) {
51       XBT_CRITICAL("MSG_task_get failed");
52       DIE_IMPOSSIBLE;
53     }
54
55     XBT_INFO("%s received task(%s) from mailbox(%s)", pr_name, MSG_task_get_name(task), mbox);
56
57     if (!strcmp(MSG_task_get_name(task), "finalize")) {
58       MSG_task_destroy(task);
59       break;
60     }
61
62     MSG_task_execute(task);
63     XBT_INFO("%s executed task(%s)", pr_name, MSG_task_get_name(task));
64     MSG_task_destroy(task);
65   }
66   return 0;
67 }
68
69 static int master_fun(int argc, char *argv[])
70 {
71   msg_vm_t vm;
72   unsigned int i;
73
74   xbt_dynar_t worker_pms = MSG_process_get_data(MSG_process_self());
75   int nb_workers = xbt_dynar_length(worker_pms);
76
77   xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);
78
79   /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
80   XBT_INFO("# Launch %d VMs", nb_workers);
81   for (int i = 0; i< nb_workers; i++) {
82     char *vm_name = bprintf("VM%02d", i);
83     char *pr_name = bprintf("WRK%02d", i);
84
85     msg_host_t pm = xbt_dynar_get_as(worker_pms, i, msg_host_t);
86
87     XBT_INFO("create %s on PM(%s)", vm_name, MSG_host_get_name(pm));
88     msg_vm_t vm = MSG_vm_create_core(pm, vm_name);
89
90     s_vm_params_t params;
91     memset(&params, 0, sizeof(params));
92     params.ramsize = 1L * 1024 * 1024 * 1024; // 1Gbytes
93     MSG_host_set_params(vm, &params);
94
95     MSG_vm_start(vm);
96     xbt_dynar_push(vms, &vm);
97
98     XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
99     MSG_process_create(pr_name, worker_fun, NULL, vm);
100
101     xbt_free(vm_name);
102     xbt_free(pr_name);
103   }
104
105   /* Send a bunch of work to every one */
106   XBT_INFO("# Send a task to %d worker process", nb_workers);
107   send_tasks(nb_workers);
108
109   XBT_INFO("# Suspend all VMs");
110   xbt_dynar_foreach(vms, i, vm) {
111     const char *vm_name = MSG_host_get_name(vm);
112     XBT_INFO("suspend %s", vm_name);
113     MSG_vm_suspend(vm);
114   }
115
116   XBT_INFO("# Wait a while");
117   MSG_process_sleep(2);
118
119   XBT_INFO("# Resume all VMs");
120   xbt_dynar_foreach(vms, i, vm) {
121     MSG_vm_resume(vm);
122   }
123
124   XBT_INFO("# Sleep long enough for everyone to be done with previous batch of work");
125   MSG_process_sleep(1000 - MSG_get_clock());
126
127   XBT_INFO("# Add one more process on each VM");
128   xbt_dynar_foreach(vms, i, vm) {
129     unsigned int index = i + xbt_dynar_length(vms);
130     char *vm_name = bprintf("VM%02d", i);
131     char *pr_name = bprintf("WRK%02d", index);
132
133     XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
134     MSG_process_create(pr_name, worker_fun, NULL, vm);
135
136     xbt_free(vm_name);
137     xbt_free(pr_name);
138   }
139
140   XBT_INFO("# Send a task to %d worker process", nb_workers * 2);
141   send_tasks(nb_workers * 2);
142
143   msg_host_t worker_pm0 = xbt_dynar_get_as(worker_pms, 0, msg_host_t);
144   msg_host_t worker_pm1 = xbt_dynar_get_as(worker_pms, 1, msg_host_t);
145
146   XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm0));
147   xbt_dynar_foreach(vms, i, vm) {
148     MSG_vm_migrate(vm, worker_pm0);
149   }
150
151   XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm1));
152   xbt_dynar_foreach(vms, i, vm) {
153     MSG_vm_migrate(vm, worker_pm1);
154   }
155
156   XBT_INFO("# Shutdown the half of worker processes gracefuly. The remaining half will be forcibly killed.");
157   for (i = 0; i < nb_workers; i++) {
158     char mbox[MAXMBOXLEN];
159     snprintf(mbox, MAXMBOXLEN, "MBOX:WRK%02d", i);
160     msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
161     MSG_task_send(finalize, mbox);
162   }
163
164   XBT_INFO("# Wait a while before effective shutdown.");
165   MSG_process_sleep(2);
166
167   XBT_INFO("# Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
168   xbt_dynar_foreach(vms, i, vm) {
169     XBT_INFO("shutdown %s", MSG_host_get_name(vm));
170     MSG_vm_shutdown(vm);
171     XBT_INFO("destroy %s", MSG_host_get_name(vm));
172     MSG_vm_destroy(vm);
173   }
174
175   XBT_INFO("# Goodbye now!");
176   xbt_dynar_free(&vms);
177   return 0;
178 }
179
180 /** Receiver function  */
181 int main(int argc, char *argv[])
182 {
183   const int nb_workers = 2;
184
185   MSG_init(&argc, argv);
186   xbt_assert(argc >1,"Usage: %s example/msg/msg_platform.xml\n", argv[0]);
187
188   /* Load the platform file */
189   MSG_create_environment(argv[1]);
190
191   /* Retrieve hosts from the platform file */
192   xbt_dynar_t pms = MSG_hosts_as_dynar();
193
194   /* we need a master node and worker nodes */
195   xbt_assert(xbt_dynar_length(pms) > nb_workers,"need %d hosts", nb_workers + 1);
196
197   /* the first pm is the master, the others are workers */
198   msg_host_t master_pm = xbt_dynar_get_as(pms, 0, msg_host_t);
199
200   xbt_dynar_t worker_pms = xbt_dynar_new(sizeof(msg_host_t), NULL);
201   for (int i = 1; i < nb_workers + 1; i++) {
202     msg_host_t pm = xbt_dynar_get_as(pms, i, msg_host_t);
203     xbt_dynar_push(worker_pms, &pm);
204   }
205
206   /* Start the master process on the master pm. */
207   MSG_process_create("master", master_fun, worker_pms, master_pm);
208
209   msg_error_t res = MSG_main();
210   XBT_INFO("Bye (simulation time %g)", MSG_get_clock());
211
212   xbt_dynar_free(&worker_pms);
213   xbt_dynar_free(&pms);
214
215   return !(res == MSG_OK);
216 }