Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add few comments - Adrien
[simgrid.git] / examples / msg / cloud / masterslave_virtual_machines.c
1 /* Copyright (c) 2007-2012. The SimGrid Team. All rights reserved.                             */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <stdio.h>
7 #include "msg/msg.h"
8 #include "xbt/sysdep.h"         /* calloc, printf */
9
10 /* Create a log channel to have nice outputs. */
11 #include "xbt/log.h"
12 #include "xbt/asserts.h"
13 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
14                              "Messages specific for this msg example");
15
16 /** @addtogroup MSG_examples
17  * 
18  *  - <b>cloud/masterslave_virtual_machines.c: Master/workers
19  *    example on a cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
20  */
21
22 double task_comp_size = 10000000;
23 double task_comm_size = 10000000;
24
25
26 int master_fun(int argc, char *argv[]);
27 int worker_fun(int argc, char *argv[]);
28
29 int nb_hosts=3;
30 //int nb_vms=nb_host*2; // 2 VMs per PM
31  
32 static void work_batch(int workers_count)
33 {
34   int i;
35   for (i = 0; i < workers_count; i++) {
36     char *tname = bprintf("Task%02d", i);
37     char *mbox =  bprintf("MBOX:WRK%02d", i);
38
39     msg_task_t task = MSG_task_create(tname, task_comp_size, task_comm_size, NULL);
40
41     XBT_INFO("send task(%s) to mailbox(%s)", tname, mbox);
42     MSG_task_send(task, mbox);
43
44     free(tname);
45     free(mbox);
46   }
47 }
48
49 int master_fun(int argc, char *argv[])
50 {
51   msg_vm_t vm;
52   unsigned int i;
53   int workers_count = argc - 1;
54
55   msg_host_t *pms = xbt_new(msg_host_t, workers_count);
56   xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);
57
58   /* Retrive the PMs that launch worker processes. */
59   for (i = 1; i < argc; i++)
60     pms[i - 1] = MSG_get_host_by_name(argv[i]);
61
62
63   /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
64
65   XBT_INFO("Launch %ld VMs", workers_count);
66   for (i=0; i< workers_count; i++) {
67     char *vm_name = bprintf("VM%02d", i);
68     char *pr_name = bprintf("WRK%02d", i);
69     char *mbox = bprintf("MBOX:WRK%02d", i);
70
71     char **wrk_argv = xbt_new(char*, 3);
72     wrk_argv[0] = pr_name;
73     wrk_argv[1] = mbox;
74     wrk_argv[2] = NULL;
75
76     XBT_INFO("create %s", vm_name);
77     msg_vm_t vm = MSG_vm_create_core(pms[i], vm_name);
78     MSG_vm_start(vm);
79     xbt_dynar_push(vms, &vm);
80
81     XBT_INFO("put %s on %s", pr_name, vm_name);
82     MSG_process_create_with_arguments(pr_name, worker_fun, NULL, vm, 2, wrk_argv);
83   }
84
85
86   /* Send a bunch of work to every one */
87   XBT_INFO("Send a task to %d worker process", workers_count);
88   work_batch(workers_count);
89
90   XBT_INFO("Suspend all VMs");
91   xbt_dynar_foreach(vms, i, vm) {
92     const char *vm_name = MSG_host_get_name(vm);
93     XBT_INFO("suspend %s", vm_name);
94     MSG_vm_suspend(vm);
95   }
96
97   XBT_INFO("Wait a while");
98   MSG_process_sleep(2);
99
100   XBT_INFO("Resume all VMs");
101   xbt_dynar_foreach(vms, i, vm) {
102     MSG_vm_resume(vm);
103   }
104
105
106   XBT_INFO("Sleep long enough for everyone to be done with previous batch of work");
107   MSG_process_sleep(1000 - MSG_get_clock());
108
109   XBT_INFO("Add one more process per VM");
110   xbt_dynar_foreach(vms, i, vm) {
111     unsigned int index = i + xbt_dynar_length(vms);
112     char *vm_name = bprintf("VM%02d", i);
113     char *pr_name = bprintf("WRK%02d", index);
114     char *mbox = bprintf("MBOX:WRK%02d", index);
115
116     char **wrk_argv = xbt_new(char*, 3);
117     wrk_argv[0] = pr_name;
118     wrk_argv[1] = mbox;
119     wrk_argv[2] = NULL;
120
121     XBT_INFO("put %s on %s", pr_name, vm_name);
122     MSG_process_create_with_arguments(pr_name, worker_fun, NULL, vm, 2, wrk_argv);
123   }
124
125   XBT_INFO("Send a task to %d worker process", workers_count * 2);
126   work_batch(workers_count * 2);
127
128   XBT_INFO("Migrate all VMs to PM(%s)", MSG_host_get_name(pms[1]));
129   xbt_dynar_foreach(vms, i, vm) {
130     MSG_vm_migrate(vm, pms[1]);
131   }
132
133   /* Migration with default policy is called (i.e. live migration with pre-copy strategy */
134   /* If you want to use other policy such as post-copy or cold migration, you should add a third parameter that defines the policy */
135   XBT_INFO("Migrate all VMs to PM(%s)", MSG_host_get_name(pms[2]));
136   xbt_dynar_foreach(vms, i, vm) {
137     // MSG_vm_suspend(vm);
138     MSG_vm_migrate(vm, pms[2]);
139     // MSG_vm_resume(vm);
140   }
141
142
143   XBT_INFO("Shutdown the first worker processes gracefuly. The   the second half will forcefully get killed");
144   for (i = 0; i < workers_count; i++) {
145     char mbox[64];
146     sprintf(mbox, "MBOX:WRK%02d", i);
147     msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
148     MSG_task_send(finalize, mbox);
149   }
150
151   XBT_INFO("Wait a while before effective shutdown.");
152   MSG_process_sleep(2);
153
154
155   XBT_INFO("Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
156   xbt_dynar_foreach(vms, i, vm) {
157     XBT_INFO("shutdown %s", MSG_host_get_name(vm));
158     MSG_vm_shutdown(vm);
159     XBT_INFO("destroy %s", MSG_host_get_name(vm));
160     MSG_vm_destroy(vm);
161   }
162
163   XBT_INFO("Goodbye now!");
164   free(pms);
165   xbt_dynar_free(&vms);
166
167   return 0;
168 }
169
170 /** Receiver function  */
171 int worker_fun(int argc, char *argv[])
172 {
173   xbt_assert(argc == 2, "need mbox in arguments");
174
175   char *mbox = argv[1];
176   const char *pr_name = MSG_process_get_name(MSG_process_self());
177   XBT_INFO("%s is listenning on mailbox(%s)", pr_name, mbox);
178
179   for (;;) {
180     msg_task_t task = NULL;
181
182     msg_error_t res = MSG_task_receive(&task, mbox);
183     if (res != MSG_OK) {
184       XBT_CRITICAL("MSG_task_get failed");
185       DIE_IMPOSSIBLE;
186     }
187
188     XBT_INFO("%s received task(%s) from mailbox(%s)",
189         pr_name, MSG_task_get_name(task), mbox);
190
191     if (!strcmp(MSG_task_get_name(task), "finalize")) {
192       MSG_task_destroy(task);
193       break;
194     }
195
196     MSG_task_execute(task);
197     XBT_INFO("%s executed task(%s)", pr_name, MSG_task_get_name(task));
198     MSG_task_destroy(task);
199   }
200
201   return 0;
202 }
203
204 /** Main function */
205 int main(int argc, char *argv[])
206 {
207   MSG_init(&argc, argv);
208   if (argc != 2) {
209     printf("Usage: %s example/msg/msg_platform.xml\n", argv[0]);
210     return 1;
211   }
212  /* Load the platform file */
213   MSG_create_environment(argv[1]);
214
215   /* Retrieve the  first hosts from the platform file */
216   xbt_dynar_t hosts_dynar = MSG_hosts_as_dynar();
217
218   if (xbt_dynar_length(hosts_dynar) <= nb_hosts) {
219     XBT_CRITICAL("need %d hosts", nb_hosts);
220     return 1;
221   }
222
223   msg_host_t master_pm;
224   char **master_argv = xbt_new(char *, 12);
225   master_argv[0] = xbt_strdup("master");
226   master_argv[11] = NULL;
227
228   unsigned int i;
229   msg_host_t host;
230   xbt_dynar_foreach(hosts_dynar, i, host) {
231     if (i == 0) {
232       master_pm = host;
233       continue;
234     }
235
236     master_argv[i] = xbt_strdup(MSG_host_get_name(host));
237
238     if (i == nb_hosts)
239       break;
240   }
241
242
243   MSG_process_create_with_arguments("master", master_fun, NULL, master_pm, nb_hosts+1, master_argv);
244
245   msg_error_t res = MSG_main();
246   XBT_INFO("Simulation time %g", MSG_get_clock());
247
248   xbt_dynar_free(&hosts_dynar);
249
250   return !(res == MSG_OK);
251 }