Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
dd77241e6f16f5e2d582f610f53c02c5874e3178
[simgrid.git] / examples / msg / cloud / masterslave_virtual_machines.c
1 /* Copyright (c) 2007-2012. The SimGrid Team. All rights reserved.                             */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <stdio.h>
7 #include "msg/msg.h"
8 #include "xbt/sysdep.h"         /* calloc, printf */
9
10 /* Create a log channel to have nice outputs. */
11 #include "xbt/log.h"
12 #include "xbt/asserts.h"
13 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
14                              "Messages specific for this msg example");
15
16 /** @addtogroup MSG_examples
17  * 
18  *  - <b>cloud/masterslave_virtual_machines.c: Master/workers
19  *    example on a cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
20  */
21
22 double task_comp_size = 10000000;
23 double task_comm_size = 10000000;
24
25
26 int master_fun(int argc, char *argv[]);
27 int worker_fun(int argc, char *argv[]);
28
29 static void work_batch(int workers_count)
30 {
31   int i;
32   for (i = 0; i < workers_count; i++) {
33     char *tname = bprintf("Task%02d", i);
34     char *mbox =  bprintf("MBOX:WRK%02d", i);
35
36     msg_task_t task = MSG_task_create(tname, task_comp_size, task_comm_size, NULL);
37
38     XBT_INFO("send task(%s) to mailbox(%s)", tname, mbox);
39     MSG_task_send(task, mbox);
40
41     free(tname);
42     free(mbox);
43   }
44 }
45
46 int master_fun(int argc, char *argv[])
47 {
48   msg_vm_t vm;
49   unsigned int i;
50   int workers_count = argc - 1;
51
52   msg_host_t *pms = xbt_new(msg_host_t, workers_count);
53   xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);
54
55   /* Retrive the PMs that launch worker processes. */
56   for (i = 1; i < argc; i++)
57     pms[i - 1] = MSG_get_host_by_name(argv[i]);
58
59
60   /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
61
62   XBT_INFO("Launch %ld VMs", workers_count);
63   for (i=0; i< workers_count; i++) {
64     char *vm_name = bprintf("VM%02d", i);
65     char *pr_name = bprintf("WRK%02d", i);
66     char *mbox = bprintf("MBOX:WRK%02d", i);
67
68     char **wrk_argv = xbt_new(char*, 3);
69     wrk_argv[0] = pr_name;
70     wrk_argv[1] = mbox;
71     wrk_argv[2] = NULL;
72
73     XBT_INFO("create %s", vm_name);
74     msg_vm_t vm = MSG_vm_create_core(pms[i], vm_name);
75     MSG_vm_start(vm);
76     xbt_dynar_push(vms, &vm);
77
78     XBT_INFO("put %s on %s", pr_name, vm_name);
79     MSG_process_create_with_arguments(pr_name, worker_fun, NULL, vm, 2, wrk_argv);
80   }
81
82
83   /* Send a bunch of work to every one */
84   XBT_INFO("Send a task to %d worker process", workers_count);
85   work_batch(workers_count);
86
87   XBT_INFO("Suspend all VMs");
88   xbt_dynar_foreach(vms, i, vm) {
89     const char *vm_name = MSG_host_get_name(vm);
90     XBT_INFO("suspend %s", vm_name);
91     MSG_vm_suspend(vm);
92   }
93
94   XBT_INFO("Wait a while");
95   MSG_process_sleep(2);
96
97   XBT_INFO("Resume all VMs");
98   xbt_dynar_foreach(vms, i, vm) {
99     MSG_vm_resume(vm);
100   }
101
102
103   XBT_INFO("Sleep long enough for everyone to be done with previous batch of work");
104   MSG_process_sleep(1000 - MSG_get_clock());
105
106   XBT_INFO("Add one more process per VM");
107   xbt_dynar_foreach(vms, i, vm) {
108     unsigned int index = i + xbt_dynar_length(vms);
109     char *vm_name = bprintf("VM%02d", i);
110     char *pr_name = bprintf("WRK%02d", index);
111     char *mbox = bprintf("MBOX:WRK%02d", index);
112
113     char **wrk_argv = xbt_new(char*, 3);
114     wrk_argv[0] = pr_name;
115     wrk_argv[1] = mbox;
116     wrk_argv[2] = NULL;
117
118     XBT_INFO("put %s on %s", pr_name, vm_name);
119     MSG_process_create_with_arguments(pr_name, worker_fun, NULL, vm, 2, wrk_argv);
120   }
121
122   XBT_INFO("Send a task to %d worker process", workers_count * 2);
123   work_batch(workers_count * 2);
124
125   XBT_INFO("Migrate all VMs to PM(%s)", MSG_host_get_name(pms[1]));
126   xbt_dynar_foreach(vms, i, vm) {
127     MSG_vm_migrate(vm, pms[1]);
128   }
129
130   /* FIXME: Do we need to support cold migration? Yes, but how should
131    * parameters of a migration be passed? */
132   XBT_INFO("Migrate all VMs to PM(%s)", MSG_host_get_name(pms[2]));
133   xbt_dynar_foreach(vms, i, vm) {
134     // MSG_vm_suspend(vm);
135     MSG_vm_migrate(vm, pms[2]);
136     // MSG_vm_resume(vm);
137   }
138
139
140   XBT_INFO("Shutdown the first 10 worker processes gracefuly. The   the second half will forcefully get killed");
141   for (i = 0; i < workers_count; i++) {
142     char mbox[64];
143     sprintf(mbox, "MBOX:WRK%02d", i);
144     msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
145     MSG_task_send(finalize, mbox);
146   }
147
148   XBT_INFO("Wait a while before effective shutdown.");
149   MSG_process_sleep(2);
150
151
152   XBT_INFO("Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
153   xbt_dynar_foreach(vms, i, vm) {
154     XBT_INFO("shutdown %s", MSG_host_get_name(vm));
155     MSG_vm_shutdown(vm);
156     XBT_INFO("destroy %s", MSG_host_get_name(vm));
157     MSG_vm_destroy(vm);
158   }
159
160   XBT_INFO("Goodbye now!");
161   free(pms);
162   xbt_dynar_free(&vms);
163
164   return 0;
165 }
166
167 /** Receiver function  */
168 int worker_fun(int argc, char *argv[])
169 {
170   xbt_assert(argc == 2, "need mbox in arguments");
171
172   char *mbox = argv[1];
173   const char *pr_name = MSG_process_get_name(MSG_process_self());
174   XBT_INFO("%s is listenning on mailbox(%s)", pr_name, mbox);
175
176   for (;;) {
177     msg_task_t task = NULL;
178
179     msg_error_t res = MSG_task_receive(&task, mbox);
180     if (res != MSG_OK) {
181       XBT_CRITICAL("MSG_task_get failed");
182       DIE_IMPOSSIBLE;
183     }
184
185     XBT_INFO("%s received task(%s) from mailbox(%s)",
186         pr_name, MSG_task_get_name(task), mbox);
187
188     if (!strcmp(MSG_task_get_name(task), "finalize")) {
189       MSG_task_destroy(task);
190       break;
191     }
192
193     MSG_task_execute(task);
194     XBT_INFO("%s executed task(%s)", pr_name, MSG_task_get_name(task));
195     MSG_task_destroy(task);
196   }
197
198   return 0;
199 }
200
201 /** Main function */
202 int main(int argc, char *argv[])
203 {
204   MSG_init(&argc, argv);
205   if (argc != 2) {
206     printf("Usage: %s example/msg/msg_platform.xml\n", argv[0]);
207     return 1;
208   }
209
210   /* Load the platform file */
211   MSG_create_environment(argv[1]);
212
213   /* Retrieve the 10 first hosts from the platform file */
214   xbt_dynar_t hosts_dynar = MSG_hosts_as_dynar();
215
216   if (xbt_dynar_length(hosts_dynar) <= 10) {
217     XBT_CRITICAL("need 10 hosts");
218     return 1;
219   }
220
221   msg_host_t master_pm;
222   char **master_argv = xbt_new(char *, 12);
223   master_argv[0] = xbt_strdup("master");
224   master_argv[11] = NULL;
225
226   unsigned int i;
227   msg_host_t host;
228   xbt_dynar_foreach(hosts_dynar, i, host) {
229     if (i == 0) {
230       master_pm = host;
231       continue;
232     }
233
234     master_argv[i] = xbt_strdup(MSG_host_get_name(host));
235
236     if (i == 10)
237       break;
238   }
239
240
241   MSG_process_create_with_arguments("master", master_fun, NULL, master_pm, 11, master_argv);
242
243   msg_error_t res = MSG_main();
244   XBT_INFO("Simulation time %g", MSG_get_clock());
245
246   xbt_dynar_free(&hosts_dynar);
247
248   return !(res == MSG_OK);
249 }