Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bba483a7d93a9e6cff419c185e8f85e1c712599a
[simgrid.git] / examples / msg / masterslave / masterslave_failure_platfgen.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
9 #include "xbt/sysdep.h"         /* calloc, printf */
10
11 /* Create a log channel to have nice outputs. */
12 #include "xbt/log.h"
13 #include "xbt/asserts.h"
14
15 /* we are going to use a generated platform */
16 #include "simgrid/platf_generator.h"
17 #include "simgrid/platf_interface.h"
18
19 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
20                              "Messages specific for this msg example");
21
22 int master(int argc, char *argv[]);
23 int slave(int argc, char *argv[]);
24 void promoter_1(context_node_t node);
25 void labeler_1(context_edge_t edge);
26
27 #define FINALIZE ((void*)221297)        /* a magic number to tell people to stop working */
28
29 #define TASK_COUNT_PER_HOST 5           /* Number of tasks each slave has to do */
30 #define TASK_COMP_SIZE 15000000         /* computation cost of each task */
31 #define TASK_COMM_SIZE 1200000          /* communication time of task */
32
33 /** Promoter function
34  * Just promote each node into a host, with fixed power
35  * Set one node as master
36  * Add a state trace on other nodes
37  */
38 void promoter_1(context_node_t node) {
39
40   s_sg_platf_host_cbarg_t host_parameters;
41   static int master_choosen = FALSE;
42
43   host_parameters.id = NULL;
44   host_parameters.power_peak = 1000000;
45
46   host_parameters.core_amount = 1;
47   host_parameters.power_scale = 1;
48   host_parameters.power_trace = NULL;
49   host_parameters.initial_state = SURF_RESOURCE_ON;
50   host_parameters.state_trace = NULL;
51   host_parameters.coord = NULL;
52   host_parameters.properties = NULL;
53
54   if(!master_choosen) {
55     master_choosen = TRUE;
56     host_parameters.id = "host_master";
57   } else {
58     /*
59      * The bug #14699 cannot allow us to set up an event trace which
60      * begin by SURF_RESOURCE_OFF, otherwise, the host will be down at startup
61      * and the associate process will fail to start. So, here, we generate a
62      * first useless event.
63      */
64     //Set a availability trace for the node
65     char* generator_id = bprintf("state_host_%ld", node->id);
66     probabilist_event_generator_t date_generator =
67                             tmgr_event_generator_new_weibull(generator_id, 80, 1.5);
68     host_parameters.state_trace = tmgr_trace_generator_state(generator_id,
69                                                             date_generator,
70                                                             SURF_RESOURCE_ON);
71
72     //Set a power trace
73     char* pw_date_generator_id = bprintf("pw_date_host_%ld", node->id);
74     char* pw_value_generator_id = bprintf("pw_value_host_%ld", node->id);
75     probabilist_event_generator_t pw_date_generator =
76                           tmgr_event_generator_new_uniform(pw_date_generator_id, 5, 10);
77     probabilist_event_generator_t pw_value_generator =
78                           tmgr_event_generator_new_uniform(pw_value_generator_id, 0.6, 1.0);
79     host_parameters.power_trace =
80               tmgr_trace_generator_value(bprintf("pw_host_%ld", node->id),
81                                                 pw_date_generator,
82                                                 pw_value_generator);
83   }
84
85   platf_graph_promote_to_host(node, &host_parameters);
86
87 }
88
89 /** Labeler function
90  * Set all links with the same bandwidth and latency
91  * Add a state trace to each link
92  */
93 void labeler_1(context_edge_t edge) {
94   s_sg_platf_link_cbarg_t link_parameters;
95   link_parameters.id = NULL;
96   link_parameters.bandwidth = 100000000;
97   link_parameters.bandwidth_trace = NULL;
98   link_parameters.latency = 0.01;
99   link_parameters.latency_trace = NULL;
100   link_parameters.state = SURF_RESOURCE_ON;
101   link_parameters.state_trace = NULL;
102   link_parameters.policy = SURF_LINK_SHARED;
103   link_parameters.properties = NULL;
104
105   char* avail_generator_id = bprintf("avail_link_%ld", edge->id);
106   char* unavail_generator_id = bprintf("unavail_link_%ld", edge->id);
107
108   probabilist_event_generator_t avail_generator =
109                           tmgr_event_generator_new_exponential(avail_generator_id, 0.0001);
110   probabilist_event_generator_t unavail_generator =
111                           tmgr_event_generator_new_uniform(unavail_generator_id, 10, 20);
112
113   link_parameters.state_trace =
114               tmgr_trace_generator_avail_unavail(bprintf("state_link_%ld", edge->id),
115                                                 avail_generator,
116                                                 unavail_generator,
117                                                 SURF_RESOURCE_ON);
118
119
120   platf_graph_link_label(edge, &link_parameters);
121
122 }
123
124 /** Emitter function  */
125 int master(int argc, char *argv[])
126 {
127   int slaves_count = 0;
128   msg_host_t *slaves = NULL;
129   int number_of_tasks = 0;
130   double task_comp_size = 0;
131   double task_comm_size = 0;
132   int i;
133
134   number_of_tasks = TASK_COUNT_PER_HOST*argc;
135   task_comp_size = TASK_COMP_SIZE;
136   task_comm_size = TASK_COMM_SIZE;
137
138   {                             /* Process organisation */
139     slaves_count = argc;
140     slaves = xbt_new0(msg_host_t, slaves_count);
141
142     for (i = 0; i < argc; i++) {
143       slaves[i] = MSG_get_host_by_name(argv[i]);
144       if (slaves[i] == NULL) {
145         XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
146         abort();
147       }
148     }
149   }
150
151   XBT_INFO("Got %d slave(s) :", slaves_count);
152   for (i = 0; i < slaves_count; i++)
153     XBT_INFO("%s", MSG_host_get_name(slaves[i]));
154
155   XBT_INFO("Got %d task to process :", number_of_tasks);
156
157   for (i = 0; i < number_of_tasks; i++) {
158     msg_task_t task = MSG_task_create("Task", task_comp_size, task_comm_size,
159                                     xbt_new0(double, 1));
160     int a;
161     *((double *) task->data) = MSG_get_clock();
162
163     a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i % slaves_count]),10.0);
164
165     if (a == MSG_OK) {
166       XBT_INFO("Send completed");
167     } else if (a == MSG_HOST_FAILURE) {
168       XBT_INFO
169           ("Gloups. The cpu on which I'm running just turned off!. See you!");
170       free(task->data);
171       MSG_task_destroy(task);
172       free(slaves);
173       return 0;
174     } else if (a == MSG_TRANSFER_FAILURE) {
175       XBT_INFO
176           ("Mmh. Something went wrong with '%s'. Nevermind. Let's keep going!",
177               MSG_host_get_name(slaves[i % slaves_count]));
178       free(task->data);
179       MSG_task_destroy(task);
180     } else if (a == MSG_TIMEOUT) {
181       XBT_INFO
182           ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
183               MSG_host_get_name(slaves[i % slaves_count]));
184       free(task->data);
185       MSG_task_destroy(task);
186     } else {
187       XBT_INFO("Hey ?! What's up ? ");
188       xbt_die( "Unexpected behavior");
189     }
190   }
191
192   XBT_INFO
193       ("All tasks have been dispatched. Let's tell everybody the computation is over.");
194   for (i = 0; i < slaves_count; i++) {
195     msg_task_t task = MSG_task_create("finalize", 0, 0, FINALIZE);
196     int a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i]),1.0);
197     if (a == MSG_OK)
198       continue;
199     if (a == MSG_HOST_FAILURE) {
200       XBT_INFO
201           ("Gloups. The cpu on which I'm running just turned off!. See you!");
202       MSG_task_destroy(task);
203       free(slaves);
204       return 0;
205     } else if (a == MSG_TRANSFER_FAILURE) {
206       XBT_INFO("Mmh. Can't reach '%s'! Nevermind. Let's keep going!",
207           MSG_host_get_name(slaves[i]));
208       MSG_task_destroy(task);
209     } else if (a == MSG_TIMEOUT) {
210       XBT_INFO
211           ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
212               MSG_host_get_name(slaves[i % slaves_count]));
213       MSG_task_destroy(task);
214     } else {
215       XBT_INFO("Hey ?! What's up ? ");
216       xbt_die("Unexpected behavior with '%s': %d", MSG_host_get_name(slaves[i]), a);
217     }
218   }
219
220   XBT_INFO("Goodbye now!");
221   free(slaves);
222   return 0;
223 }                               /* end_of_master */
224
225 /** Receiver function  */
226 int slave(int argc, char *argv[])
227 {
228   while (1) {
229     msg_task_t task = NULL;
230     int a;
231     double time1, time2;
232
233     time1 = MSG_get_clock();
234     a = MSG_task_receive( &(task), MSG_host_get_name(MSG_host_self()) );
235     time2 = MSG_get_clock();
236     if (a == MSG_OK) {
237       XBT_INFO("Received \"%s\"", MSG_task_get_name(task));
238       if (MSG_task_get_data(task) == FINALIZE) {
239         MSG_task_destroy(task);
240         break;
241       }
242       if (time1 < *((double *) task->data))
243         time1 = *((double *) task->data);
244       XBT_INFO("Communication time : \"%f\"", time2 - time1);
245       XBT_INFO("Processing \"%s\"", MSG_task_get_name(task));
246       a = MSG_task_execute(task);
247       if (a == MSG_OK) {
248         XBT_INFO("\"%s\" done", MSG_task_get_name(task));
249         free(task->data);
250         MSG_task_destroy(task);
251       } else if (a == MSG_HOST_FAILURE) {
252         XBT_INFO
253             ("Gloups. The cpu on which I'm running just turned off!. See you!");
254         return 0;
255       } else {
256         XBT_INFO("Hey ?! What's up ? ");
257         xbt_die("Unexpected behavior");
258       }
259     } else if (a == MSG_HOST_FAILURE) {
260       XBT_INFO
261           ("Gloups. The cpu on which I'm running just turned off!. See you!");
262       return 0;
263     } else if (a == MSG_TRANSFER_FAILURE) {
264       XBT_INFO("Mmh. Something went wrong. Nevermind. Let's keep going!");
265     } else if (a == MSG_TIMEOUT) {
266       XBT_INFO("Mmh. Got a timeout. Nevermind. Let's keep going!");
267     } else {
268       XBT_INFO("Hey ?! What's up ? ");
269       xbt_die("Unexpected behavior");
270     }
271   }
272   XBT_INFO("I'm done. See you!");
273   return 0;
274 }                               /* end_of_slave */
275
276 /** Main function */
277 int main(int argc, char *argv[])
278 {
279   msg_error_t res = MSG_OK;
280   unsigned long seed_platf_gen[] = {134, 233445, 865, 2634, 424242, 876543};
281   unsigned long seed_trace_gen[] = {8865244, 356772, 42, 77465, 2098754, 8725442};
282   int connected;
283   int max_tries = 10;
284
285   //MSG initialisation
286   MSG_init(&argc, argv);
287
288   //Set up the seed for the platform generation
289   platf_random_seed(seed_platf_gen);
290
291   //Set up the RngStream for trace generation
292   sg_platf_rng_stream_init(seed_trace_gen);
293
294   XBT_INFO("creating nodes...");
295   platf_graph_uniform(10);
296
297   do {
298     max_tries--;
299     XBT_INFO("creating links...");
300     platf_graph_clear_links();
301     platf_graph_interconnect_waxman(0.9, 0.4);
302     XBT_INFO("done. Check connectedness...");
303     connected = platf_graph_is_connected();
304     XBT_INFO("Is it connected : %s", connected ? "yes" : (max_tries ? "no, retrying" : "no"));
305   } while(!connected && max_tries);
306
307   if(!connected && !max_tries) {
308     xbt_die("Impossible to connect the graph, aborting.");
309   }
310
311   XBT_INFO("registering callbacks...");
312   platf_graph_promoter(promoter_1);
313   platf_graph_labeler(labeler_1);
314
315   XBT_INFO("protmoting...");
316   platf_do_promote();
317
318   XBT_INFO("labeling...");
319   platf_do_label();
320
321   XBT_INFO("Putting it in surf...");
322   platf_generate();
323
324   XBT_INFO("Let's get the available hosts and dispatch work:");
325
326   unsigned int i;
327   msg_host_t host = NULL;
328   msg_host_t host_master = NULL;
329   msg_process_t process = NULL;
330   xbt_dynar_t host_dynar = MSG_hosts_as_dynar();
331   char** hostname_list = xbt_malloc(sizeof(char*) * xbt_dynar_length(host_dynar));
332
333   xbt_dynar_foreach(host_dynar, i, host) {
334     process = MSG_process_create("slave", slave, NULL, host);
335     MSG_process_auto_restart_set(process, TRUE);
336     hostname_list[i] = (char*) MSG_host_get_name(host);
337   }
338   host_master = MSG_get_host_by_name("host_master");
339   MSG_process_create_with_arguments("master", master, NULL, host_master, xbt_dynar_length(host_dynar), hostname_list);
340
341   res = MSG_main();
342
343   if (res == MSG_OK)
344     return 0;
345   else
346     return 1;
347 }                               /* end_of_main */