Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
merge from the java bindings work
[simgrid.git] / examples / msg / cloud / masterslave_virtual_machines.c
1 /* Copyright (c) 2007-2012. The SimGrid Team. All rights reserved.                             */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <stdio.h>
7 #include "msg/msg.h"
8 #include "xbt/sysdep.h"         /* calloc, printf */
9
10 /* Create a log channel to have nice outputs. */
11 #include "xbt/log.h"
12 #include "xbt/asserts.h"
13 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
14                              "Messages specific for this msg example");
15
16 /** @addtogroup MSG_examples
17  * 
18  *  - <b>cloud/masterslave_virtual_machines.c: Master/workers
19  *    example on a cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
20  */
21
22 const double task_comp_size = 10000000;
23 const double task_comm_size = 10000000;
24
25
26 int master_fun(int argc, char *argv[]);
27 int worker_fun(int argc, char *argv[]);
28
29
30 static void work_batch(int workers_count)
31 {
32   int i;
33   for (i = 0; i < workers_count; i++) {
34     char *tname = bprintf("Task%02d", i);
35     char *mbox =  bprintf("MBOX:WRK%02d", i);
36
37     msg_task_t task = MSG_task_create(tname, task_comp_size, task_comm_size, NULL);
38
39     XBT_INFO("send task(%s) to mailbox(%s)", tname, mbox);
40     MSG_task_send(task, mbox);
41
42     free(tname);
43     free(mbox);
44   }
45 }
46
47 int master_fun(int argc, char *argv[])
48 {
49   msg_vm_t vm;
50   unsigned int i;
51   int workers_count = argc - 1;
52
53   msg_host_t *pms = xbt_new(msg_host_t, workers_count);
54   xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);
55
56   /* Retrieve the PMs that will launch worker processes. */
57   for (i = 1; i < argc; i++)
58     pms[i - 1] = MSG_get_host_by_name(argv[i]);
59
60
61   /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
62
63   XBT_INFO("Launch %d VMs", workers_count);
64   for (i=0; i< workers_count; i++) {
65     char *vm_name = bprintf("VM%02d", i);
66     char *pr_name = bprintf("WRK%02d", i);
67     char *mbox = bprintf("MBOX:WRK%02d", i);
68
69     char **wrk_argv = xbt_new(char*, 3);
70     wrk_argv[0] = pr_name;
71     wrk_argv[1] = mbox;
72     wrk_argv[2] = NULL;
73
74     XBT_INFO("create %s", vm_name);
75     msg_vm_t vm = MSG_vm_create_core(pms[i], vm_name);
76
77     s_ws_params_t params;
78     memset(&params, 0, sizeof(params));
79     params.ramsize = 1L * 1024 * 1024 * 1024; // 1Gbytes
80     MSG_host_set_params(vm, &params);
81
82     MSG_vm_start(vm);
83     xbt_dynar_push(vms, &vm);
84
85     XBT_INFO("put %s on %s", pr_name, vm_name);
86     MSG_process_create_with_arguments(pr_name, worker_fun, NULL, vm, 2, wrk_argv);
87   }
88
89
90   /* Send a bunch of work to every one */
91   XBT_INFO("Send a task to %d worker process", workers_count);
92   work_batch(workers_count);
93
94   XBT_INFO("Suspend all VMs");
95   xbt_dynar_foreach(vms, i, vm) {
96     const char *vm_name = MSG_host_get_name(vm);
97     XBT_INFO("suspend %s", vm_name);
98     MSG_vm_suspend(vm);
99   }
100
101   XBT_INFO("Wait a while");
102   MSG_process_sleep(2);
103
104   XBT_INFO("Resume all VMs");
105   xbt_dynar_foreach(vms, i, vm) {
106     MSG_vm_resume(vm);
107   }
108
109
110   XBT_INFO("Sleep long enough for everyone to be done with previous batch of work");
111   MSG_process_sleep(1000 - MSG_get_clock());
112
113   XBT_INFO("Add one more process per VM");
114   xbt_dynar_foreach(vms, i, vm) {
115     unsigned int index = i + xbt_dynar_length(vms);
116     char *vm_name = bprintf("VM%02d", i);
117     char *pr_name = bprintf("WRK%02d", index);
118     char *mbox = bprintf("MBOX:WRK%02d", index);
119
120     char **wrk_argv = xbt_new(char*, 3);
121     wrk_argv[0] = pr_name;
122     wrk_argv[1] = mbox;
123     wrk_argv[2] = NULL;
124
125     XBT_INFO("put %s on %s", pr_name, vm_name);
126     MSG_process_create_with_arguments(pr_name, worker_fun, NULL, vm, 2, wrk_argv);
127   }
128
129   XBT_INFO("Send a task to %d worker process", workers_count * 2);
130   work_batch(workers_count * 2);
131
132   XBT_INFO("Migrate all VMs to PM(%s)", MSG_host_get_name(pms[1]));
133   xbt_dynar_foreach(vms, i, vm) {
134     MSG_vm_migrate(vm, pms[1]);
135   }
136
137   /* Migration with default policy is called (i.e. live migration with pre-copy strategy) */
138   /* If you want to use other policy such as post-copy or cold migration, you should add a third parameter that defines the policy */
139   XBT_INFO("Migrate all VMs to PM(%s)", MSG_host_get_name(pms[2]));
140   xbt_dynar_foreach(vms, i, vm) {
141     // MSG_vm_suspend(vm);
142     MSG_vm_migrate(vm, pms[2]);
143     // MSG_vm_resume(vm);
144   }
145
146
147   XBT_INFO("Shutdown the half of worker processes gracefuly. The remaining half will be forcibly killed");
148   for (i = 0; i < workers_count; i++) {
149     char mbox[64];
150     sprintf(mbox, "MBOX:WRK%02d", i);
151     msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
152     MSG_task_send(finalize, mbox);
153   }
154
155   XBT_INFO("Wait a while before effective shutdown.");
156   MSG_process_sleep(2);
157
158
159   XBT_INFO("Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
160   xbt_dynar_foreach(vms, i, vm) {
161     XBT_INFO("shutdown %s", MSG_host_get_name(vm));
162     MSG_vm_shutdown(vm);
163     XBT_INFO("destroy %s", MSG_host_get_name(vm));
164     MSG_vm_destroy(vm);
165   }
166
167   XBT_INFO("Goodbye now!");
168   free(pms);
169   xbt_dynar_free(&vms);
170
171   return 0;
172 }
173
174 /** Receiver function  */
175 int worker_fun(int argc, char *argv[])
176 {
177   xbt_assert(argc == 2, "need mbox in arguments");
178
179   char *mbox = argv[1];
180   const char *pr_name = MSG_process_get_name(MSG_process_self());
181   XBT_INFO("%s is listenning on mailbox(%s)", pr_name, mbox);
182
183   for (;;) {
184     msg_task_t task = NULL;
185
186     msg_error_t res = MSG_task_receive(&task, mbox);
187     if (res != MSG_OK) {
188       XBT_CRITICAL("MSG_task_get failed");
189       DIE_IMPOSSIBLE;
190     }
191
192     XBT_INFO("%s received task(%s) from mailbox(%s)",
193         pr_name, MSG_task_get_name(task), mbox);
194
195     if (!strcmp(MSG_task_get_name(task), "finalize")) {
196       MSG_task_destroy(task);
197       break;
198     }
199
200     MSG_task_execute(task);
201     XBT_INFO("%s executed task(%s)", pr_name, MSG_task_get_name(task));
202     MSG_task_destroy(task);
203   }
204
205   return 0;
206 }
207
208
209 /** Main function */
210 int main(int argc, char *argv[])
211 {
212   const int nb_hosts = 3;
213
214   MSG_init(&argc, argv);
215   if (argc != 2) {
216     printf("Usage: %s example/msg/msg_platform.xml\n", argv[0]);
217     return 1;
218   }
219
220   /* Load the platform file */
221   MSG_create_environment(argv[1]);
222
223   /* Retrieve hosts from the platform file */
224   xbt_dynar_t hosts_dynar = MSG_hosts_as_dynar();
225
226   if (xbt_dynar_length(hosts_dynar) <= nb_hosts) {
227     XBT_CRITICAL("need %d hosts", nb_hosts);
228     return 1;
229   }
230
231   msg_host_t master_pm;
232   char **master_argv = xbt_new(char *, 12);
233   master_argv[0] = xbt_strdup("master");
234   master_argv[11] = NULL;
235
236   unsigned int i;
237   msg_host_t host;
238   xbt_dynar_foreach(hosts_dynar, i, host) {
239     if (i == 0) {
240       master_pm = host;
241       continue;
242     }
243
244     master_argv[i] = xbt_strdup(MSG_host_get_name(host));
245
246     if (i == nb_hosts)
247       break;
248   }
249
250
251   MSG_process_create_with_arguments("master", master_fun, NULL, master_pm, nb_hosts + 1, master_argv);
252
253   msg_error_t res = MSG_main();
254   XBT_INFO("Bye (simulation time %g)", MSG_get_clock());
255
256   xbt_dynar_free(&hosts_dynar);
257
258   return !(res == MSG_OK);
259 }