Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Free memory in msg/cloud examples.
[simgrid.git] / examples / msg / cloud / masterslave_virtual_machines.c
1 /* Copyright (c) 2007-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include "msg/msg.h"
9 #include "xbt/sysdep.h"         /* calloc, printf */
10
11 /* Create a log channel to have nice outputs. */
12 #include "xbt/log.h"
13 #include "xbt/asserts.h"
14 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
15                              "Messages specific for this msg example");
16
17 /** @addtogroup MSG_examples
18  * 
19  *  - <b>cloud/masterslave_virtual_machines.c: Master/workers
20  *    example on a cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
21  */
22
23 const double task_comp_size = 10000000;
24 const double task_comm_size = 10000000;
25
26
27 int master_fun(int argc, char *argv[]);
28 int worker_fun(int argc, char *argv[]);
29
30
31 static void work_batch(int workers_count)
32 {
33   int i;
34   for (i = 0; i < workers_count; i++) {
35     char *tname = bprintf("Task%02d", i);
36     char *mbox =  bprintf("MBOX:WRK%02d", i);
37
38     msg_task_t task = MSG_task_create(tname, task_comp_size, task_comm_size, NULL);
39
40     XBT_INFO("send task(%s) to mailbox(%s)", tname, mbox);
41     MSG_task_send(task, mbox);
42
43     free(tname);
44     free(mbox);
45   }
46 }
47
48 int master_fun(int argc, char *argv[])
49 {
50   msg_vm_t vm;
51   unsigned int i;
52   int workers_count = argc - 1;
53
54   msg_host_t *pms = xbt_new(msg_host_t, workers_count);
55   xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);
56
57   /* Retrieve the PMs that will launch worker processes. */
58   for (i = 1; i < argc; i++)
59     pms[i - 1] = MSG_get_host_by_name(argv[i]);
60
61
62   /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
63
64   XBT_INFO("Launch %d VMs", workers_count);
65   for (i=0; i< workers_count; i++) {
66     char *vm_name = bprintf("VM%02d", i);
67     char *pr_name = bprintf("WRK%02d", i);
68     char *mbox = bprintf("MBOX:WRK%02d", i);
69
70     char **wrk_argv = xbt_new(char*, 3);
71     wrk_argv[0] = pr_name;
72     wrk_argv[1] = mbox;
73     wrk_argv[2] = NULL;
74
75     XBT_INFO("create %s", vm_name);
76     msg_vm_t vm = MSG_vm_create_core(pms[i], vm_name);
77
78     s_ws_params_t params;
79     memset(&params, 0, sizeof(params));
80     params.ramsize = 1L * 1024 * 1024 * 1024; // 1Gbytes
81     MSG_host_set_params(vm, &params);
82
83     MSG_vm_start(vm);
84     xbt_dynar_push(vms, &vm);
85
86     XBT_INFO("put %s on %s", pr_name, vm_name);
87     MSG_process_create_with_arguments(pr_name, worker_fun, NULL, vm, 2, wrk_argv);
88     xbt_free(vm_name);
89   }
90
91
92   /* Send a bunch of work to every one */
93   XBT_INFO("Send a task to %d worker process", workers_count);
94   work_batch(workers_count);
95
96   XBT_INFO("Suspend all VMs");
97   xbt_dynar_foreach(vms, i, vm) {
98     const char *vm_name = MSG_host_get_name(vm);
99     XBT_INFO("suspend %s", vm_name);
100     MSG_vm_suspend(vm);
101   }
102
103   XBT_INFO("Wait a while");
104   MSG_process_sleep(2);
105
106   XBT_INFO("Resume all VMs");
107   xbt_dynar_foreach(vms, i, vm) {
108     MSG_vm_resume(vm);
109   }
110
111
112   XBT_INFO("Sleep long enough for everyone to be done with previous batch of work");
113   MSG_process_sleep(1000 - MSG_get_clock());
114
115   XBT_INFO("Add one more process per VM");
116   xbt_dynar_foreach(vms, i, vm) {
117     unsigned int index = i + xbt_dynar_length(vms);
118     char *vm_name = bprintf("VM%02d", i);
119     char *pr_name = bprintf("WRK%02d", index);
120     char *mbox = bprintf("MBOX:WRK%02d", index);
121
122     char **wrk_argv = xbt_new(char*, 3);
123     wrk_argv[0] = pr_name;
124     wrk_argv[1] = mbox;
125     wrk_argv[2] = NULL;
126
127     XBT_INFO("put %s on %s", pr_name, vm_name);
128     MSG_process_create_with_arguments(pr_name, worker_fun, NULL, vm, 2, wrk_argv);
129     xbt_free(vm_name);
130   }
131
132   XBT_INFO("Send a task to %d worker process", workers_count * 2);
133   work_batch(workers_count * 2);
134
135   XBT_INFO("Migrate all VMs to PM(%s)", MSG_host_get_name(pms[1]));
136   xbt_dynar_foreach(vms, i, vm) {
137     MSG_vm_migrate(vm, pms[1]);
138   }
139
140   /* Migration with default policy is called (i.e. live migration with pre-copy strategy) */
141   /* If you want to use other policy such as post-copy or cold migration, you should add a third parameter that defines the policy */
142   XBT_INFO("Migrate all VMs to PM(%s)", MSG_host_get_name(pms[2]));
143   xbt_dynar_foreach(vms, i, vm) {
144     // MSG_vm_suspend(vm);
145     MSG_vm_migrate(vm, pms[2]);
146     // MSG_vm_resume(vm);
147   }
148
149
150   XBT_INFO("Shutdown the half of worker processes gracefuly. The remaining half will be forcibly killed");
151   for (i = 0; i < workers_count; i++) {
152     char mbox[64];
153     sprintf(mbox, "MBOX:WRK%02d", i);
154     msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
155     MSG_task_send(finalize, mbox);
156   }
157
158   XBT_INFO("Wait a while before effective shutdown.");
159   MSG_process_sleep(2);
160
161
162   XBT_INFO("Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
163   xbt_dynar_foreach(vms, i, vm) {
164     XBT_INFO("shutdown %s", MSG_host_get_name(vm));
165     MSG_vm_shutdown(vm);
166     XBT_INFO("destroy %s", MSG_host_get_name(vm));
167     MSG_vm_destroy(vm);
168   }
169
170   XBT_INFO("Goodbye now!");
171   free(pms);
172   xbt_dynar_free(&vms);
173
174   return 0;
175 }
176
177 /** Receiver function  */
178 int worker_fun(int argc, char *argv[])
179 {
180   xbt_assert(argc == 2, "need mbox in arguments");
181
182   char *mbox = argv[1];
183   const char *pr_name = MSG_process_get_name(MSG_process_self());
184   XBT_INFO("%s is listenning on mailbox(%s)", pr_name, mbox);
185
186   for (;;) {
187     msg_task_t task = NULL;
188
189     msg_error_t res = MSG_task_receive(&task, mbox);
190     if (res != MSG_OK) {
191       XBT_CRITICAL("MSG_task_get failed");
192       DIE_IMPOSSIBLE;
193     }
194
195     XBT_INFO("%s received task(%s) from mailbox(%s)",
196         pr_name, MSG_task_get_name(task), mbox);
197
198     if (!strcmp(MSG_task_get_name(task), "finalize")) {
199       MSG_task_destroy(task);
200       break;
201     }
202
203     MSG_task_execute(task);
204     XBT_INFO("%s executed task(%s)", pr_name, MSG_task_get_name(task));
205     MSG_task_destroy(task);
206   }
207
208   return 0;
209 }
210
211
212 /** Main function */
213 int main(int argc, char *argv[])
214 {
215   const int nb_hosts = 3;
216
217   MSG_init(&argc, argv);
218   if (argc != 2) {
219     printf("Usage: %s example/msg/msg_platform.xml\n", argv[0]);
220     return 1;
221   }
222
223   /* Load the platform file */
224   MSG_create_environment(argv[1]);
225
226   /* Retrieve hosts from the platform file */
227   xbt_dynar_t hosts_dynar = MSG_hosts_as_dynar();
228
229   if (xbt_dynar_length(hosts_dynar) <= nb_hosts) {
230     XBT_CRITICAL("need %d hosts", nb_hosts);
231     return 1;
232   }
233
234   msg_host_t master_pm = NULL;
235   char **master_argv = xbt_new(char *, 12);
236   master_argv[0] = xbt_strdup("master");
237   master_argv[11] = NULL;
238
239   unsigned int i;
240   msg_host_t host;
241   xbt_dynar_foreach(hosts_dynar, i, host) {
242     if (i == 0) {
243       master_pm = host;
244       continue;
245     }
246
247     master_argv[i] = xbt_strdup(MSG_host_get_name(host));
248
249     if (i == nb_hosts)
250       break;
251   }
252
253   msg_error_t res = 1;
254   if (master_pm!=NULL){
255     MSG_process_create_with_arguments("master", master_fun, NULL, master_pm, nb_hosts + 1, master_argv);
256
257     res = MSG_main();
258     XBT_INFO("Bye (simulation time %g)", MSG_get_clock());
259   }
260   xbt_dynar_free(&hosts_dynar);
261
262   return !(res == MSG_OK);
263 }