Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
28f8cc9e6282254c15ca956a0a18704f222fc7ab
[simgrid.git] / examples / c / cloud-masterworker / cloud-masterworker.c
1 /* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "simgrid/actor.h"
7 #include "simgrid/engine.h"
8 #include "simgrid/exec.h"
9 #include "simgrid/host.h"
10 #include "simgrid/mailbox.h"
11 #include "simgrid/plugins/live_migration.h"
12 #include "simgrid/vm.h"
13
14 #include "xbt/asserts.h"
15 #include "xbt/log.h"
16 #include "xbt/str.h"
17
18 XBT_LOG_NEW_DEFAULT_CATEGORY(cloud_masterworker, "Messages specific for this example");
19
20 #define MAXMBOXLEN 64
21 #define FINALIZE 221297 /* a magic number to tell people to stop working */
22
23 const double comp_size = 10000000;
24 const long comm_size   = 10000000;
25
26 static void send_tasks(int nb_workers)
27 {
28   for (int i = 0; i < nb_workers; i++) {
29     char mbox_name[MAXMBOXLEN];
30     snprintf(mbox_name, MAXMBOXLEN, "MBOX:WRK%02d", i);
31     double* payload   = xbt_malloc(sizeof(double));
32     *payload          = comp_size;
33     sg_mailbox_t mbox = sg_mailbox_by_name(mbox_name);
34
35     XBT_INFO("Send to mailbox(%s)", mbox_name);
36     sg_mailbox_put(mbox, payload, comm_size);
37   }
38 }
39
40 static void worker_fun(int argc, char* argv[])
41 {
42   const char* pr_name = sg_actor_self_get_name();
43   char mbox_name[MAXMBOXLEN];
44   snprintf(mbox_name, MAXMBOXLEN, "MBOX:%s", pr_name);
45   sg_mailbox_t mbox = sg_mailbox_by_name(mbox_name);
46   double* payload   = NULL;
47
48   XBT_INFO("%s is listening on mailbox(%s)", pr_name, mbox_name);
49
50   for (;;) {
51     payload = (double*)sg_mailbox_get(mbox);
52
53     XBT_INFO("%s received from mailbox(%s)", pr_name, mbox_name);
54
55     if (*payload == FINALIZE) {
56       free(payload);
57       break;
58     }
59
60     sg_actor_execute(*payload);
61     XBT_INFO("%s executed", pr_name);
62     free(payload);
63   }
64 }
65
66 static void master_fun(int argc, char* argv[])
67 {
68   sg_host_t* worker_pms = sg_actor_self_get_data();
69
70   sg_vm_t* vms = xbt_malloc(2 * sizeof(sg_vm_t));
71
72   /* Launch VMs and worker actors. One VM per PM, and one worker actor per VM. */
73   XBT_INFO("# Launch 2 VMs");
74   for (int i = 0; i < 2; i++) {
75     char* vm_name = bprintf("VM%02d", i);
76     char* pr_name = bprintf("WRK%02d", i);
77
78     sg_host_t pm = worker_pms[i];
79
80     XBT_INFO("create %s on PM(%s)", vm_name, sg_host_get_name(pm));
81     sg_vm_t vm = sg_vm_create_core(pm, vm_name);
82
83     sg_vm_set_ramsize(vm, 1L * 1024 * 1024 * 1024); // 1GiB
84
85     sg_vm_start(vm);
86     vms[i] = vm;
87
88     XBT_INFO("put an actor (%s) on %s", pr_name, vm_name);
89     sg_actor_create(pr_name, (sg_host_t)vm, worker_fun, 0, NULL);
90
91     xbt_free(vm_name);
92     xbt_free(pr_name);
93   }
94
95   /* Send a bunch of work to every one */
96   XBT_INFO("# Send to 2 worker actors");
97   send_tasks(2);
98
99   XBT_INFO("# Suspend all VMs");
100   for (int i = 0; i < 2; i++) {
101     XBT_INFO("suspend %s", sg_vm_get_name(vms[i]));
102     sg_vm_suspend(vms[i]);
103   }
104
105   XBT_INFO("# Wait a while");
106   sg_actor_sleep_for(2);
107
108   XBT_INFO("# Resume all VMs");
109   for (int i = 0; i < 2; i++) {
110     sg_vm_resume(vms[i]);
111   }
112
113   XBT_INFO("# Sleep long enough for everyone to be done with previous batch of work");
114   sg_actor_sleep_for(10 - simgrid_get_clock());
115
116   XBT_INFO("# Add one more actor on each VM");
117   for (int i = 0; i < 2; i++) {
118     char* vm_name = bprintf("VM%02d", i);
119     char* pr_name = bprintf("WRK%02d", i + 2);
120
121     XBT_INFO("put an actor (%s) on %s", pr_name, vm_name);
122     sg_actor_create(pr_name, (sg_host_t)vms[i], worker_fun, 0, NULL);
123
124     free(vm_name);
125     free(pr_name);
126   }
127
128   XBT_INFO("# Send to 4 worker actors");
129   send_tasks(4);
130
131   sg_host_t worker_pm0 = worker_pms[0];
132   sg_host_t worker_pm1 = worker_pms[1];
133
134   XBT_INFO("# Migrate all VMs to PM(%s)", sg_host_get_name(worker_pm0));
135   for (int i = 0; i < 2; i++) {
136     sg_vm_migrate(vms[i], worker_pm0);
137   }
138
139   XBT_INFO("# Migrate all VMs to PM(%s)", sg_host_get_name(worker_pm1));
140   for (int i = 0; i < 2; i++) {
141     sg_vm_migrate(vms[i], worker_pm1);
142   }
143
144   XBT_INFO("# Shutdown the half of worker actors gracefully. The remaining half will be forcibly killed.");
145   for (int i = 0; i < 2; i++) {
146     char mbox_name[MAXMBOXLEN];
147     snprintf(mbox_name, MAXMBOXLEN, "MBOX:WRK%02d", i);
148     sg_mailbox_t mbox = sg_mailbox_by_name(mbox_name);
149     double* payload   = xbt_malloc(sizeof(double));
150     *payload          = FINALIZE;
151     sg_mailbox_put(mbox, payload, 0);
152   }
153
154   XBT_INFO("# Wait a while before effective shutdown.");
155   sg_actor_sleep_for(2);
156
157   XBT_INFO("# Shutdown and destroy all the VMs. The remaining worker actors will be forcibly killed.");
158   for (int i = 0; i < 2; i++) {
159     XBT_INFO("shutdown %s", sg_vm_get_name(vms[i]));
160     sg_vm_shutdown(vms[i]);
161     XBT_INFO("destroy %s", sg_vm_get_name(vms[i]));
162     sg_vm_destroy(vms[i]);
163   }
164
165   XBT_INFO("# Goodbye now!");
166   free(vms);
167 }
168
169 int main(int argc, char* argv[])
170 {
171   simgrid_init(&argc, argv);
172   sg_vm_live_migration_plugin_init();
173
174   xbt_assert(argc > 1, "Usage: %s example/platforms/cluster_backbone.xml\n", argv[0]);
175
176   /* Load the platform file */
177   simgrid_load_platform(argv[1]);
178
179   /* Retrieve hosts from the platform file */
180   sg_host_t* pms = sg_host_list();
181
182   /* we need a master node and worker nodes */
183   xbt_assert(sg_host_count() > 2, "need at least 3 hosts");
184
185   /* the first pm is the master, the others are workers */
186   sg_host_t master_pm = pms[0];
187
188   sg_host_t* worker_pms = xbt_malloc(2 * sizeof(sg_host_t));
189   for (int i = 0; i < 2; i++)
190     worker_pms[i] = pms[i + 1];
191
192   free(pms);
193
194   sg_actor_t actor = sg_actor_init("master", master_pm);
195   sg_actor_set_data(actor, worker_pms);
196   sg_actor_start(actor, master_fun, 0, NULL);
197
198   simgrid_run();
199   XBT_INFO("Bye (simulation time %g)", simgrid_get_clock());
200
201   free(worker_pms);
202
203   return 0;
204 }