Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
0fb1776ac54f52f753f4466c26a4fdb93e99b07f
[simgrid.git] / examples / c / cloud-masterworker / cloud-masterworker.c
1 /* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "simgrid/actor.h"
7 #include "simgrid/engine.h"
8 #include "simgrid/exec.h"
9 #include "simgrid/host.h"
10 #include "simgrid/mailbox.h"
11 #include "simgrid/plugins/live_migration.h"
12 #include "simgrid/vm.h"
13
14 #include "xbt/asserts.h"
15 #include "xbt/log.h"
16 #include "xbt/str.h"
17
18 XBT_LOG_NEW_DEFAULT_CATEGORY(cloud_masterworker, "Messages specific for this example");
19
20 #define MAXMBOXLEN 64
21 #define FINALIZE 221297 /* a magic number to tell people to stop working */
22
23 const double comp_size = 10000000;
24 const double comm_size = 10000000;
25
26 static void send_tasks(int nb_workers)
27 {
28   for (int i = 0; i < nb_workers; i++) {
29     char mbox_name[MAXMBOXLEN];
30     snprintf(mbox_name, MAXMBOXLEN, "MBOX:WRK%02d", i);
31     double* payload   = (double*)malloc(sizeof(double));
32     *payload          = comp_size;
33     sg_mailbox_t mbox = sg_mailbox_by_name(mbox_name);
34
35     XBT_INFO("Send to mailbox(%s)", mbox_name);
36     sg_mailbox_put(mbox, payload, comm_size);
37   }
38 }
39
40 static void worker_fun(XBT_ATTRIB_UNUSED int argc, XBT_ATTRIB_UNUSED char* argv[])
41 {
42   const char* pr_name = sg_actor_self_get_name();
43   char mbox_name[MAXMBOXLEN];
44   snprintf(mbox_name, MAXMBOXLEN, "MBOX:%s", pr_name);
45   sg_mailbox_t mbox = sg_mailbox_by_name(mbox_name);
46   double* payload   = NULL;
47
48   XBT_INFO("%s is listening on mailbox(%s)", pr_name, mbox_name);
49
50   for (;;) {
51     payload = (double*)sg_mailbox_get(mbox);
52
53     XBT_INFO("%s received from mailbox(%s)", pr_name, mbox_name);
54
55     if (*payload == FINALIZE) {
56       free(payload);
57       break;
58     }
59
60     sg_actor_execute(*payload);
61     XBT_INFO("%s executed", pr_name);
62     free(payload);
63   }
64 }
65
66 static void master_fun(XBT_ATTRIB_UNUSED int argc, XBT_ATTRIB_UNUSED char* argv[])
67 {
68   unsigned int i;
69
70   sg_host_t* worker_pms = sg_actor_self_data();
71
72   sg_vm_t* vms = (sg_vm_t*)malloc(2 * sizeof(sg_vm_t));
73
74   /* Launch VMs and worker actors. One VM per PM, and one worker actor per VM. */
75   XBT_INFO("# Launch 2 VMs");
76   for (int i = 0; i < 2; i++) {
77     char* vm_name = bprintf("VM%02d", i);
78     char* pr_name = bprintf("WRK%02d", i);
79
80     sg_host_t pm = worker_pms[i];
81
82     XBT_INFO("create %s on PM(%s)", vm_name, sg_host_get_name(pm));
83     sg_vm_t vm = sg_vm_create_core(pm, vm_name);
84
85     sg_vm_set_ramsize(vm, 1L * 1024 * 1024 * 1024); // 1GiB
86
87     sg_vm_start(vm);
88     vms[i] = vm;
89
90     XBT_INFO("put an actor (%s) on %s", pr_name, vm_name);
91     sg_actor_create(pr_name, (sg_host_t)vm, worker_fun, 0, NULL);
92
93     xbt_free(vm_name);
94     xbt_free(pr_name);
95   }
96
97   /* Send a bunch of work to every one */
98   XBT_INFO("# Send to 2 worker actors");
99   send_tasks(2);
100
101   XBT_INFO("# Suspend all VMs");
102   for (int i = 0; i < 2; i++) {
103     XBT_INFO("suspend %s", sg_vm_get_name(vms[i]));
104     sg_vm_suspend(vms[i]);
105   }
106
107   XBT_INFO("# Wait a while");
108   sg_actor_sleep_for(2);
109
110   XBT_INFO("# Resume all VMs");
111   for (int i = 0; i < 2; i++) {
112     sg_vm_resume(vms[i]);
113   }
114
115   XBT_INFO("# Sleep long enough for everyone to be done with previous batch of work");
116   sg_actor_sleep_for(10 - simgrid_get_clock());
117
118   XBT_INFO("# Add one more actor on each VM");
119   for (unsigned int i = 0; i < 2; i++) {
120     char* vm_name = bprintf("VM%02u", i);
121     char* pr_name = bprintf("WRK%02u", i + 2);
122
123     XBT_INFO("put an actor (%s) on %s", pr_name, vm_name);
124     sg_actor_create(pr_name, (sg_host_t)vms[i], worker_fun, 0, NULL);
125
126     free(vm_name);
127     free(pr_name);
128   }
129
130   XBT_INFO("# Send to 4 worker actors");
131   send_tasks(4);
132
133   sg_host_t worker_pm0 = worker_pms[0];
134   sg_host_t worker_pm1 = worker_pms[1];
135
136   XBT_INFO("# Migrate all VMs to PM(%s)", sg_host_get_name(worker_pm0));
137   for (int i = 0; i < 2; i++) {
138     sg_vm_migrate(vms[i], worker_pm0);
139   }
140
141   XBT_INFO("# Migrate all VMs to PM(%s)", sg_host_get_name(worker_pm1));
142   for (int i = 0; i < 2; i++) {
143     sg_vm_migrate(vms[i], worker_pm1);
144   }
145
146   XBT_INFO("# Shutdown the half of worker actors gracefully. The remaining half will be forcibly killed.");
147   for (i = 0; i < 2; i++) {
148     char mbox_name[MAXMBOXLEN];
149     snprintf(mbox_name, MAXMBOXLEN, "MBOX:WRK%02u", i);
150     sg_mailbox_t mbox = sg_mailbox_by_name(mbox_name);
151     double* payload   = (double*)malloc(sizeof(double));
152     *payload          = FINALIZE;
153     sg_mailbox_put(mbox, payload, 0);
154   }
155
156   XBT_INFO("# Wait a while before effective shutdown.");
157   sg_actor_sleep_for(2);
158
159   XBT_INFO("# Shutdown and destroy all the VMs. The remaining worker actors will be forcibly killed.");
160   for (int i = 0; i < 2; i++) {
161     XBT_INFO("shutdown %s", sg_vm_get_name(vms[i]));
162     sg_vm_shutdown(vms[i]);
163     XBT_INFO("destroy %s", sg_vm_get_name(vms[i]));
164     sg_vm_destroy(vms[i]);
165   }
166
167   XBT_INFO("# Goodbye now!");
168   free(vms);
169 }
170
171 int main(int argc, char* argv[])
172 {
173   simgrid_init(&argc, argv);
174   sg_vm_live_migration_plugin_init();
175
176   xbt_assert(argc > 1, "Usage: %s example/platforms/cluster_backbone.xml\n", argv[0]);
177
178   /* Load the platform file */
179   simgrid_load_platform(argv[1]);
180
181   /* Retrieve hosts from the platform file */
182   sg_host_t* pms = sg_host_list();
183
184   /* we need a master node and worker nodes */
185   xbt_assert(sg_host_count() > 2, "need at least 3 hosts");
186
187   /* the first pm is the master, the others are workers */
188   sg_host_t master_pm = pms[0];
189
190   sg_host_t* worker_pms = (sg_host_t*)malloc(2 * sizeof(sg_host_t));
191   for (int i = 0; i < 2 + 1; i++)
192     worker_pms[i] = pms[i + 1];
193
194   free(pms);
195
196   sg_actor_t actor = sg_actor_init("master", master_pm);
197   sg_actor_data_set(actor, worker_pms);
198   sg_actor_start(actor, master_fun, 0, NULL);
199
200   simgrid_run();
201   XBT_INFO("Bye (simulation time %g)", simgrid_get_clock());
202
203   free(worker_pms);
204
205   return 0;
206 }