Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Deployment file generator in Ruby, takes a platform file, parses hostnames and output...
authorMaximiliano Geier <maximiliano.geier@loria.fr>
Mon, 15 Oct 2012 16:14:01 +0000 (18:14 +0200)
committerMaximiliano Geier <maximiliano.geier@loria.fr>
Wed, 28 Nov 2012 10:49:25 +0000 (11:49 +0100)
examples/msg/kadeploy/broadcaster.c
examples/msg/kadeploy/broadcaster.h
examples/msg/kadeploy/common.h
examples/msg/kadeploy/generate_deployment_file.rb [new file with mode: 0755]
examples/msg/kadeploy/kadeploy.c
examples/msg/kadeploy/messages.c
examples/msg/kadeploy/peer.c
examples/msg/kadeploy/peer.h

index fea16bc..bcc1097 100644 (file)
@@ -14,13 +14,13 @@ xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
     hostname = xbt_new(char, HOSTNAME_LENGTH);
     snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
     //XBT_INFO("%s", hostname);
-    h = MSG_get_host_by_name(hostname);
+    /*h = MSG_get_host_by_name(hostname);
     if (h == NULL) {
       XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
       abort();
-    } else {
+    } else {*/
       xbt_dynar_push(host_list, &hostname);
-    }
+    /*}*/
   }
   return host_list;
 }
@@ -49,12 +49,11 @@ void delete_hostlist(xbt_dynar_t h)
   xbt_dynar_free(&h);
 }
 
-int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
+int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar_iterator_t it)
 {
-  xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
   msg_task_t task = NULL;
   char **cur = (char**)xbt_dynar_iterator_next(it);
-  const char *me = MSG_host_get_name(MSG_host_self());
+  const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
   const char *current_host = NULL;
   const char *prev = NULL;
   const char *next = NULL;
@@ -87,7 +86,6 @@ int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
       last = current_host;
     } while (cur != NULL);
   }
-  xbt_dynar_iterator_delete(it);
 
   return MSG_OK;
 }
@@ -107,22 +105,22 @@ int broadcaster_send_file(const char *first)
     XBT_INFO("Sending (send) from %s into mailbox %s", me, first);
     status = MSG_task_send(task, first);
    
-    xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
+    xbt_assert(status == MSG_OK, "broadcaster_send_file() failed");
   }
 
   return MSG_OK;
 }
 
-/* FIXME: I should iterate nodes in the same order as the one used to build the chain */
-int broadcaster_finish(xbt_dynar_t host_list)
+int broadcaster_finish(xbt_dynar_iterator_t it)
 {
-  xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
   msg_task_t task = NULL;
-  const char *me = MSG_host_get_name(MSG_host_self());
+  const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
   const char *current_host = NULL;
   char **cur = NULL;
 
-  /* Send goodbye message to every peer */
+  xbt_dynar_iterator_seek(it, 0);
+
+  /* Send goodbye message to every peer in the order generated by iterator it */
   for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
     /* Send message to current peer */
     current_host = *cur;
@@ -144,16 +142,20 @@ int broadcaster(int argc, char *argv[])
 
   XBT_INFO("broadcaster");
 
-  /* Check that every host given by the hostcount in argv[1] exists and add it
-     to a dynamic array */
+  /* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
   host_list = build_hostlist_from_hostcount(atoi(argv[1]));
   /*host_list = build_hostlist_from_argv(argc, argv);*/
   
+  /* Initialize iterator */
+  xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
+
   /* TODO: Error checking */
-  status = broadcaster_build_chain(&first, host_list);
+  status = broadcaster_build_chain(&first, host_list, it);
   status = broadcaster_send_file(first);
-  status = broadcaster_finish(host_list);
+  status = broadcaster_finish(it);
 
+  /* Destroy iterator and hostlist */
+  xbt_dynar_iterator_delete(it);
   delete_hostlist(host_list);
 
   return status;
index 2c55bee..b20cee6 100644 (file)
 
 #include "messages.h"
 #include "iterator.h"
+#include "common.h"
 
-#define HOSTNAME_LENGTH 20
-#define PIECE_COUNT 1000
+#define PIECE_COUNT 50
 
 xbt_dynar_t build_hostlist_from_hostcount(int hostcount); 
 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
 
 /* Broadcaster: helper functions */
-int broadcaster_build_chain(const char **first, xbt_dynar_t host_list);
+int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar_iterator_t it);
 int broadcaster_send_file(const char *first);
