1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */
9 #include "xbt/sysdep.h" /* calloc, printf */
11 /* Create a log channel to have nice outputs. */
13 #include "xbt/asserts.h"
15 /* we are going to use a generated platform */
16 #include "simgrid/platf_generator.h"
17 #include "simgrid/platf_interface.h"
/* Default log category used by the XBT_INFO calls in this file. */
19 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
20 "Messages specific for this msg example");
/* Process entry points and platform-generator callbacks defined in this file. */
22 int master(int argc, char *argv[]);
23 int slave(int argc, char *argv[]);
24 void promoter_1(context_node_t node);
25 void labeler_1(context_edge_t edge);
27 #define FINALIZE ((void*)221297) /* magic payload value: a task carrying it tells a slave to stop working */
29 #define TASK_COUNT_PER_HOST 5 /* number of tasks created per host name given to the master */
30 #define TASK_COMP_SIZE 15000000 /* computation cost of each task */
31 #define TASK_COMM_SIZE 1200000 /* communication size of each task (passed to MSG_task_create) */
34 * Just promote each node into a host, with fixed power
35 * Set one node as master
36 * Add a state trace on other nodes
/* Node promoter callback: builds a host description (s_sg_platf_host_cbarg_t)
 * and hands it to platf_graph_promote_to_host() for the given graph node.
 * NOTE(review): the embedded listing numbers skip (53-54, 57-58, 69-72, 81-84),
 * so some statements are not visible here -- presumably the test on
 * master_choosen and the remaining trace-generator arguments; verify against
 * the complete file. */
