From: Jean-Baptiste Hervé Date: Fri, 10 Aug 2012 15:52:53 +0000 (+0200) Subject: Add a master/slave example using a generated platform, and using generated event... X-Git-Tag: v3_8~165 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/12802de52c0a297e8c3253b7d9fc875c7e9cb0d9?hp=0b08c884ed04abf2b6948e445c05a8af2818565f Add a master/slave example using a generated platform, and using generated event traces This example fails at the end for the moment, apparently because of autorestart. --- diff --git a/examples/msg/masterslave/CMakeLists.txt b/examples/msg/masterslave/CMakeLists.txt index 44ed1b670f..e68e0da4c0 100644 --- a/examples/msg/masterslave/CMakeLists.txt +++ b/examples/msg/masterslave/CMakeLists.txt @@ -11,6 +11,7 @@ add_executable(masterslave_cluster "masterslave_cluster.c") add_executable(masterslave_kill "masterslave_kill.c") add_executable(masterslave_arg "masterslave_arg.c") add_executable(masterslave_platfgen "masterslave_platfgen.c") +add_executable(masterslave_failure_platfgen "masterslave_failure_platfgen.c") ### Add definitions for compile if(WIN32) @@ -22,6 +23,7 @@ if(WIN32) target_link_libraries(masterslave_kill simgrid ) target_link_libraries(masterslave_arg simgrid ) target_link_libraries(masterslave_platfgen simgrid ) + target_link_libraries(masterslave_failure_platfgen simgrid ) else(WIN32) target_link_libraries(masterslave_forwarder simgrid m ) target_link_libraries(masterslave_failure simgrid m ) @@ -31,6 +33,7 @@ else(WIN32) target_link_libraries(masterslave_kill simgrid m ) target_link_libraries(masterslave_arg simgrid m ) target_link_libraries(masterslave_platfgen simgrid m ) + target_link_libraries(masterslave_failure_platfgen simgrid m ) endif(WIN32) target_link_libraries(masterslave_cluster simgrid) @@ -77,6 +80,7 @@ set(examples_src ${CMAKE_CURRENT_SOURCE_DIR}/masterslave_kill.c ${CMAKE_CURRENT_SOURCE_DIR}/masterslave_mailbox.c ${CMAKE_CURRENT_SOURCE_DIR}/masterslave_platfgen.c + ${CMAKE_CURRENT_SOURCE_DIR}/masterslave_failure_platfgen.c PARENT_SCOPE ) set(bin_files diff --git a/examples/msg/masterslave/masterslave_failure_platfgen.c b/examples/msg/masterslave/masterslave_failure_platfgen.c new file mode 100644 index 0000000000..8194e32062 --- /dev/null +++ b/examples/msg/masterslave/masterslave_failure_platfgen.c @@ -0,0 +1,348 @@ +/* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include +#include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */ +#include "xbt/sysdep.h" /* calloc, printf */ + +/* Create a log channel to have nice outputs. */ +#include "xbt/log.h" +#include "xbt/asserts.h" + +/* we are going to use a generated platform */ +#include "simgrid/platf_generator.h" +#include "simgrid/platf_interface.h" + +XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test, + "Messages specific for this msg example"); + +int master(int argc, char *argv[]); +int slave(int argc, char *argv[]); +void promoter_1(context_node_t node); +void labeler_1(context_edge_t edge); + +#define FINALIZE ((void*)221297) /* a magic number to tell people to stop working */ + +#define TASK_COUNT_PER_HOST 5 /* Number of tasks each slave has to do */ +#define TASK_COMP_SIZE 15000000 /* computation cost of each task */ +#define TASK_COMM_SIZE 1200000 /* communication time of task */ + +/** Promoter function + * Just promote each node into a host, with fixed power + * Set one node as master + * Add a state trace on other nodes + */ +void promoter_1(context_node_t node) { + + s_sg_platf_host_cbarg_t host_parameters; + static int master_choosen = FALSE; + + host_parameters.id = NULL; + host_parameters.power_peak = 1000000; + + host_parameters.core_amount = 1; + host_parameters.power_scale = 1; + host_parameters.power_trace = NULL; + host_parameters.initial_state = SURF_RESOURCE_ON; + host_parameters.state_trace = NULL; + host_parameters.coord = NULL; + host_parameters.properties = NULL; + + if(!master_choosen) { + master_choosen = TRUE; + host_parameters.id = "host_master"; + } else { + /* + * The bug #14699 cannot allow us to set up an event trace which + * begin by SURF_RESOURCE_OFF, otherwise, the host will be down at startup + * and the associate process will fail to start. So, here, we generate a + * first useless event. + */ + //Set a availability trace for the node + char* generator_id = bprintf("state_host_%ld", node->id); + probabilist_event_generator_t date_generator = + tmgr_event_generator_new_weibull(generator_id, 80, 1.5); + host_parameters.state_trace = tmgr_trace_generator_state(generator_id, + date_generator, + SURF_RESOURCE_ON); + + //Set a power trace + char* pw_date_generator_id = bprintf("pw_date_host_%ld", node->id); + char* pw_value_generator_id = bprintf("pw_value_host_%ld", node->id); + probabilist_event_generator_t pw_date_generator = + tmgr_event_generator_new_uniform(pw_date_generator_id, 5, 10); + probabilist_event_generator_t pw_value_generator = + tmgr_event_generator_new_uniform(pw_value_generator_id, 0.6, 1.0); + host_parameters.power_trace = + tmgr_trace_generator_value(bprintf("pw_host_%ld", node->id), + pw_date_generator, + pw_value_generator); + } + + platf_graph_promote_to_host(node, &host_parameters); + +} + +/** Labeler function + * Set all links with the same bandwidth and latency + * Add a state trace to each link + */ +void labeler_1(context_edge_t edge) { + s_sg_platf_link_cbarg_t link_parameters; + link_parameters.id = NULL; + link_parameters.bandwidth = 100000000; + link_parameters.bandwidth_trace = NULL; + link_parameters.latency = 0.01; + link_parameters.latency_trace = NULL; + link_parameters.state = SURF_RESOURCE_ON; + link_parameters.state_trace = NULL; + link_parameters.policy = SURF_LINK_SHARED; + link_parameters.properties = NULL; + + char* avail_generator_id = bprintf("avail_link_%ld", edge->id); + char* unavail_generator_id = bprintf("unavail_link_%ld", edge->id); + + probabilist_event_generator_t avail_generator = + tmgr_event_generator_new_exponential(avail_generator_id, 0.0001); + probabilist_event_generator_t unavail_generator = + tmgr_event_generator_new_uniform(unavail_generator_id, 10, 20); + + link_parameters.state_trace = + tmgr_trace_generator_avail_unavail(bprintf("state_link_%ld", edge->id), + avail_generator, + unavail_generator, + SURF_RESOURCE_ON); + + + platf_graph_link_label(edge, &link_parameters); + +} + +/** Emitter function */ +int master(int argc, char *argv[]) +{ + int slaves_count = 0; + msg_host_t *slaves = NULL; + int number_of_tasks = 0; + double task_comp_size = 0; + double task_comm_size = 0; + int i; + _XBT_GNUC_UNUSED int read; + + number_of_tasks = TASK_COUNT_PER_HOST*argc; + task_comp_size = TASK_COMP_SIZE; + task_comm_size = TASK_COMM_SIZE; + + { /* Process organisation */ + slaves_count = argc; + slaves = xbt_new0(msg_host_t, slaves_count); + + for (i = 0; i < argc; i++) { + slaves[i] = MSG_get_host_by_name(argv[i]); + if (slaves[i] == NULL) { + XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]); + abort(); + } + } + } + + XBT_INFO("Got %d slave(s) :", slaves_count); + for (i = 0; i < slaves_count; i++) + XBT_INFO("%s", MSG_host_get_name(slaves[i])); + + XBT_INFO("Got %d task to process :", number_of_tasks); + + for (i = 0; i < number_of_tasks; i++) { + msg_task_t task = MSG_task_create("Task", task_comp_size, task_comm_size, + xbt_new0(double, 1)); + int a; + *((double *) task->data) = MSG_get_clock(); + + a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i % slaves_count]),10.0); + + if (a == MSG_OK) { + XBT_INFO("Send completed"); + } else if (a == MSG_HOST_FAILURE) { + XBT_INFO + ("Gloups. The cpu on which I'm running just turned off!. See you!"); + free(task->data); + MSG_task_destroy(task); + free(slaves); + return 0; + } else if (a == MSG_TRANSFER_FAILURE) { + XBT_INFO + ("Mmh. Something went wrong with '%s'. Nevermind. Let's keep going!", + MSG_host_get_name(slaves[i % slaves_count])); + free(task->data); + MSG_task_destroy(task); + } else if (a == MSG_TIMEOUT) { + XBT_INFO + ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!", + MSG_host_get_name(slaves[i % slaves_count])); + free(task->data); + MSG_task_destroy(task); + } else { + XBT_INFO("Hey ?! What's up ? "); + xbt_die( "Unexpected behavior"); + } + } + + XBT_INFO + ("All tasks have been dispatched. Let's tell everybody the computation is over."); + for (i = 0; i < slaves_count; i++) { + msg_task_t task = MSG_task_create("finalize", 0, 0, FINALIZE); + int a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i]),1.0); + if (a == MSG_OK) + continue; + if (a == MSG_HOST_FAILURE) { + XBT_INFO + ("Gloups. The cpu on which I'm running just turned off!. See you!"); + MSG_task_destroy(task); + free(slaves); + return 0; + } else if (a == MSG_TRANSFER_FAILURE) { + XBT_INFO("Mmh. Can't reach '%s'! Nevermind. Let's keep going!", + MSG_host_get_name(slaves[i])); + MSG_task_destroy(task); + } else if (a == MSG_TIMEOUT) { + XBT_INFO + ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!", + MSG_host_get_name(slaves[i % slaves_count])); + MSG_task_destroy(task); + } else { + XBT_INFO("Hey ?! What's up ? "); + xbt_die("Unexpected behavior with '%s': %d", MSG_host_get_name(slaves[i]), a); + } + } + + XBT_INFO("Goodbye now!"); + free(slaves); + return 0; +} /* end_of_master */ + +/** Receiver function */ +int slave(int argc, char *argv[]) +{ + while (1) { + msg_task_t task = NULL; + int a; + double time1, time2; + + time1 = MSG_get_clock(); + a = MSG_task_receive( &(task), MSG_host_get_name(MSG_host_self()) ); + time2 = MSG_get_clock(); + if (a == MSG_OK) { + XBT_INFO("Received \"%s\"", MSG_task_get_name(task)); + if (MSG_task_get_data(task) == FINALIZE) { + MSG_task_destroy(task); + break; + } + if (time1 < *((double *) task->data)) + time1 = *((double *) task->data); + XBT_INFO("Communication time : \"%f\"", time2 - time1); + XBT_INFO("Processing \"%s\"", MSG_task_get_name(task)); + a = MSG_task_execute(task); + if (a == MSG_OK) { + XBT_INFO("\"%s\" done", MSG_task_get_name(task)); + free(task->data); + MSG_task_destroy(task); + } else if (a == MSG_HOST_FAILURE) { + XBT_INFO + ("Gloups. The cpu on which I'm running just turned off!. See you!"); + return 0; + } else { + XBT_INFO("Hey ?! What's up ? "); + xbt_die("Unexpected behavior"); + } + } else if (a == MSG_HOST_FAILURE) { + XBT_INFO + ("Gloups. The cpu on which I'm running just turned off!. See you!"); + return 0; + } else if (a == MSG_TRANSFER_FAILURE) { + XBT_INFO("Mmh. Something went wrong. Nevermind. Let's keep going!"); + } else if (a == MSG_TIMEOUT) { + XBT_INFO("Mmh. Got a timeout. Nevermind. Let's keep going!"); + } else { + XBT_INFO("Hey ?! What's up ? "); + xbt_die("Unexpected behavior"); + } + } + XBT_INFO("I'm done. See you!"); + return 0; +} /* end_of_slave */ + +/** Main function */ +int main(int argc, char *argv[]) +{ + msg_error_t res = MSG_OK; + unsigned long seed_platf_gen[] = {134, 233445, 865, 2634, 424242, 876543}; + unsigned long seed_trace_gen[] = {8865244, 356772, 42, 77465, 2098754, 8725442}; + int connected; + int max_tries = 10; + + //MSG initialisation + MSG_init(&argc, argv); + + //Set up the seed for the platform generation + platf_random_seed(seed_platf_gen); + + //Set up the RngStream for trace generation + sg_platf_rng_stream_init(seed_trace_gen); + + XBT_INFO("creating nodes..."); + platf_graph_uniform(10); + + do { + max_tries--; + XBT_INFO("creating links..."); + platf_graph_clear_links(); + platf_graph_interconnect_waxman(0.9, 0.4); + XBT_INFO("done. Check connectedness..."); + connected = platf_graph_is_connected(); + XBT_INFO("Is it connected : %s", connected ? "yes" : (max_tries ? "no, retrying" : "no")); + } while(!connected && max_tries); + + if(!connected && !max_tries) { + xbt_die("Impossible to connect the graph, aborting."); + } + + XBT_INFO("registering callbacks..."); + platf_graph_promoter(promoter_1); + platf_graph_labeler(labeler_1); + + XBT_INFO("protmoting..."); + platf_do_promote(); + + XBT_INFO("labeling..."); + platf_do_label(); + + XBT_INFO("Putting it in surf..."); + platf_generate(); + + XBT_INFO("Let's get the available hosts and dispatch work:"); + + unsigned int i; + msg_host_t host = NULL; + msg_host_t host_master = NULL; + msg_process_t process = NULL; + xbt_dynar_t host_dynar = MSG_hosts_as_dynar(); + char** hostname_list = malloc(sizeof(char*) * xbt_dynar_length(host_dynar)); + + xbt_dynar_foreach(host_dynar, i, host) { + process = MSG_process_create("slave", slave, NULL, host); + MSG_process_auto_restart_set(process, TRUE); + hostname_list[i] = (char*) MSG_host_get_name(host); + } + host_master = MSG_get_host_by_name("host_master"); + MSG_process_create_with_arguments("master", master, NULL, host_master, xbt_dynar_length(host_dynar), hostname_list); + + res = MSG_main(); + + if (res == MSG_OK) + return 0; + else + return 1; +} /* end_of_main */