-int broadcaster_finish(xbt_dynar_t host_list);
+int broadcaster_finish(xbt_dynar_iterator_t it);
 
 /* Tasks */
 int broadcaster(int argc, char *argv[]);
index 68bdc33..f0d9997 100644 (file)
@@ -5,7 +5,6 @@
 #include "xbt/sysdep.h"
 
 #define MESSAGE_SIZE 1
-
-
+#define HOSTNAME_LENGTH 20
 
 #endif /* KADEPLOY_COMMON_H */
diff --git a/examples/msg/kadeploy/generate_deployment_file.rb b/examples/msg/kadeploy/generate_deployment_file.rb
new file mode 100755 (executable)
index 0000000..fae59a8
--- /dev/null
@@ -0,0 +1,97 @@
+#!/usr/bin/env ruby
+
+require 'rexml/document'
+
+class HostsExtractor
+  @@doc = nil
+  @@hosts = []
+
+  def initialize(xml)
+    @@doc = REXML::Document.new(xml)
+    @@doc.elements.each('platform') do |platform|
+      extract_hosts(platform)
+    end
+  end
+
+  def extract_hosts(doc)
+    doc.elements.each('AS') do |as|
+      extract_hosts_from_AS(as)
+      extract_hosts(as)
+    end
+  end
+
+  def extract_hosts_from_AS(doc)
+    doc.elements.each('host') do |h|
+      @@hosts << h.attributes['id']
+      puts "hosts %s" % h.attributes['id']
+    end
+
+    doc.elements.each('cluster') do |c|
+      prefix = c.attributes['prefix']
+      suffix = c.attributes['suffix']
+      puts "%s %s %s" % [prefix, c.attributes['radical'], suffix]
+      expand_radical(c.attributes['radical']).each do |num|
+        @@hosts << "%s%s%s" % [prefix, num, suffix]
+      end
+    end
+  end
+
+  def hosts
+    return @@hosts
+  end
+
+  def expand_radical(radical)
+   l = []
+   puts radical
+   radical.split(',').each do |range|
+     range.scan(/^\d+$/) { |x| l << x }
+     range.scan(/^(\d+)-(\d+)$/) { |x, y| (x..y).each do |i| l << i end }
+   end
+   return l
+  end
+end
+
+class DeploymentGenerator
+  @@outfile = nil
+
+  def initialize(fname)
+    @@outfile = File.new(fname, "w")
+  end
+
+  def write_header
+    @@outfile.puts "<?xml version='1.0'?>"
+    @@outfile.puts "<!DOCTYPE platform SYSTEM \"http://simgrid.gforge.inria.fr/simgrid.dtd\">"
+    @@outfile.puts "<platform version=\"3\">"
+  end
+
+  def write_process(name, function, hosts, args)
+    @@outfile.puts "  <!-- %s -->" % name
+    hosts.zip(args).each do |h, a|
+      @@outfile.puts "  <process host=\"%s\" function=\"%s\">" % [h, function]
+      @@outfile.puts "    <argument value=\"%s\"/>" % [a]
+      @@outfile.puts "  </process>"
+    end
+  end
+
+  def write_footer
+    @@outfile.puts "</platform>"
+  end
+end
+
+xml = File.read(ARGV.shift)
+he = HostsExtractor.new(xml)
+
+raise "Cannot run with less than 2 hosts" unless he.hosts.size > 1
+
+output = ARGV.shift
+dg = DeploymentGenerator.new(output)
+dg.write_header
+
+puts he.hosts
+broadcaster = he.hosts.shift
+peers = he.hosts
+
+dg.write_process("Broadcaster", "broadcaster", [broadcaster], [he.hosts.size])
+dg.write_process("Peers", "peer", peers, (1..he.hosts.size))
+
+dg.write_footer
index 8a784cc..2beb023 100644 (file)
@@ -58,7 +58,10 @@ msg_error_t test_all(const char *platform_file,
   TRACE_category_with_color("host2", "0 1 1");
   TRACE_category_with_color("host3", "1 0 0");
   TRACE_category_with_color("host4", "1 0 1");
-  TRACE_category_with_color("host5", "1 1 0");
+  TRACE_category_with_color("host5", "0 0 0");
+  TRACE_category_with_color("host6", "1 1 0");
+  TRACE_category_with_color("host7", "1 1 1");
+  TRACE_category_with_color("host8", "0 1 0");
 
   /*   Application deployment */
   MSG_function_register("broadcaster", broadcaster);
index 7b661a3..177b4e8 100644 (file)
@@ -24,7 +24,8 @@ msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailb
 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
 {
   msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
-  if (strcmp(mailbox, "host4") == 0) MSG_task_set_category(task, mailbox);
+  //if (strcmp(mailbox, "host4") == 0) 
+  //MSG_task_set_category(task, mailbox);
   message_t msg = MSG_task_get_data(task);
   msg->data_block = block;
   msg->data_length = len;
index dcb8329..ba75b8f 100644 (file)
@@ -60,13 +60,13 @@ msg_error_t peer_wait_for_message(peer_t peer)
   int done = 0;
 
   while (!done) {
-    if (comm == NULL)
+    if (comm == NULL) // FIXME I should have a recv queue
       comm = MSG_task_irecv(&task, peer->me);
 
     if (MSG_comm_test(comm)) {
       status = MSG_comm_get_status(comm);
       //XBT_INFO("peer_wait_for_message: error code = %d", status);
-      xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
+      xbt_assert(status == MSG_OK, "peer_wait_for_message() failed");
       MSG_comm_destroy(comm);
       comm = NULL;
       done = peer_execute_task(peer, task);
@@ -81,7 +81,7 @@ msg_error_t peer_wait_for_message(peer_t peer)
   return status;
 }
 
-void peer_init(peer_t p)
+void peer_init(peer_t p, int argc, char *argv[])
 {
   p->init = 0;
   p->prev = NULL;
@@ -89,7 +89,13 @@ void peer_init(peer_t p)
   p->pieces = 0;
   p->close_asap = 0;
   p->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
-  p->me = MSG_host_get_name(MSG_host_self());
+  p->me = xbt_new(char, HOSTNAME_LENGTH);
+  /* Set mailbox name: use host number from argv or hostname if no argument given */
+  if (argc > 1) {
+    snprintf(p->me, HOSTNAME_LENGTH, "host%s", argv[1]);
+  } else {
+    strncpy(p->me, MSG_host_get_name(MSG_host_self()), HOSTNAME_LENGTH);
+  }
 }
 
 void peer_shutdown(peer_t p)
@@ -105,6 +111,7 @@ void peer_shutdown(peer_t p)
 
   xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline");
   xbt_dynar_free(&p->pending_sends);
+  xbt_free(p->me);
 
   xbt_free(p);
 }
@@ -117,7 +124,7 @@ int peer(int argc, char *argv[])
 
   XBT_INFO("peer");
 
-  peer_init(p);
+  peer_init(p, argc, argv);
   status = peer_wait_for_message(p);
   peer_shutdown(p);
 
index b7a9529..5827862 100644 (file)
@@ -5,6 +5,7 @@
 #include "xbt/sysdep.h"
 
 #include "messages.h"
+#include "common.h"
 
 #define PEER_SHUTDOWN_DEADLINE 6000
 
@@ -13,7 +14,7 @@ typedef struct s_peer {
   int init;
   const char *prev;
   const char *next;
-  const char *me;
+  char *me;
   int pieces;
   xbt_dynar_t pending_sends;
   int close_asap; /* TODO: unused */
@@ -24,7 +25,7 @@ msg_error_t peer_wait_for_message(peer_t peer);
 int peer_execute_task(peer_t peer, msg_task_t task);
 void peer_init_chain(peer_t peer, message_t msg);
 void peer_shutdown(peer_t p);
-void peer_init(peer_t p);
+void peer_init(peer_t p, int argc, char *argv[]);
 
 int peer(int argc, char *argv[]);