Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add a master/slave example using a generated platform, and using generated event...
[simgrid.git] / examples / msg / masterslave / masterslave_failure_platfgen.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdio.h>
8 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
9 #include "xbt/sysdep.h"         /* calloc, printf */
10
11 /* Create a log channel to have nice outputs. */
12 #include "xbt/log.h"
13 #include "xbt/asserts.h"
14
15 /* we are going to use a generated platform */
16 #include "simgrid/platf_generator.h"
17 #include "simgrid/platf_interface.h"
18
19 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
20                              "Messages specific for this msg example");
21
22 int master(int argc, char *argv[]);
23 int slave(int argc, char *argv[]);
24 void promoter_1(context_node_t node);
25 void labeler_1(context_edge_t edge);
26
27 #define FINALIZE ((void*)221297)        /* a magic number to tell people to stop working */
28
29 #define TASK_COUNT_PER_HOST 5           /* Number of tasks each slave has to do */
30 #define TASK_COMP_SIZE 15000000         /* computation cost of each task */
31 #define TASK_COMM_SIZE 1200000          /* communication time of task */
32
33 /** Promoter function
34  * Just promote each node into a host, with fixed power
35  * Set one node as master
36  * Add a state trace on other nodes
37  */
38 void promoter_1(context_node_t node) {
39
40   s_sg_platf_host_cbarg_t host_parameters;
41   static int master_choosen = FALSE;
42
43   host_parameters.id = NULL;
44   host_parameters.power_peak = 1000000;
45
46   host_parameters.core_amount = 1;
47   host_parameters.power_scale = 1;
48   host_parameters.power_trace = NULL;
49   host_parameters.initial_state = SURF_RESOURCE_ON;
50   host_parameters.state_trace = NULL;
51   host_parameters.coord = NULL;
52   host_parameters.properties = NULL;
53
54   if(!master_choosen) {
55     master_choosen = TRUE;
56     host_parameters.id = "host_master";
57   } else {
58     /*
59      * The bug #14699 cannot allow us to set up an event trace which
60      * begin by SURF_RESOURCE_OFF, otherwise, the host will be down at startup
61      * and the associate process will fail to start. So, here, we generate a
62      * first useless event.
63      */
64     //Set a availability trace for the node
65     char* generator_id = bprintf("state_host_%ld", node->id);
66     probabilist_event_generator_t date_generator =
67                             tmgr_event_generator_new_weibull(generator_id, 80, 1.5);
68     host_parameters.state_trace = tmgr_trace_generator_state(generator_id,
69                                                             date_generator,
70                                                             SURF_RESOURCE_ON);
71
72     //Set a power trace
73     char* pw_date_generator_id = bprintf("pw_date_host_%ld", node->id);
74     char* pw_value_generator_id = bprintf("pw_value_host_%ld", node->id);
75     probabilist_event_generator_t pw_date_generator =
76                           tmgr_event_generator_new_uniform(pw_date_generator_id, 5, 10);
77     probabilist_event_generator_t pw_value_generator =
78                           tmgr_event_generator_new_uniform(pw_value_generator_id, 0.6, 1.0);
79     host_parameters.power_trace =
80               tmgr_trace_generator_value(bprintf("pw_host_%ld", node->id),
81                                                 pw_date_generator,
82                                                 pw_value_generator);
83   }
84
85   platf_graph_promote_to_host(node, &host_parameters);
86
87 }
88
89 /** Labeler function
90  * Set all links with the same bandwidth and latency
91  * Add a state trace to each link
92  */
93 void labeler_1(context_edge_t edge) {
94   s_sg_platf_link_cbarg_t link_parameters;
95   link_parameters.id = NULL;
96   link_parameters.bandwidth = 100000000;
97   link_parameters.bandwidth_trace = NULL;
98   link_parameters.latency = 0.01;
99   link_parameters.latency_trace = NULL;
100   link_parameters.state = SURF_RESOURCE_ON;
101   link_parameters.state_trace = NULL;
102   link_parameters.policy = SURF_LINK_SHARED;
103   link_parameters.properties = NULL;
104
105   char* avail_generator_id = bprintf("avail_link_%ld", edge->id);
106   char* unavail_generator_id = bprintf("unavail_link_%ld", edge->id);
107
108   probabilist_event_generator_t avail_generator =
109                           tmgr_event_generator_new_exponential(avail_generator_id, 0.0001);
110   probabilist_event_generator_t unavail_generator =
111                           tmgr_event_generator_new_uniform(unavail_generator_id, 10, 20);
112
113   link_parameters.state_trace =
114               tmgr_trace_generator_avail_unavail(bprintf("state_link_%ld", edge->id),
115                                                 avail_generator,
116                                                 unavail_generator,
117                                                 SURF_RESOURCE_ON);
118
119
120   platf_graph_link_label(edge, &link_parameters);
121
122 }
123
124 /** Emitter function  */
125 int master(int argc, char *argv[])
126 {
127   int slaves_count = 0;
128   msg_host_t *slaves = NULL;
129   int number_of_tasks = 0;
130   double task_comp_size = 0;
131   double task_comm_size = 0;
132   int i;
133   _XBT_GNUC_UNUSED int read;
134
135   number_of_tasks = TASK_COUNT_PER_HOST*argc;
136   task_comp_size = TASK_COMP_SIZE;
137   task_comm_size = TASK_COMM_SIZE;
138
139   {                             /* Process organisation */
140     slaves_count = argc;
141     slaves = xbt_new0(msg_host_t, slaves_count);
142
143     for (i = 0; i < argc; i++) {
144       slaves[i] = MSG_get_host_by_name(argv[i]);
145       if (slaves[i] == NULL) {
146         XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
147         abort();
148       }
149     }
150   }
151
152   XBT_INFO("Got %d slave(s) :", slaves_count);
153   for (i = 0; i < slaves_count; i++)
154     XBT_INFO("%s", MSG_host_get_name(slaves[i]));
155
156   XBT_INFO("Got %d task to process :", number_of_tasks);
157
158   for (i = 0; i < number_of_tasks; i++) {
159     msg_task_t task = MSG_task_create("Task", task_comp_size, task_comm_size,
160                                     xbt_new0(double, 1));
161     int a;
162     *((double *) task->data) = MSG_get_clock();
163
164     a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i % slaves_count]),10.0);
165
166     if (a == MSG_OK) {
167       XBT_INFO("Send completed");
168     } else if (a == MSG_HOST_FAILURE) {
169       XBT_INFO
170           ("Gloups. The cpu on which I'm running just turned off!. See you!");
171       free(task->data);
172       MSG_task_destroy(task);
173       free(slaves);
174       return 0;
175     } else if (a == MSG_TRANSFER_FAILURE) {
176       XBT_INFO
177           ("Mmh. Something went wrong with '%s'. Nevermind. Let's keep going!",
178               MSG_host_get_name(slaves[i % slaves_count]));
179       free(task->data);
180       MSG_task_destroy(task);
181     } else if (a == MSG_TIMEOUT) {
182       XBT_INFO
183           ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
184               MSG_host_get_name(slaves[i % slaves_count]));
185       free(task->data);
186       MSG_task_destroy(task);
187     } else {
188       XBT_INFO("Hey ?! What's up ? ");
189       xbt_die( "Unexpected behavior");
190     }
191   }
192
193   XBT_INFO
194       ("All tasks have been dispatched. Let's tell everybody the computation is over.");
195   for (i = 0; i < slaves_count; i++) {
196     msg_task_t task = MSG_task_create("finalize", 0, 0, FINALIZE);
197     int a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i]),1.0);
198     if (a == MSG_OK)
199       continue;
200     if (a == MSG_HOST_FAILURE) {
201       XBT_INFO
202           ("Gloups. The cpu on which I'm running just turned off!. See you!");
203       MSG_task_destroy(task);
204       free(slaves);
205       return 0;
206     } else if (a == MSG_TRANSFER_FAILURE) {
207       XBT_INFO("Mmh. Can't reach '%s'! Nevermind. Let's keep going!",
208           MSG_host_get_name(slaves[i]));
209       MSG_task_destroy(task);
210     } else if (a == MSG_TIMEOUT) {
211       XBT_INFO
212           ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
213               MSG_host_get_name(slaves[i % slaves_count]));
214       MSG_task_destroy(task);
215     } else {
216       XBT_INFO("Hey ?! What's up ? ");
217       xbt_die("Unexpected behavior with '%s': %d", MSG_host_get_name(slaves[i]), a);
218     }
219   }
220
221   XBT_INFO("Goodbye now!");
222   free(slaves);
223   return 0;
224 }                               /* end_of_master */
225
226 /** Receiver function  */
227 int slave(int argc, char *argv[])
228 {
229   while (1) {
230     msg_task_t task = NULL;
231     int a;
232     double time1, time2;
233
234     time1 = MSG_get_clock();
235     a = MSG_task_receive( &(task), MSG_host_get_name(MSG_host_self()) );
236     time2 = MSG_get_clock();
237     if (a == MSG_OK) {
238       XBT_INFO("Received \"%s\"", MSG_task_get_name(task));
239       if (MSG_task_get_data(task) == FINALIZE) {
240         MSG_task_destroy(task);
241         break;
242       }
243       if (time1 < *((double *) task->data))
244         time1 = *((double *) task->data);
245       XBT_INFO("Communication time : \"%f\"", time2 - time1);
246       XBT_INFO("Processing \"%s\"", MSG_task_get_name(task));
247       a = MSG_task_execute(task);
248       if (a == MSG_OK) {
249         XBT_INFO("\"%s\" done", MSG_task_get_name(task));
250         free(task->data);
251         MSG_task_destroy(task);
252       } else if (a == MSG_HOST_FAILURE) {
253         XBT_INFO
254             ("Gloups. The cpu on which I'm running just turned off!. See you!");
255         return 0;
256       } else {
257         XBT_INFO("Hey ?! What's up ? ");
258         xbt_die("Unexpected behavior");
259       }
260     } else if (a == MSG_HOST_FAILURE) {
261       XBT_INFO
262           ("Gloups. The cpu on which I'm running just turned off!. See you!");
263       return 0;
264     } else if (a == MSG_TRANSFER_FAILURE) {
265       XBT_INFO("Mmh. Something went wrong. Nevermind. Let's keep going!");
266     } else if (a == MSG_TIMEOUT) {
267       XBT_INFO("Mmh. Got a timeout. Nevermind. Let's keep going!");
268     } else {
269       XBT_INFO("Hey ?! What's up ? ");
270       xbt_die("Unexpected behavior");
271     }
272   }
273   XBT_INFO("I'm done. See you!");
274   return 0;
275 }                               /* end_of_slave */
276
277 /** Main function */
278 int main(int argc, char *argv[])
279 {
280   msg_error_t res = MSG_OK;
281   unsigned long seed_platf_gen[] = {134, 233445, 865, 2634, 424242, 876543};
282   unsigned long seed_trace_gen[] = {8865244, 356772, 42, 77465, 2098754, 8725442};
283   int connected;
284   int max_tries = 10;
285
286   //MSG initialisation
287   MSG_init(&argc, argv);
288
289   //Set up the seed for the platform generation
290   platf_random_seed(seed_platf_gen);
291
292   //Set up the RngStream for trace generation
293   sg_platf_rng_stream_init(seed_trace_gen);
294
295   XBT_INFO("creating nodes...");
296   platf_graph_uniform(10);
297
298   do {
299     max_tries--;
300     XBT_INFO("creating links...");
301     platf_graph_clear_links();
302     platf_graph_interconnect_waxman(0.9, 0.4);
303     XBT_INFO("done. Check connectedness...");
304     connected = platf_graph_is_connected();
305     XBT_INFO("Is it connected : %s", connected ? "yes" : (max_tries ? "no, retrying" : "no"));
306   } while(!connected && max_tries);
307
308   if(!connected && !max_tries) {
309     xbt_die("Impossible to connect the graph, aborting.");
310   }
311
312   XBT_INFO("registering callbacks...");
313   platf_graph_promoter(promoter_1);
314   platf_graph_labeler(labeler_1);
315
316   XBT_INFO("protmoting...");
317   platf_do_promote();
318
319   XBT_INFO("labeling...");
320   platf_do_label();
321
322   XBT_INFO("Putting it in surf...");
323   platf_generate();
324
325   XBT_INFO("Let's get the available hosts and dispatch work:");
326
327   unsigned int i;
328   msg_host_t host = NULL;
329   msg_host_t host_master = NULL;
330   msg_process_t process = NULL;
331   xbt_dynar_t host_dynar = MSG_hosts_as_dynar();
332   char** hostname_list = malloc(sizeof(char*) * xbt_dynar_length(host_dynar));
333
334   xbt_dynar_foreach(host_dynar, i, host) {
335     process = MSG_process_create("slave", slave, NULL, host);
336     MSG_process_auto_restart_set(process, TRUE);
337     hostname_list[i] = (char*) MSG_host_get_name(host);
338   }
339   host_master = MSG_get_host_by_name("host_master");
340   MSG_process_create_with_arguments("master", master, NULL, host_master, xbt_dynar_length(host_dynar), hostname_list);
341
342   res = MSG_main();
343
344   if (res == MSG_OK)
345     return 0;
346   else
347     return 1;
348 }                               /* end_of_main */