Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add a master/slave example using a generated platform, and using generated event...
[simgrid.git] / examples / msg / masterslave / masterslave_failure_platfgen.c
diff --git a/examples/msg/masterslave/masterslave_failure_platfgen.c b/examples/msg/masterslave/masterslave_failure_platfgen.c
new file mode 100644 (file)
index 0000000..8194e32
--- /dev/null
@@ -0,0 +1,348 @@
+/* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <stdio.h>
+#include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
+#include "xbt/sysdep.h"         /* calloc, printf */
+
+/* Create a log channel to have nice outputs. */
+#include "xbt/log.h"
+#include "xbt/asserts.h"
+
+/* we are going to use a generated platform */
+#include "simgrid/platf_generator.h"
+#include "simgrid/platf_interface.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
+                             "Messages specific for this msg example");
+
+int master(int argc, char *argv[]);
+int slave(int argc, char *argv[]);
+void promoter_1(context_node_t node);
+void labeler_1(context_edge_t edge);
+
+#define FINALIZE ((void*)221297)        /* a magic number to tell people to stop working */
+
+#define TASK_COUNT_PER_HOST 5           /* Number of tasks each slave has to do */
+#define TASK_COMP_SIZE 15000000         /* computation cost of each task */
+#define TASK_COMM_SIZE 1200000          /* communication time of task */
+
+/** Promoter function
+ * Just promote each node into a host, with fixed power
+ * Set one node as master
+ * Add a state trace on other nodes
+ */
+void promoter_1(context_node_t node) {
+
+  s_sg_platf_host_cbarg_t host_parameters;
+  static int master_choosen = FALSE;
+
+  host_parameters.id = NULL;
+  host_parameters.power_peak = 1000000;
+
+  host_parameters.core_amount = 1;
+  host_parameters.power_scale = 1;
+  host_parameters.power_trace = NULL;
+  host_parameters.initial_state = SURF_RESOURCE_ON;
+  host_parameters.state_trace = NULL;
+  host_parameters.coord = NULL;
+  host_parameters.properties = NULL;
+
+  if(!master_choosen) {
+    master_choosen = TRUE;
+    host_parameters.id = "host_master";
+  } else {
+    /*
+     * The bug #14699 cannot allow us to set up an event trace which
+     * begin by SURF_RESOURCE_OFF, otherwise, the host will be down at startup
+     * and the associate process will fail to start. So, here, we generate a
+     * first useless event.
+     */
+    //Set a availability trace for the node
+    char* generator_id = bprintf("state_host_%ld", node->id);
+    probabilist_event_generator_t date_generator =
+                            tmgr_event_generator_new_weibull(generator_id, 80, 1.5);
+    host_parameters.state_trace = tmgr_trace_generator_state(generator_id,
+                                                            date_generator,
+                                                            SURF_RESOURCE_ON);
+
+    //Set a power trace
+    char* pw_date_generator_id = bprintf("pw_date_host_%ld", node->id);
+    char* pw_value_generator_id = bprintf("pw_value_host_%ld", node->id);
+    probabilist_event_generator_t pw_date_generator =
+                          tmgr_event_generator_new_uniform(pw_date_generator_id, 5, 10);
+    probabilist_event_generator_t pw_value_generator =
+                          tmgr_event_generator_new_uniform(pw_value_generator_id, 0.6, 1.0);
+    host_parameters.power_trace =
+              tmgr_trace_generator_value(bprintf("pw_host_%ld", node->id),
+                                                pw_date_generator,
+                                                pw_value_generator);
+  }
+
+  platf_graph_promote_to_host(node, &host_parameters);
+
+}
+
+/** Labeler function
+ * Set all links with the same bandwidth and latency
+ * Add a state trace to each link
+ */
+void labeler_1(context_edge_t edge) {
+  s_sg_platf_link_cbarg_t link_parameters;
+  link_parameters.id = NULL;
+  link_parameters.bandwidth = 100000000;
+  link_parameters.bandwidth_trace = NULL;
+  link_parameters.latency = 0.01;
+  link_parameters.latency_trace = NULL;
+  link_parameters.state = SURF_RESOURCE_ON;
+  link_parameters.state_trace = NULL;
+  link_parameters.policy = SURF_LINK_SHARED;
+  link_parameters.properties = NULL;
+
+  char* avail_generator_id = bprintf("avail_link_%ld", edge->id);
+  char* unavail_generator_id = bprintf("unavail_link_%ld", edge->id);
+
+  probabilist_event_generator_t avail_generator =
+                          tmgr_event_generator_new_exponential(avail_generator_id, 0.0001);
+  probabilist_event_generator_t unavail_generator =
+                          tmgr_event_generator_new_uniform(unavail_generator_id, 10, 20);
+
+  link_parameters.state_trace =
+              tmgr_trace_generator_avail_unavail(bprintf("state_link_%ld", edge->id),
+                                                avail_generator,
+                                                unavail_generator,
+                                                SURF_RESOURCE_ON);
+
+
+  platf_graph_link_label(edge, &link_parameters);
+
+}
+
+/** Emitter function  */
+int master(int argc, char *argv[])
+{
+  int slaves_count = 0;
+  msg_host_t *slaves = NULL;
+  int number_of_tasks = 0;
+  double task_comp_size = 0;
+  double task_comm_size = 0;
+  int i;
+  _XBT_GNUC_UNUSED int read;
+
+  number_of_tasks = TASK_COUNT_PER_HOST*argc;
+  task_comp_size = TASK_COMP_SIZE;
+  task_comm_size = TASK_COMM_SIZE;
+
+  {                             /* Process organisation */
+    slaves_count = argc;
+    slaves = xbt_new0(msg_host_t, slaves_count);
+
+    for (i = 0; i < argc; i++) {
+      slaves[i] = MSG_get_host_by_name(argv[i]);
+      if (slaves[i] == NULL) {
+        XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
+        abort();
+      }
+    }
+  }
+
+  XBT_INFO("Got %d slave(s) :", slaves_count);
+  for (i = 0; i < slaves_count; i++)
+    XBT_INFO("%s", MSG_host_get_name(slaves[i]));
+
+  XBT_INFO("Got %d task to process :", number_of_tasks);
+
+  for (i = 0; i < number_of_tasks; i++) {
+    msg_task_t task = MSG_task_create("Task", task_comp_size, task_comm_size,
+                                    xbt_new0(double, 1));
+    int a;
+    *((double *) task->data) = MSG_get_clock();
+
+    a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i % slaves_count]),10.0);
+
+    if (a == MSG_OK) {
+      XBT_INFO("Send completed");
+    } else if (a == MSG_HOST_FAILURE) {
+      XBT_INFO
+          ("Gloups. The cpu on which I'm running just turned off!. See you!");
+      free(task->data);
+      MSG_task_destroy(task);
+      free(slaves);
+      return 0;
+    } else if (a == MSG_TRANSFER_FAILURE) {
+      XBT_INFO
+          ("Mmh. Something went wrong with '%s'. Nevermind. Let's keep going!",
+              MSG_host_get_name(slaves[i % slaves_count]));
+      free(task->data);
+      MSG_task_destroy(task);
+    } else if (a == MSG_TIMEOUT) {
+      XBT_INFO
+          ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
+              MSG_host_get_name(slaves[i % slaves_count]));
+      free(task->data);
+      MSG_task_destroy(task);
+    } else {
+      XBT_INFO("Hey ?! What's up ? ");
+      xbt_die( "Unexpected behavior");
+    }
+  }
+
+  XBT_INFO
+      ("All tasks have been dispatched. Let's tell everybody the computation is over.");
+  for (i = 0; i < slaves_count; i++) {
+    msg_task_t task = MSG_task_create("finalize", 0, 0, FINALIZE);
+    int a = MSG_task_send_with_timeout(task,MSG_host_get_name(slaves[i]),1.0);
+    if (a == MSG_OK)
+      continue;
+    if (a == MSG_HOST_FAILURE) {
+      XBT_INFO
+          ("Gloups. The cpu on which I'm running just turned off!. See you!");
+      MSG_task_destroy(task);
+      free(slaves);
+      return 0;
+    } else if (a == MSG_TRANSFER_FAILURE) {
+      XBT_INFO("Mmh. Can't reach '%s'! Nevermind. Let's keep going!",
+          MSG_host_get_name(slaves[i]));
+      MSG_task_destroy(task);
+    } else if (a == MSG_TIMEOUT) {
+      XBT_INFO
+          ("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!",
+              MSG_host_get_name(slaves[i % slaves_count]));
+      MSG_task_destroy(task);
+    } else {
+      XBT_INFO("Hey ?! What's up ? ");
+      xbt_die("Unexpected behavior with '%s': %d", MSG_host_get_name(slaves[i]), a);
+    }
+  }
+
+  XBT_INFO("Goodbye now!");
+  free(slaves);
+  return 0;
+}                               /* end_of_master */
+
+/** Receiver function  */
+int slave(int argc, char *argv[])
+{
+  while (1) {
+    msg_task_t task = NULL;
+    int a;
+    double time1, time2;
+
+    time1 = MSG_get_clock();
+    a = MSG_task_receive( &(task), MSG_host_get_name(MSG_host_self()) );
+    time2 = MSG_get_clock();
+    if (a == MSG_OK) {
+      XBT_INFO("Received \"%s\"", MSG_task_get_name(task));
+      if (MSG_task_get_data(task) == FINALIZE) {
+        MSG_task_destroy(task);
+        break;
+      }
+      if (time1 < *((double *) task->data))
+        time1 = *((double *) task->data);
+      XBT_INFO("Communication time : \"%f\"", time2 - time1);
+      XBT_INFO("Processing \"%s\"", MSG_task_get_name(task));
+      a = MSG_task_execute(task);
+      if (a == MSG_OK) {
+        XBT_INFO("\"%s\" done", MSG_task_get_name(task));
+        free(task->data);
+        MSG_task_destroy(task);
+      } else if (a == MSG_HOST_FAILURE) {
+        XBT_INFO
+            ("Gloups. The cpu on which I'm running just turned off!. See you!");
+        return 0;
+      } else {
+        XBT_INFO("Hey ?! What's up ? ");
+        xbt_die("Unexpected behavior");
+      }
+    } else if (a == MSG_HOST_FAILURE) {
+      XBT_INFO
+          ("Gloups. The cpu on which I'm running just turned off!. See you!");
+      return 0;
+    } else if (a == MSG_TRANSFER_FAILURE) {
+      XBT_INFO("Mmh. Something went wrong. Nevermind. Let's keep going!");
+    } else if (a == MSG_TIMEOUT) {
+      XBT_INFO("Mmh. Got a timeout. Nevermind. Let's keep going!");
+    } else {
+      XBT_INFO("Hey ?! What's up ? ");
+      xbt_die("Unexpected behavior");
+    }
+  }
+  XBT_INFO("I'm done. See you!");
+  return 0;
+}                               /* end_of_slave */
+
+/** Main function */
+int main(int argc, char *argv[])
+{
+  msg_error_t res = MSG_OK;
+  unsigned long seed_platf_gen[] = {134, 233445, 865, 2634, 424242, 876543};
+  unsigned long seed_trace_gen[] = {8865244, 356772, 42, 77465, 2098754, 8725442};
+  int connected;
+  int max_tries = 10;
+
+  //MSG initialisation
+  MSG_init(&argc, argv);
+
+  //Set up the seed for the platform generation
+  platf_random_seed(seed_platf_gen);
+
+  //Set up the RngStream for trace generation
+  sg_platf_rng_stream_init(seed_trace_gen);
+
+  XBT_INFO("creating nodes...");
+  platf_graph_uniform(10);
+
+  do {
+    max_tries--;
+    XBT_INFO("creating links...");
+    platf_graph_clear_links();
+    platf_graph_interconnect_waxman(0.9, 0.4);
+    XBT_INFO("done. Check connectedness...");
+    connected = platf_graph_is_connected();
+    XBT_INFO("Is it connected : %s", connected ? "yes" : (max_tries ? "no, retrying" : "no"));
+  } while(!connected && max_tries);
+
+  if(!connected && !max_tries) {
+    xbt_die("Impossible to connect the graph, aborting.");
+  }
+
+  XBT_INFO("registering callbacks...");
+  platf_graph_promoter(promoter_1);
+  platf_graph_labeler(labeler_1);
+
+  XBT_INFO("protmoting...");
+  platf_do_promote();
+
+  XBT_INFO("labeling...");
+  platf_do_label();
+
+  XBT_INFO("Putting it in surf...");
+  platf_generate();
+
+  XBT_INFO("Let's get the available hosts and dispatch work:");
+
+  unsigned int i;
+  msg_host_t host = NULL;
+  msg_host_t host_master = NULL;
+  msg_process_t process = NULL;
+  xbt_dynar_t host_dynar = MSG_hosts_as_dynar();
+  char** hostname_list = malloc(sizeof(char*) * xbt_dynar_length(host_dynar));
+
+  xbt_dynar_foreach(host_dynar, i, host) {
+    process = MSG_process_create("slave", slave, NULL, host);
+    MSG_process_auto_restart_set(process, TRUE);
+    hostname_list[i] = (char*) MSG_host_get_name(host);
+  }
+  host_master = MSG_get_host_by_name("host_master");
+  MSG_process_create_with_arguments("master", master, NULL, host_master, xbt_dynar_length(host_dynar), hostname_list);
+
+  res = MSG_main();
+
+  if (res == MSG_OK)
+    return 0;
+  else
+    return 1;
+}                               /* end_of_main */