Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
273af4c24768a9e8f6a94ef36bb25e2b21f9ee6d
[simgrid.git] / examples / msg / cloud / master_worker_vm.c
1 /* Copyright (c) 2007-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include "msg/msg.h"
9 #include "xbt/sysdep.h"         /* calloc, printf */
10
11 /* Create a log channel to have nice outputs. */
12 #include "xbt/log.h"
13 #include "xbt/asserts.h"
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
15                              "Messages specific for this msg example");
16
17 /** @addtogroup MSG_examples
18  *
19  *  - <b>cloud/masterslave_virtual_machines.c: Master/workers
20  *    example on a cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
21  */
22
23 const double task_comp_size = 10000000;
24 const double task_comm_size = 10000000;
25
26
27 int master_fun(int argc, char *argv[]);
28 int worker_fun(int argc, char *argv[]);
29
30
31 static void send_tasks(int nb_workers)
32 {
33   int i;
34   for (i = 0; i < nb_workers; i++) {
35     char *tname = bprintf("Task%02d", i);
36     char *mbox  = bprintf("MBOX:WRK%02d", i);
37
38     msg_task_t task = MSG_task_create(tname, task_comp_size, task_comm_size, NULL);
39
40     XBT_INFO("Send task(%s) to mailbox(%s)", tname, mbox);
41     MSG_task_send(task, mbox);
42
43     free(tname);
44     free(mbox);
45   }
46 }
47
48 int master_fun(int argc, char *argv[])
49 {
50   msg_vm_t vm;
51   unsigned int i;
52
53   xbt_dynar_t worker_pms = MSG_process_get_data(MSG_process_self());
54   int nb_workers = xbt_dynar_length(worker_pms);
55
56   xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);
57
58
59   /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
60
61   XBT_INFO("# Launch %d VMs", nb_workers);
62   for (i = 0; i< nb_workers; i++) {
63     char *vm_name = bprintf("VM%02d", i);
64     char *pr_name = bprintf("WRK%02d", i);
65
66     msg_host_t pm = xbt_dynar_get_as(worker_pms, i, msg_host_t);
67
68     XBT_INFO("create %s on PM(%s)", vm_name, MSG_host_get_name(pm));
69     msg_vm_t vm = MSG_vm_create_core(pm, vm_name);
70
71     s_ws_params_t params;
72     memset(&params, 0, sizeof(params));
73     params.ramsize = 1L * 1024 * 1024 * 1024; // 1Gbytes
74     MSG_host_set_params(vm, &params);
75
76     MSG_vm_start(vm);
77     xbt_dynar_push(vms, &vm);
78
79     XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
80     MSG_process_create(pr_name, worker_fun, NULL, vm);
81
82     xbt_free(vm_name);
83     xbt_free(pr_name);
84   }
85
86
87   /* Send a bunch of work to every one */
88   XBT_INFO("# Send a task to %d worker process", nb_workers);
89   send_tasks(nb_workers);
90
91   XBT_INFO("# Suspend all VMs");
92   xbt_dynar_foreach(vms, i, vm) {
93     const char *vm_name = MSG_host_get_name(vm);
94     XBT_INFO("suspend %s", vm_name);
95     MSG_vm_suspend(vm);
96   }
97
98   XBT_INFO("# Wait a while");
99   MSG_process_sleep(2);
100
101   XBT_INFO("# Resume all VMs");
102   xbt_dynar_foreach(vms, i, vm) {
103     MSG_vm_resume(vm);
104   }
105
106
107   XBT_INFO("# Sleep long enough for everyone to be done with previous batch of work");
108   MSG_process_sleep(1000 - MSG_get_clock());
109
110   XBT_INFO("# Add one more process on each VM");
111   xbt_dynar_foreach(vms, i, vm) {
112     unsigned int index = i + xbt_dynar_length(vms);
113     char *vm_name = bprintf("VM%02d", i);
114     char *pr_name = bprintf("WRK%02d", index);
115
116     XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
117     MSG_process_create(pr_name, worker_fun, NULL, vm);
118
119     xbt_free(vm_name);
120     xbt_free(pr_name);
121   }
122
123   XBT_INFO("# Send a task to %d worker process", nb_workers * 2);
124   send_tasks(nb_workers * 2);
125
126   msg_host_t worker_pm0 = xbt_dynar_get_as(worker_pms, 0, msg_host_t);
127   msg_host_t worker_pm1 = xbt_dynar_get_as(worker_pms, 1, msg_host_t);
128
129   XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm0));
130   xbt_dynar_foreach(vms, i, vm) {
131     MSG_vm_migrate(vm, worker_pm0);
132   }
133
134   XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm1));
135   xbt_dynar_foreach(vms, i, vm) {
136     MSG_vm_migrate(vm, worker_pm1);
137   }
138
139
140   XBT_INFO("# Shutdown the half of worker processes gracefuly. The remaining half will be forcibly killed.");
141   for (i = 0; i < nb_workers; i++) {
142     char mbox[64];
143     sprintf(mbox, "MBOX:WRK%02d", i);
144     msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
145     MSG_task_send(finalize, mbox);
146   }
147
148   XBT_INFO("# Wait a while before effective shutdown.");
149   MSG_process_sleep(2);
150
151
152   XBT_INFO("# Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
153   xbt_dynar_foreach(vms, i, vm) {
154     XBT_INFO("shutdown %s", MSG_host_get_name(vm));
155     MSG_vm_shutdown(vm);
156     XBT_INFO("destroy %s", MSG_host_get_name(vm));
157     MSG_vm_destroy(vm);
158   }
159
160   XBT_INFO("# Goodbye now!");
161   xbt_dynar_free(&vms);
162
163   return 0;
164 }
165
166 /** Receiver function  */
167 int worker_fun(int argc, char *argv[])
168 {
169   const char *pr_name = MSG_process_get_name(MSG_process_self());
170   char *mbox = bprintf("MBOX:%s", pr_name);
171
172   XBT_INFO("%s is listenning on mailbox(%s)", pr_name, mbox);
173
174   for (;;) {
175     msg_task_t task = NULL;
176
177     msg_error_t res = MSG_task_receive(&task, mbox);
178     if (res != MSG_OK) {
179       XBT_CRITICAL("MSG_task_get failed");
180       DIE_IMPOSSIBLE;
181     }
182
183     XBT_INFO("%s received task(%s) from mailbox(%s)",
184         pr_name, MSG_task_get_name(task), mbox);
185
186     if (!strcmp(MSG_task_get_name(task), "finalize")) {
187       MSG_task_destroy(task);
188       break;
189     }
190
191     MSG_task_execute(task);
192     XBT_INFO("%s executed task(%s)", pr_name, MSG_task_get_name(task));
193     MSG_task_destroy(task);
194   }
195
196   free(mbox);
197
198   return 0;
199 }
200
201
202
203
204 int main(int argc, char *argv[])
205 {
206   const int nb_workers = 2;
207
208   MSG_init(&argc, argv);
209   if (argc != 2) {
210     printf("Usage: %s example/msg/msg_platform.xml\n", argv[0]);
211     return 1;
212   }
213
214   /* Load the platform file */
215   MSG_create_environment(argv[1]);
216
217   /* Retrieve hosts from the platform file */
218   xbt_dynar_t pms = MSG_hosts_as_dynar();
219
220   /* we need a master node and worker nodes */
221   if (xbt_dynar_length(pms) < nb_workers + 1) {
222     XBT_CRITICAL("need %d hosts", nb_workers + 1);
223     return 1;
224   }
225
226   /* the first pm is the master, the others are workers */
227   msg_host_t master_pm = xbt_dynar_get_as(pms, 0, msg_host_t);
228
229   xbt_dynar_t worker_pms = xbt_dynar_new(sizeof(msg_host_t), NULL);
230   int i;
231   for (i = 1; i < nb_workers + 1; i++) {
232     msg_host_t pm = xbt_dynar_get_as(pms, i, msg_host_t);
233     xbt_dynar_push(worker_pms, &pm);
234   }
235
236
237   /* Start the master process on the master pm. */
238   MSG_process_create("master", master_fun, worker_pms, master_pm);
239
240   msg_error_t res = MSG_main();
241   XBT_INFO("Bye (simulation time %g)", MSG_get_clock());
242
243   xbt_dynar_free(&worker_pms);
244   xbt_dynar_free(&pms);
245
246   return !(res == MSG_OK);
247 }