38 void promoter_1(context_node_t node) {
40 s_sg_platf_host_cbarg_t host_parameters;
/* static: this flag keeps its value across successive calls of the callback */
41 static int master_choosen = FALSE;
/* Default description: no id, no traces, one core, host initially on */
43 host_parameters.id = NULL;
44 host_parameters.power_peak = 1000000;
46 host_parameters.core_amount = 1;
47 host_parameters.power_scale = 1;
48 host_parameters.power_trace = NULL;
49 host_parameters.initial_state = SURF_RESOURCE_ON;
50 host_parameters.state_trace = NULL;
51 host_parameters.coord = NULL;
52 host_parameters.properties = NULL;
/* Record that a master has been picked and give that host the fixed id "host_master" */
55 master_choosen = TRUE;
56 host_parameters.id = "host_master";
59 * The bug #14699 cannot allow us to set up an event trace which
60 * begin by SURF_RESOURCE_OFF, otherwise, the host will be down at startup
61 * and the associate process will fail to start. So, here, we generate a
62 * first useless event.
64 //Set an availability trace for the node
/* Generator id is derived from the node id.
 * NOTE(review): no release of the bprintf() results is visible in this block --
 * TODO confirm who owns these strings. */
65 char* generator_id = bprintf("state_host_%ld", node->id);
66 probabilist_event_generator_t date_generator =
67 tmgr_event_generator_new_weibull(generator_id, 80, 1.5);
68 host_parameters.state_trace = tmgr_trace_generator_state(generator_id,
/* Power trace: one uniform generator for event dates (5, 10) and one for values (0.6, 1.0) */
73 char* pw_date_generator_id = bprintf("pw_date_host_%ld", node->id);
74 char* pw_value_generator_id = bprintf("pw_value_host_%ld", node->id);
75 probabilist_event_generator_t pw_date_generator =
76 tmgr_event_generator_new_uniform(pw_date_generator_id, 5, 10);
77 probabilist_event_generator_t pw_value_generator =
78 tmgr_event_generator_new_uniform(pw_value_generator_id, 0.6, 1.0);
79 host_parameters.power_trace =
80 tmgr_trace_generator_value(bprintf("pw_host_%ld", node->id),
/* Turn the graph node into a host using the parameters assembled above */
85 platf_graph_promote_to_host(node, &host_parameters);
90 * Set all links with the same bandwidth and latency
91 * Add a state trace to each link
/* Edge labeler callback: builds a link description (s_sg_platf_link_cbarg_t)
 * with fixed bandwidth and latency plus a generated state trace, then applies
 * it to the edge with platf_graph_link_label().
 * NOTE(review): listing numbers 115-119 are not visible, so the remaining
 * arguments of tmgr_trace_generator_avail_unavail() cannot be checked here. */
93 void labeler_1(context_edge_t edge) {
94 s_sg_platf_link_cbarg_t link_parameters;
/* Same static values for every link: shared policy, initially on, no bandwidth/latency trace */
95 link_parameters.id = NULL;
96 link_parameters.bandwidth = 100000000;
97 link_parameters.bandwidth_trace = NULL;
98 link_parameters.latency = 0.01;
99 link_parameters.latency_trace = NULL;
100 link_parameters.state = SURF_RESOURCE_ON;
101 link_parameters.state_trace = NULL;
102 link_parameters.policy = SURF_LINK_SHARED;
103 link_parameters.properties = NULL;
/* Generator ids are derived from the edge id.
 * NOTE(review): no release of the bprintf() results is visible in this block --
 * TODO confirm who owns these strings. */
105 char* avail_generator_id = bprintf("avail_link_%ld", edge->id);
106 char* unavail_generator_id = bprintf("unavail_link_%ld", edge->id);
/* Two generators: an exponential one (0.0001) and a uniform one (10, 20);
 * presumably durations of the available / unavailable periods -- TODO confirm */
108 probabilist_event_generator_t avail_generator =
109 tmgr_event_generator_new_exponential(avail_generator_id, 0.0001);
110 probabilist_event_generator_t unavail_generator =
111 tmgr_event_generator_new_uniform(unavail_generator_id, 10, 20);
113 link_parameters.state_trace =
114 tmgr_trace_generator_avail_unavail(bprintf("state_link_%ld", edge->id),
/* Attach the assembled description to the graph edge */
120 platf_graph_link_label(edge, &link_parameters);
124 /** Emitter function */
/* Master process: resolves the host names received in argv, sends
 * TASK_COUNT_PER_HOST*argc tasks to them in round-robin order
 * (i % slaves_count), then sends one "finalize" task carrying FINALIZE to
 * every slave.
 * argv[0..argc-1] are host names; each name is also used as the destination
 * passed to MSG_task_send_with_timeout().
 * NOTE(review): the embedded listing numbers skip (e.g. 132, 140, 147-151,
 * 165-166), so the declarations of i and a, the assignment of slaves_count
 * before the allocation, and several branch headers are not visible here;
 * verify against the complete file. */
125 int master(int argc, char *argv[])
127 int slaves_count = 0;
128 msg_host_t *slaves = NULL;
129 int number_of_tasks = 0;
130 double task_comp_size = 0;
131 double task_comm_size = 0;
133 _XBT_GNUC_UNUSED int read;
/* Workload is derived from the number of host names received */
135 number_of_tasks = TASK_COUNT_PER_HOST*argc;
136 task_comp_size = TASK_COMP_SIZE;
137 task_comm_size = TASK_COMM_SIZE;
139 { /* Process organisation */
/* NOTE(review): slaves_count is still 0 in the visible lines at this point;
 * presumably it is set from argc on the missing listing line 140 -- confirm,
 * since the loop below writes argc entries into this array. */
141 slaves = xbt_new0(msg_host_t, slaves_count);
/* Resolve every host name; an unknown name is reported below */
143 for (i = 0; i < argc; i++) {
144 slaves[i] = MSG_get_host_by_name(argv[i]);
145 if (slaves[i] == NULL) {
146 XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
152 XBT_INFO("Got %d slave(s) :", slaves_count);
153 for (i = 0; i < slaves_count; i++)
154 XBT_INFO("%s", MSG_host_get_name(slaves[i]));
156 XBT_INFO("Got %d task to process :", number_of_tasks);
/* Dispatch loop: one task per iteration, sent with a 10.0 timeout */
158 for (i = 0; i < number_of_tasks; i++) {
159 msg_task_t task = MSG_task_create("Task", task_comp_size, task_comm_size,
160 xbt_new0(double, 1));
/* The payload holds the clock value at creation; the slave reads it back from task->data */
162 *((double *) task->data) = MSG_get_clock();
164 a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i % slaves_count]),10.0);
167 XBT_INFO("Send completed");
168 } else if (a == MSG_HOST_FAILURE) {
170 ("Gloups. The cpu on which I'm running just turned off!. See you!");
/* On every failed send the task is destroyed here by the sender.
 * NOTE(review): no release of the double payload is visible -- TODO confirm. */
172 MSG_task_destroy(task);
175 } else if (a == MSG_TRANSFER_FAILURE) {
177 ("Mmh. Something went wrong with '%s'. Nevermind. Let's keep going!",
178 MSG_host_get_name(slaves[i % slaves_count]));
180 MSG_task_destroy(task);
181 } else if (a == MSG_TIMEOUT) {
183 ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
184 MSG_host_get_name(slaves[i % slaves_count]));
186 MSG_task_destroy(task);
188 XBT_INFO("Hey ?! What's up ? ");
189 xbt_die( "Unexpected behavior");
194 ("All tasks have been dispatched. Let's tell everybody the computation is over.");
/* Termination loop: one "finalize" task per slave, sent with a shorter 1.0 timeout */
195 for (i = 0; i < slaves_count; i++) {
196 msg_task_t task = MSG_task_create("finalize", 0, 0, FINALIZE);
197 int a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i]),1.0);
200 if (a == MSG_HOST_FAILURE) {
202 ("Gloups. The cpu on which I'm running just turned off!. See you!");
203 MSG_task_destroy(task);
206 } else if (a == MSG_TRANSFER_FAILURE) {
207 XBT_INFO("Mmh. Can't reach '%s'! Nevermind. Let's keep going!",
208 MSG_host_get_name(slaves[i]));
209 MSG_task_destroy(task);
210 } else if (a == MSG_TIMEOUT) {
/* slaves[i % slaves_count] is the same element as slaves[i] here, since i < slaves_count in this loop */
212 ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
213 MSG_host_get_name(slaves[i % slaves_count]));
214 MSG_task_destroy(task);
216 XBT_INFO("Hey ?! What's up ? ");
217 xbt_die("Unexpected behavior with '%s': %d", MSG_host_get_name(slaves[i]), a);
221 XBT_INFO("Goodbye now!");
224 } /* end_of_master */
226 /** Receiver function */
/* Slave process: receives tasks addressed to the name of the host it runs on,
 * destroys the task when its payload is FINALIZE, and otherwise logs the
 * communication time and executes the task.
 * argc and argv are not used in the visible lines.
 * NOTE(review): the embedded listing numbers skip (e.g. 228-229, 231-233,
 * 237, 241-242, 271-272), so the declarations of a, time1 and time2, the
 * enclosing loop and several branch headers are not visible here; verify
 * against the complete file. */
227 int slave(int argc, char *argv[])
230 msg_task_t task = NULL;
/* Clock values sampled around the receive call, used for the logged communication time */
234 time1 = MSG_get_clock();
235 a = MSG_task_receive( &(task), MSG_host_get_name(MSG_host_self()) );
236 time2 = MSG_get_clock();
238 XBT_INFO("Received \"%s\"", MSG_task_get_name(task));
/* The FINALIZE payload is compared by pointer value and never dereferenced */
239 if (MSG_task_get_data(task) == FINALIZE) {
240 MSG_task_destroy(task);
/* If the value stored in the payload is later than the moment we started
 * waiting, measure the communication time from that stored value instead */
243 if (time1 < *((double *) task->data))
244 time1 = *((double *) task->data);
245 XBT_INFO("Communication time : \"%f\"", time2 - time1);
246 XBT_INFO("Processing \"%s\"", MSG_task_get_name(task));
247 a = MSG_task_execute(task);
249 XBT_INFO("\"%s\" done", MSG_task_get_name(task));
/* NOTE(review): the payload pointed to by task->data is not released in the visible lines -- TODO confirm. */
251 MSG_task_destroy(task);
252 } else if (a == MSG_HOST_FAILURE) {
254 ("Gloups. The cpu on which I'm running just turned off!. See you!");
257 XBT_INFO("Hey ?! What's up ? ");
258 xbt_die("Unexpected behavior");
/* The branches below test a again with different log messages; presumably they
 * handle the result of the receive call rather than the execute call -- confirm
 * against the missing branch headers */
260 } else if (a == MSG_HOST_FAILURE) {
262 ("Gloups. The cpu on which I'm running just turned off!. See you!");
264 } else if (a == MSG_TRANSFER_FAILURE) {
265 XBT_INFO("Mmh. Something went wrong. Nevermind. Let's keep going!");
266 } else if (a == MSG_TIMEOUT) {
267 XBT_INFO("Mmh. Got a timeout. Nevermind. Let's keep going!");
269 XBT_INFO("Hey ?! What's up ? ");
270 xbt_die("Unexpected behavior");
273 XBT_INFO("I'm done. See you!");
278 int main(int argc, char *argv[])
280 msg_error_t res = MSG_OK;
281 unsigned long seed_platf_gen[] = {134, 233445, 865, 2634, 424242, 876543};
282 unsigned long seed_trace_gen[] = {8865244, 356772, 42, 77465, 2098754, 8725442};
287 MSG_init(&argc, argv);
289 //Set up the seed for the platform generation
290 platf_random_seed(seed_platf_gen);
292 //Set up the RngStream for trace generation
293 sg_platf_rng_stream_init(seed_trace_gen);
295 XBT_INFO("creating nodes...");
296 platf_graph_uniform(10);
300 XBT_INFO("creating links...");
301 platf_graph_clear_links();
302 platf_graph_interconnect_waxman(0.9, 0.4);
303 XBT_INFO("done. Check connectedness...");
304 connected = platf_graph_is_connected();
305 XBT_INFO("Is it connected : %s", connected ? "yes" : (max_tries ? "no, retrying" : "no"));
306 } while(!connected && max_tries);
308 if(!connected && !max_tries) {
309 xbt_die("Impossible to connect the graph, aborting.");
312 XBT_INFO("registering callbacks...");
313 platf_graph_promoter(promoter_1);
314 platf_graph_labeler(labeler_1);
316 XBT_INFO("protmoting...");
319 XBT_INFO("labeling...");
322 XBT_INFO("Putting it in surf...");
325 XBT_INFO("Let's get the available hosts and dispatch work:");
328 msg_host_t host = NULL;
329 msg_host_t host_master = NULL;
330 msg_process_t process = NULL;
331 xbt_dynar_t host_dynar = MSG_hosts_as_dynar();
332 char** hostname_list = malloc(sizeof(char*) * xbt_dynar_length(host_dynar));
334 xbt_dynar_foreach(host_dynar, i, host) {
335 process = MSG_process_create("slave", slave, NULL, host);
336 MSG_process_auto_restart_set(process, TRUE);
337 hostname_list[i] = (char*) MSG_host_get_name(host);
339 host_master = MSG_get_host_by_name("host_master");
340 MSG_process_create_with_arguments("master", master, NULL, host_master, xbt_dynar_length(host_dynar), hostname_list);