Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
ee179e572ad2a12dd22c40e1841cc3733c998f50
[simgrid.git] / examples / msg / masterslave / masterslave_failure_platfgen.c
1 /* Copyright (c) 2007-2010, 2012-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
9 #include "xbt/sysdep.h"         /* calloc, printf */
10
11 /* Create a log channel to have nice outputs. */
12 #include "xbt/log.h"
13 #include "xbt/asserts.h"
14
15 /* we are going to use a generated platform */
16 #include "simgrid/platf_generator.h"
17 #include "simgrid/platf_interface.h"
18
19 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
20                              "Messages specific for this msg example");
21
22 int master(int argc, char *argv[]);
23 int slave(int argc, char *argv[]);
24 void promoter_1(context_node_t node);
25 void labeler_1(context_edge_t edge);
26
27 #define FINALIZE ((void*)221297)        /* a magic number to tell people to stop working */
28
29 #define TASK_COUNT_PER_HOST 5           /* Number of tasks each slave has to do */
30 #define TASK_COMP_SIZE 15000000         /* computation cost of each task */
31 #define TASK_COMM_SIZE 1200000          /* communication time of task */
32
33 /** Promoter function
34  * Just promote each node into a host, with fixed power
35  * Set one node as master
36  * Add a state trace on other nodes
37  */
38 void promoter_1(context_node_t node) {
39
40   s_sg_platf_host_cbarg_t host_parameters;
41   static int master_choosen = FALSE;
42
43   host_parameters.id = NULL;
44   host_parameters.power_peak = xbt_dynar_new(sizeof(double), NULL);
45   xbt_dynar_push_as(host_parameters.power_peak, double, 25000000.0);
46   host_parameters.core_amount = 1;
47   host_parameters.power_scale = 1;
48   host_parameters.power_trace = NULL;
49   host_parameters.initial_state = SURF_RESOURCE_ON;
50   host_parameters.state_trace = NULL;
51   host_parameters.coord = NULL;
52   host_parameters.properties = NULL;
53
54   if(!master_choosen) {
55     master_choosen = TRUE;
56     host_parameters.id = "host_master";
57   } else {
58     //Set a power trace
59     char* pw_date_generator_id = bprintf("pw_date_host_%ld", node->id);
60     char* pw_value_generator_id = bprintf("pw_value_host_%ld", node->id);
61     probabilist_event_generator_t pw_date_generator =
62                           tmgr_event_generator_new_uniform(pw_date_generator_id, 5, 10);
63     probabilist_event_generator_t pw_value_generator =
64                           tmgr_event_generator_new_uniform(pw_value_generator_id, 0.6, 1.0);
65     host_parameters.power_trace =
66               tmgr_trace_generator_value(bprintf("pw_host_%ld", node->id),
67                                                 pw_date_generator,
68                                                 pw_value_generator);
69   }
70
71   platf_graph_promote_to_host(node, &host_parameters);
72
73 }
74
75 /** Labeler function
76  * Set all links with the same bandwidth and latency
77  * Add a state trace to each link
78  */
79 void labeler_1(context_edge_t edge) {
80   s_sg_platf_link_cbarg_t link_parameters;
81   link_parameters.id = NULL;
82   link_parameters.bandwidth = 100000000;
83   link_parameters.bandwidth_trace = NULL;
84   link_parameters.latency = 0.01;
85   link_parameters.latency_trace = NULL;
86   link_parameters.state = SURF_RESOURCE_ON;
87   link_parameters.state_trace = NULL;
88   link_parameters.policy = SURF_LINK_SHARED;
89   link_parameters.properties = NULL;
90
91   char* avail_generator_id = bprintf("avail_link_%ld", edge->id);
92   char* unavail_generator_id = bprintf("unavail_link_%ld", edge->id);
93
94   probabilist_event_generator_t avail_generator =
95                           tmgr_event_generator_new_exponential(avail_generator_id, 0.0001);
96   probabilist_event_generator_t unavail_generator =
97                           tmgr_event_generator_new_uniform(unavail_generator_id, 10, 20);
98
99   link_parameters.state_trace =
100               tmgr_trace_generator_avail_unavail(bprintf("state_link_%ld", edge->id),
101                                                 avail_generator,
102                                                 unavail_generator,
103                                                 SURF_RESOURCE_ON);
104
105
106   platf_graph_link_label(edge, &link_parameters);
107
108 }
109
110 int master(int argc, char *argv[])
111 {
112   int slaves_count = 0;
113   msg_host_t *slaves = NULL;
114   int number_of_tasks = 0;
115   double task_comp_size = 0;
116   double task_comm_size = 0;
117   int i;
118
119   number_of_tasks = TASK_COUNT_PER_HOST*argc;
120   task_comp_size = TASK_COMP_SIZE;
121   task_comm_size = TASK_COMM_SIZE;
122
123   {                             /* Process organization */
124     slaves_count = argc;
125     slaves = xbt_new0(msg_host_t, slaves_count);
126
127     for (i = 0; i < argc; i++) {
128       slaves[i] = MSG_get_host_by_name(argv[i]);
129       if (slaves[i] == NULL) {
130         XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
131         abort();
132       }
133     }
134   }
135
136   XBT_INFO("Got %d slave(s) :", slaves_count);
137   for (i = 0; i < slaves_count; i++)
138     XBT_INFO("%s", MSG_host_get_name(slaves[i]));
139
140   XBT_INFO("Got %d task to process :", number_of_tasks);
141
142   for (i = 0; i < number_of_tasks; i++) {
143     msg_task_t task = MSG_task_create("Task", task_comp_size, task_comm_size,
144                                     xbt_new0(double, 1));
145     int a;
146     *((double *) task->data) = MSG_get_clock();
147
148     a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i % slaves_count]),10.0);
149
150     if (a == MSG_OK) {
151       XBT_INFO("Send completed");
152     } else if (a == MSG_HOST_FAILURE) {
153       XBT_INFO
154           ("Gloups. The cpu on which I'm running just turned off!. See you!");
155       free(task->data);
156       MSG_task_destroy(task);
157       free(slaves);
158       return 0;
159     } else if (a == MSG_TRANSFER_FAILURE) {
160       XBT_INFO
161           ("Mmh. Something went wrong with '%s'. Nevermind. Let's keep going!",
162               MSG_host_get_name(slaves[i % slaves_count]));
163       free(task->data);
164       MSG_task_destroy(task);
165     } else if (a == MSG_TIMEOUT) {
166       XBT_INFO
167           ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
168               MSG_host_get_name(slaves[i % slaves_count]));
169       free(task->data);
170       MSG_task_destroy(task);
171     } else {
172       XBT_INFO("Hey ?! What's up ? ");
173       xbt_die( "Unexpected behavior");
174     }
175   }
176
177   XBT_INFO
178       ("All tasks have been dispatched. Let's tell everybody the computation is over.");
179   for (i = 0; i < slaves_count; i++) {
180     msg_task_t task = MSG_task_create("finalize", 0, 0, FINALIZE);
181     int a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i]),1.0);
182     if (a == MSG_OK)
183       continue;
184     if (a == MSG_HOST_FAILURE) {
185       XBT_INFO
186           ("Gloups. The cpu on which I'm running just turned off!. See you!");
187       MSG_task_destroy(task);
188       free(slaves);
189       return 0;
190     } else if (a == MSG_TRANSFER_FAILURE) {
191       XBT_INFO("Mmh. Can't reach '%s'! Nevermind. Let's keep going!",
192           MSG_host_get_name(slaves[i]));
193       MSG_task_destroy(task);
194     } else if (a == MSG_TIMEOUT) {
195       XBT_INFO
196           ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
197               MSG_host_get_name(slaves[i % slaves_count]));
198       MSG_task_destroy(task);
199     } else {
200       XBT_INFO("Hey ?! What's up ? ");
201       xbt_die("Unexpected behavior with '%s': %d", MSG_host_get_name(slaves[i]), a);
202     }
203   }
204
205   XBT_INFO("Goodbye now!");
206   free(slaves);
207   return 0;
208 }                               /* end_of_master */
209
210 int slave(int argc, char *argv[])
211 {
212   while (1) {
213     msg_task_t task = NULL;
214     int a;
215     double time1, time2;
216
217     time1 = MSG_get_clock();
218     a = MSG_task_receive( &(task), MSG_host_get_name(MSG_host_self()) );
219     time2 = MSG_get_clock();
220     if (a == MSG_OK) {
221       XBT_INFO("Received \"%s\"", MSG_task_get_name(task));
222       if (MSG_task_get_data(task) == FINALIZE) {
223         MSG_task_destroy(task);
224         break;
225       }
226       if (time1 < *((double *) task->data))
227         time1 = *((double *) task->data);
228       XBT_INFO("Communication time : \"%f\"", time2 - time1);
229       XBT_INFO("Processing \"%s\"", MSG_task_get_name(task));
230       a = MSG_task_execute(task);
231       if (a == MSG_OK) {
232         XBT_INFO("\"%s\" done", MSG_task_get_name(task));
233         free(task->data);
234         MSG_task_destroy(task);
235       } else if (a == MSG_HOST_FAILURE) {
236         XBT_INFO
237             ("Gloups. The cpu on which I'm running just turned off!. See you!");
238         return 0;
239       } else {
240         XBT_INFO("Hey ?! What's up ? ");
241         xbt_die("Unexpected behavior");
242       }
243     } else if (a == MSG_HOST_FAILURE) {
244       XBT_INFO
245           ("Gloups. The cpu on which I'm running just turned off!. See you!");
246       return 0;
247     } else if (a == MSG_TRANSFER_FAILURE) {
248       XBT_INFO("Mmh. Something went wrong. Nevermind. Let's keep going!");
249     } else if (a == MSG_TIMEOUT) {
250       XBT_INFO("Mmh. Got a timeout. Nevermind. Let's keep going!");
251     } else {
252       XBT_INFO("Hey ?! What's up ? ");
253       xbt_die("Unexpected behavior");
254     }
255   }
256   XBT_INFO("I'm done. See you!");
257   return 0;
258 }                               /* end_of_slave */
259
260 /** Main function */
261 int main(int argc, char *argv[])
262 {
263   msg_error_t res = MSG_OK;
264   unsigned long seed_platf_gen[] = {134, 233445, 865, 2634, 424242, 876543};
265   unsigned long seed_trace_gen[] = {8865244, 356772, 42, 77465, 2098754, 8725442};
266   int connected;
267   int max_tries = 10;
268
269   //MSG initialization
270   MSG_init(&argc, argv);
271
272   //Set up the seed for the platform generation
273   platf_random_seed(seed_platf_gen);
274
275   //Set up the RngStream for trace generation
276   sg_platf_rng_stream_init(seed_trace_gen);
277
278   XBT_INFO("creating nodes...");
279   platf_graph_uniform(10);
280
281   do {
282     max_tries--;
283     XBT_INFO("creating links...");
284     platf_graph_clear_links();
285     platf_graph_interconnect_waxman(0.9, 0.4);
286     XBT_INFO("done. Check connectedness...");
287     connected = platf_graph_is_connected();
288     XBT_INFO("Is it connected : %s", connected ? "yes" : (max_tries ? "no, retrying" : "no"));
289   } while(!connected && max_tries);
290
291   if(!connected && !max_tries) {
292     xbt_die("Impossible to connect the graph, aborting.");
293   }
294
295   XBT_INFO("registering callbacks...");
296   platf_graph_promoter(promoter_1);
297   platf_graph_labeler(labeler_1);
298
299   XBT_INFO("promoting...");
300   platf_do_promote();
301
302   XBT_INFO("labeling...");
303   platf_do_label();
304
305   XBT_INFO("Putting it in surf...");
306   platf_generate();
307
308   XBT_INFO("Let's get the available hosts and dispatch work:");
309
310   unsigned int i;
311   msg_host_t host = NULL;
312   msg_host_t host_master = NULL;
313   msg_process_t process = NULL;
314   xbt_dynar_t host_dynar = MSG_hosts_as_dynar();
315   char** hostname_list =
316     xbt_malloc(sizeof(char*) * xbt_dynar_length(host_dynar));
317
318   xbt_dynar_foreach(host_dynar, i, host) {
319     process = MSG_process_create("slave", slave, NULL, host);
320     MSG_process_auto_restart_set(process, TRUE);
321     hostname_list[i] = (char*) MSG_host_get_name(host);
322   }
323   host_master = MSG_get_host_by_name("host_master");
324   MSG_process_create_with_arguments("master", master, NULL, host_master,
325                                     xbt_dynar_length(host_dynar),
326                                     hostname_list);
327
328   res = MSG_main();
329
330   if (res == MSG_OK)
331     return 0;
332   else
333     return 1;
334 }                               /* end_of_main */