Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Code is now modular and tidy
authorMaximiliano Geier <maximiliano.geier@loria.fr>
Thu, 11 Oct 2012 13:29:43 +0000 (15:29 +0200)
committerMaximiliano Geier <maximiliano.geier@loria.fr>
Wed, 28 Nov 2012 10:49:25 +0000 (11:49 +0100)
12 files changed:
examples/msg/kadeploy/CMakeLists.txt
examples/msg/kadeploy/broadcaster.c [new file with mode: 0644]
examples/msg/kadeploy/broadcaster.h [new file with mode: 0644]
examples/msg/kadeploy/common.c [new file with mode: 0644]
examples/msg/kadeploy/common.h [new file with mode: 0644]
examples/msg/kadeploy/iterator.c [new file with mode: 0644]
examples/msg/kadeploy/iterator.h [new file with mode: 0644]
examples/msg/kadeploy/kadeploy.c
examples/msg/kadeploy/messages.c [new file with mode: 0644]
examples/msg/kadeploy/messages.h [new file with mode: 0644]
examples/msg/kadeploy/peer.c [new file with mode: 0644]
examples/msg/kadeploy/peer.h [new file with mode: 0644]

index c3f9e4f..f28d923 100644 (file)
@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 2.6)
 
 set(EXECUTABLE_OUTPUT_PATH "${CMAKE_CURRENT_BINARY_DIR}")
 
 
 set(EXECUTABLE_OUTPUT_PATH "${CMAKE_CURRENT_BINARY_DIR}")
 
-add_executable(kadeploy kadeploy.c)
+add_executable(kadeploy kadeploy.c iterator.c common.c messages.c broadcaster.c peer.c)
 
 ### Add definitions for compile
 target_link_libraries(kadeploy simgrid m )
 
 ### Add definitions for compile
 target_link_libraries(kadeploy simgrid m )
@@ -23,6 +23,11 @@ set(xml_files
   )
 set(examples_src
   ${examples_src}
   )
 set(examples_src
   ${examples_src}
+  ${CMAKE_CURRENT_SOURCE_DIR}/iterator.c
+  ${CMAKE_CURRENT_SOURCE_DIR}/common.c
+  ${CMAKE_CURRENT_SOURCE_DIR}/messages.c
+  ${CMAKE_CURRENT_SOURCE_DIR}/broadcaster.c
+  ${CMAKE_CURRENT_SOURCE_DIR}/peer.c
   ${CMAKE_CURRENT_SOURCE_DIR}/kadeploy.c
   PARENT_SCOPE
   )
   ${CMAKE_CURRENT_SOURCE_DIR}/kadeploy.c
   PARENT_SCOPE
   )
diff --git a/examples/msg/kadeploy/broadcaster.c b/examples/msg/kadeploy/broadcaster.c
new file mode 100644 (file)
index 0000000..fea16bc
--- /dev/null
@@ -0,0 +1,160 @@
+#include "broadcaster.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(msg_broadcaster,
+                             "Messages specific for kadeploy");
+
+xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
+{
+  xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
+  char *hostname = NULL;
+  msg_host_t h = NULL;
+  int i = 1;
+  
+  for (; i < hostcount+1; i++) {
+    hostname = xbt_new(char, HOSTNAME_LENGTH);
+    snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
+    //XBT_INFO("%s", hostname);
+    h = MSG_get_host_by_name(hostname);
+    if (h == NULL) {
+      XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
+      abort();
+    } else {
+      xbt_dynar_push(host_list, &hostname);
+    }
+  }
+  return host_list;
+}
+
+/*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
+{
+  xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
+  msg_host_t h = NULL;
+  int i = 1;
+  
+  for (; i < argc; i++) {
+    XBT_INFO("host%d = %s", i, argv[i]);
+    h = MSG_get_host_by_name(argv[i]);
+    if (h == NULL) {
+      XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
+      abort();
+    } else {
+      xbt_dynar_push(host_list, &(argv[i]));
+    }
+  }
+  return host_list;
+}*/
+
+void delete_hostlist(xbt_dynar_t h)
+{
+  xbt_dynar_free(&h);
+}
+
+int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
+{
+  xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
+  msg_task_t task = NULL;
+  char **cur = (char**)xbt_dynar_iterator_next(it);
+  const char *me = MSG_host_get_name(MSG_host_self());
+  const char *current_host = NULL;
+  const char *prev = NULL;
+  const char *next = NULL;
+  const char *last = NULL;
+
+  /* Build the chain if there's at least one peer */
+  if (cur != NULL) {
+    /* init: prev=NULL, host=current cur, next=next cur */
+    next = *cur;
+    *first = next;
+
+    /* This iterator iterates one step ahead: cur is current iterated element, 
+       but it's actually the next one in the chain */
+    do {
+      /* following steps: prev=last, host=next, next=cur */
+      cur = (char**)xbt_dynar_iterator_next(it);
+      prev = last;
+      current_host = next;
+      if (cur != NULL)
+        next = *cur;
+      else
+        next = NULL;
+      //XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
+    
+      /* Send message to current peer */
+      task = task_message_chain_new(me, current_host, prev, next);
+      //MSG_task_set_category(task, current_host);
+      MSG_task_send(task, current_host);
+
+      last = current_host;
+    } while (cur != NULL);
+  }
+  xbt_dynar_iterator_delete(it);
+
+  return MSG_OK;
+}
+
+int broadcaster_send_file(const char *first)
+{
+  const char *me = MSG_host_get_name(MSG_host_self());
+  msg_task_t task = NULL;
+  msg_comm_t comm = NULL;
+  int status;
+
+  int piece_count = PIECE_COUNT;
+  int cur = 0;
+
+  for (; cur < piece_count; cur++) {
+    task = task_message_data_new(me, first, NULL, 0);
+    XBT_INFO("Sending (send) from %s into mailbox %s", me, first);
+    status = MSG_task_send(task, first);
+   
+    xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
+  }
+
+  return MSG_OK;
+}
+
+/* FIXME: I should iterate nodes in the same order as the one used to build the chain */
+int broadcaster_finish(xbt_dynar_t host_list)
+{
+  xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
+  msg_task_t task = NULL;
+  const char *me = MSG_host_get_name(MSG_host_self());
+  const char *current_host = NULL;
+  char **cur = NULL;
+
+  /* Send goodbye message to every peer */
+  for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
+    /* Send message to current peer */
+    current_host = *cur;
+    task = task_message_end_data_new(me, current_host);
+    //MSG_task_set_category(task, current_host);
+    MSG_task_send(task, current_host);
+  }
+
+  return MSG_OK;
+}
+
+
+/** Emitter function  */
+int broadcaster(int argc, char *argv[])
+{
+  xbt_dynar_t host_list = NULL;
+  const char *first = NULL;
+  int status = !MSG_OK;
+
+  XBT_INFO("broadcaster");
+
+  /* Check that every host given by the hostcount in argv[1] exists and add it
+     to a dynamic array */
+  host_list = build_hostlist_from_hostcount(atoi(argv[1]));
+  /*host_list = build_hostlist_from_argv(argc, argv);*/
+  
+  /* TODO: Error checking */
+  status = broadcaster_build_chain(&first, host_list);
+  status = broadcaster_send_file(first);
+  status = broadcaster_finish(host_list);
+
+  delete_hostlist(host_list);
+
+  return status;
+}
diff --git a/examples/msg/kadeploy/broadcaster.h b/examples/msg/kadeploy/broadcaster.h
new file mode 100644 (file)
index 0000000..2c55bee
--- /dev/null
@@ -0,0 +1,28 @@
+#ifndef KADEPLOY_BROADCASTER_H
+#define KADEPLOY_BROADCASTER_H
+
+#include "msg/msg.h"
+#include "xbt/sysdep.h"
+
+/* Create a log channel to have nice outputs. */
+#include "xbt/log.h"
+#include "xbt/asserts.h"
+
+#include "messages.h"
+#include "iterator.h"
+
+#define HOSTNAME_LENGTH 20
+#define PIECE_COUNT 1000
+
+xbt_dynar_t build_hostlist_from_hostcount(int hostcount); 
+/*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
+
+/* Broadcaster: helper functions */
+int broadcaster_build_chain(const char **first, xbt_dynar_t host_list);
+int broadcaster_send_file(const char *first);
+int broadcaster_finish(xbt_dynar_t host_list);
+
+/* Tasks */
+int broadcaster(int argc, char *argv[]);
+
+#endif /* KADEPLOY_BROADCASTER_H */
diff --git a/examples/msg/kadeploy/common.c b/examples/msg/kadeploy/common.c
new file mode 100644 (file)
index 0000000..eefb02d
--- /dev/null
@@ -0,0 +1,26 @@
+#include "common.h"
+
+inline void queue_pending_connection(msg_comm_t comm, xbt_dynar_t q)
+{
+  xbt_dynar_push(q, &comm);
+}
+
+int process_pending_connections(xbt_dynar_t q)
+{
+  unsigned int iter;
+  int status;
+  int empty = 0;
+  msg_comm_t comm;
+
+  xbt_dynar_foreach(q, iter, comm) {
+    empty = 1;
+    if (MSG_comm_test(comm)) {
+      MSG_comm_destroy(comm);
+      status = MSG_comm_get_status(comm);
+      xbt_assert(status == MSG_OK, __FILE__ ": process_pending_connections() failed");
+      xbt_dynar_cursor_rm(q, &iter);
+      empty = 0;
+    }
+  }
+  return empty;
+}
diff --git a/examples/msg/kadeploy/common.h b/examples/msg/kadeploy/common.h
new file mode 100644 (file)
index 0000000..68bdc33
--- /dev/null
@@ -0,0 +1,11 @@
+#ifndef KADEPLOY_COMMON_H
+#define KADEPLOY_COMMON_H
+
+#include "msg/msg.h"
+#include "xbt/sysdep.h"
+
+#define MESSAGE_SIZE 1
+
+
+
+#endif /* KADEPLOY_COMMON_H */
diff --git a/examples/msg/kadeploy/iterator.c b/examples/msg/kadeploy/iterator.c
new file mode 100644 (file)
index 0000000..a4788bc
--- /dev/null
@@ -0,0 +1,96 @@
+#include "iterator.h"
+
+/* http://stackoverflow.com/a/3348142 */
+static int rand_int(int n)
+{
+  int limit = RAND_MAX - RAND_MAX % n;
+  int rnd;
+
+  do {
+    rnd = rand();
+  } while (rnd >= limit);
+  
+  return rnd % n;
+}
+
+void xbt_dynar_shuffle_in_place(xbt_dynar_t indices_list)
+{
+  int i, j;
+
+  for (i = xbt_dynar_length(indices_list) - 1; i > 0; i--) {
+    j = rand_int(i + 1);
+    xbt_dynar_swap_elements(indices_list, int, i, j);
+  }
+}
+/**************************************/
+
+/* Allocates and initializes a new xbt_dynar iterator for list, using criteria_fn as iteration criteria
+   criteria_fn: given an array size, it must generate a list containing the indices of every item in some order */
+xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, xbt_dynar_t (*criteria_fn)(int))
+{
+  xbt_dynar_iterator_t it = xbt_new(xbt_dynar_iterator_s, 1);
+  
+  it->list = list;
+  it->length = xbt_dynar_length(list);
+  it->indices_list = criteria_fn(it->length); //xbt_dynar_new(sizeof(int), NULL);
+  it->criteria_fn = criteria_fn;
+  it->current = 0;
+}
+
+void xbt_dynar_iterator_reset(xbt_dynar_iterator_t it)
+{
+  xbt_dynar_free_container(&(it->indices_list));
+  it->indices_list = it->criteria_fn(it->length);
+  it->current = 0;
+}
+
+void xbt_dynar_iterator_seek(xbt_dynar_iterator_t it, int pos)
+{
+  it->current = pos;
+}
+
+/* Returns the next element iterated by iterator it, NULL if there are no more elements */
+void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
+{
+  int *next;
+  //XBT_INFO("%d current\n", next);
+  if (it->current >= it->length) {
+    //XBT_INFO("Nothing to return!\n");
+    return NULL;
+  } else {
+    next = xbt_dynar_get_ptr(it->indices_list, it->current);
+    it->current++;
+    return xbt_dynar_get_ptr(it->list, *next);
+  }
+}
+
+void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it)
+{
+  xbt_dynar_free_container(&(it->indices_list));
+  xbt_free_ref(&it);
+}
+
+xbt_dynar_t forward_indices_list(int size)
+{
+  xbt_dynar_t indices_list = xbt_dynar_new(sizeof(int), NULL);
+  int i;
+  for (i = 0; i < size; i++)
+    xbt_dynar_push_as(indices_list, int, i);
+  return indices_list;
+}
+
+xbt_dynar_t reverse_indices_list(int size)
+{
+  xbt_dynar_t indices_list = xbt_dynar_new(sizeof(int), NULL);
+  int i;
+  for (i = size-1; i >= 0; i--)
+    xbt_dynar_push_as(indices_list, int, i);
+  return indices_list;
+}
+
+xbt_dynar_t random_indices_list(int size)
+{
+  xbt_dynar_t indices_list = forward_indices_list(size);
+  xbt_dynar_shuffle_in_place(indices_list);
+  return indices_list;
+}
diff --git a/examples/msg/kadeploy/iterator.h b/examples/msg/kadeploy/iterator.h
new file mode 100644 (file)
index 0000000..59a42a2
--- /dev/null
@@ -0,0 +1,41 @@
+#ifndef KADEPLOY_ITERATOR_H
+#define KADEPLOY_ITERATOR_H
+
+#include <stdlib.h>
+
+#include "xbt/dynar.h"
+#include "xbt/sysdep.h"
+
+/* Random iterator for xbt_dynar */
+typedef struct xbt_dynar_iterator_struct {
+  xbt_dynar_t list;
+  xbt_dynar_t indices_list;
+  int current;
+  unsigned long length;
+  xbt_dynar_t (*criteria_fn)(int size);
+} *xbt_dynar_iterator_t;
+typedef struct xbt_dynar_iterator_struct xbt_dynar_iterator_s;
+
+/* Iterator methods */
+xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, xbt_dynar_t (*criteria_fn)(int));
+void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
+void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it);
+
+/* Iterator generators */
+xbt_dynar_t forward_indices_list(int size);
+xbt_dynar_t reverse_indices_list(int size);
+xbt_dynar_t random_indices_list(int size);
+
+/* Shuffle */
+/**************************************/
+static int rand_int(int n);
+void xbt_dynar_shuffle_in_place(xbt_dynar_t indices_list);
+
+#define xbt_dynar_swap_elements(d, type, i, j) \
+  type tmp; \
+  tmp = xbt_dynar_get_as(indices_list, (unsigned int)j, type); \
+  xbt_dynar_set_as(indices_list, (unsigned int)j, type, \
+    xbt_dynar_get_as(indices_list, (unsigned int)i, type)); \
+  xbt_dynar_set_as(indices_list, (unsigned int)i, type, tmp);
+
+#endif /* KADEPLOY_ITERATOR_H */
index 171f3d1..8a784cc 100644 (file)
 #include "xbt/log.h"
 #include "xbt/asserts.h"
 
 #include "xbt/log.h"
 #include "xbt/asserts.h"
 
+#include "iterator.h"
+#include "messages.h"
+#include "broadcaster.h"
+#include "peer.h"
+
 /** @addtogroup MSG_examples
  * 
  *  - <b>kadeploy/kadeploy.c: Kadeploy implementation</b>.
 /** @addtogroup MSG_examples
  * 
  *  - <b>kadeploy/kadeploy.c: Kadeploy implementation</b>.
 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy,
                              "Messages specific for kadeploy");
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy,
                              "Messages specific for kadeploy");
 
-#define MESSAGE_SIZE 1
-#define PIECE_COUNT 1000
-#define HOSTNAME_LENGTH 20
-
-#define PEER_SHUTDOWN_DEADLINE 6000
-
 /*
  Data structures
  */
 
 /*
  Data structures
  */
 
-/* Random iterator for xbt_dynar */
-typedef struct xbt_dynar_iterator_struct {
-  xbt_dynar_t list;
-  xbt_dynar_t indices_list;
-  int current;
-  unsigned long length;
-  xbt_dynar_t (*criteria_fn)(int size);
-} *xbt_dynar_iterator_t;
-typedef struct xbt_dynar_iterator_struct xbt_dynar_iterator_s;
-
-/* Messages enum */
-typedef enum {
-  MESSAGE_BUILD_CHAIN = 0,
-  MESSAGE_SEND_DATA,
-  MESSAGE_END_DATA
-} e_message_type;
-
-/* Message struct */
-typedef struct s_message {
-  e_message_type type;
-  const char *issuer_hostname;
-  const char *mailbox;
-  const char *prev_hostname;
-  const char *next_hostname;
-  const char *data_block;
-  unsigned int data_length;
-} s_message_t, *message_t;
-
-/* Peer struct */
-typedef struct s_peer {
-  int init;
-  const char *prev;
-  const char *next;
-  const char *me;
-  int pieces;
-  xbt_dynar_t pending_sends;
-  int close_asap; /* TODO: unused */
-} s_peer_t, *peer_t;
-
-/* Iterator methods */
-xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, xbt_dynar_t (*criteria_fn)(int));
-void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
-void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it);
-
-/* Iterator generators */
-xbt_dynar_t forward_indices_list(int size);
-xbt_dynar_t reverse_indices_list(int size);
-xbt_dynar_t random_indices_list(int size);
-
-/* Message methods */
-msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
-msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
-msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
-msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
-void task_message_delete(void *);
-
-/* Tasks */
-int broadcaster(int argc, char *argv[]);
-int peer(int argc, char *argv[]);
-
-xbt_dynar_t build_hostlist_from_hostcount(int hostcount); 
-/*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
-
-/* Broadcaster: helper functions */
-int broadcaster_build_chain(const char **first, xbt_dynar_t host_list);
-int broadcaster_send_file(const char *first);
-int broadcaster_finish(xbt_dynar_t host_list);
-
-/* Peer: helper functions */
-msg_error_t peer_wait_for_message(peer_t peer);
-int peer_execute_task(peer_t peer, msg_task_t task);
-void peer_init_chain(peer_t peer, message_t msg);
-void peer_shutdown(peer_t p);
-void peer_init(peer_t p);
-
 /* Initialization stuff */
 msg_error_t test_all(const char *platform_file,
                      const char *application_file);
 
 /* Initialization stuff */
 msg_error_t test_all(const char *platform_file,
                      const char *application_file);
 
-/* Shuffle */
-/**************************************/
-static int rand_int(int n);
-void xbt_dynar_shuffle_in_place(xbt_dynar_t indices_list);
-
-#define xbt_dynar_swap_elements(d, type, i, j) \
-  type tmp; \
-  tmp = xbt_dynar_get_as(indices_list, (unsigned int)j, type); \
-  xbt_dynar_set_as(indices_list, (unsigned int)j, type, \
-    xbt_dynar_get_as(indices_list, (unsigned int)i, type)); \
-  xbt_dynar_set_as(indices_list, (unsigned int)i, type, tmp);
-
-/* http://stackoverflow.com/a/3348142 */
-static int rand_int(int n)
-{
-  int limit = RAND_MAX - RAND_MAX % n;
-  int rnd;
-
-  do {
-    rnd = rand();
-  } while (rnd >= limit);
-  
-  return rnd % n;
-}
-
-void xbt_dynar_shuffle_in_place(xbt_dynar_t indices_list)
-{
-  int i, j;
-
-  for (i = xbt_dynar_length(indices_list) - 1; i > 0; i--) {
-    j = rand_int(i + 1);
-    xbt_dynar_swap_elements(indices_list, int, i, j);
-  }
-}
-/**************************************/
-
-/* Allocates and initializes a new xbt_dynar iterator for list, using criteria_fn as iteration criteria
-   criteria_fn: given an array size, it must generate a list containing the indices of every item in some order */
-xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, xbt_dynar_t (*criteria_fn)(int))
-{
-  xbt_dynar_iterator_t it = xbt_new(xbt_dynar_iterator_s, 1);
-  
-  it->list = list;
-  it->length = xbt_dynar_length(list);
-  it->indices_list = criteria_fn(it->length); //xbt_dynar_new(sizeof(int), NULL);
-  it->criteria_fn = criteria_fn;
-  it->current = 0;
-}
-
-void xbt_dynar_iterator_reset(xbt_dynar_iterator_t it)
-{
-  xbt_dynar_free_container(&(it->indices_list));
-  it->indices_list = it->criteria_fn(it->length);
-  it->current = 0;
-}
-
-void xbt_dynar_iterator_seek(xbt_dynar_iterator_t it, int pos)
-{
-  it->current = pos;
-}
-
-/* Returns the next element iterated by iterator it, NULL if there are no more elements */
-void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
-{
-  int *next;
-  //XBT_INFO("%d current\n", next);
-  if (it->current >= it->length) {
-    //XBT_INFO("Nothing to return!\n");
-    return NULL;
-  } else {
-    next = xbt_dynar_get_ptr(it->indices_list, it->current);
-    it->current++;
-    return xbt_dynar_get_ptr(it->list, *next);
-  }
-}
-
-void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it)
-{
-  xbt_dynar_free_container(&(it->indices_list));
-  xbt_free_ref(&it);
-}
-
-xbt_dynar_t forward_indices_list(int size)
-{
-  xbt_dynar_t indices_list = xbt_dynar_new(sizeof(int), NULL);
-  int i;
-  for (i = 0; i < size; i++)
-    xbt_dynar_push_as(indices_list, int, i);
-  return indices_list;
-}
-
-xbt_dynar_t reverse_indices_list(int size)
-{
-  xbt_dynar_t indices_list = xbt_dynar_new(sizeof(int), NULL);
-  int i;
-  for (i = size-1; i >= 0; i--)
-    xbt_dynar_push_as(indices_list, int, i);
-  return indices_list;
-}
-
-xbt_dynar_t random_indices_list(int size)
-{
-  xbt_dynar_t indices_list = forward_indices_list(size);
-  xbt_dynar_shuffle_in_place(indices_list);
-  return indices_list;
-}
-
-
-msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox)
-{
-  message_t msg = xbt_new(s_message_t, 1);
-  msg->type = type;
-  msg->issuer_hostname = issuer_hostname;
-  msg->mailbox = mailbox;
-  msg_task_t task = MSG_task_create(NULL, 0, MESSAGE_SIZE, msg); 
-
-  return task;
-}
-
-msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
-{
-  msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, issuer_hostname, mailbox);
-  message_t msg = MSG_task_get_data(task);
-  msg->prev_hostname = prev;
-  msg->next_hostname = next;
-
-  return task;
-}
-
-msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
-{
-  msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
-  if (strcmp(mailbox, "host4") == 0) MSG_task_set_category(task, mailbox);
-  message_t msg = MSG_task_get_data(task);
-  msg->data_block = block;
-  msg->data_length = len;
-
-  return task;
-}
-
-msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
-{
-  return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
-}
-
-void task_message_delete(void *task)
-{
-  message_t msg = MSG_task_get_data(task);
-  xbt_free(msg);
-  MSG_task_destroy(task);
-}
-
-inline void queue_pending_connection(msg_comm_t comm, xbt_dynar_t q)
-{
-  xbt_dynar_push(q, &comm);
-}
-
-int process_pending_connections(xbt_dynar_t q)
-{
-  unsigned int iter;
-  int status;
-  int empty = 0;
-  msg_comm_t comm;
-
-  xbt_dynar_foreach(q, iter, comm) {
-    empty = 1;
-    if (MSG_comm_test(comm)) {
-      MSG_comm_destroy(comm);
-      status = MSG_comm_get_status(comm);
-      xbt_assert(status == MSG_OK, __FILE__ ": process_pending_connections() failed");
-      xbt_dynar_cursor_rm(q, &iter);
-      empty = 0;
-    }
-  }
-  return empty;
-}
-
-xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
-{
-  xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
-  char *hostname = NULL;
-  msg_host_t h = NULL;
-  int i = 1;
-  
-  for (; i < hostcount+1; i++) {
-    hostname = xbt_new(char, HOSTNAME_LENGTH);
-    snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
-    //XBT_INFO("%s", hostname);
-    h = MSG_get_host_by_name(hostname);
-    if (h == NULL) {
-      XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
-      abort();
-    } else {
-      xbt_dynar_push(host_list, &hostname);
-    }
-  }
-  return host_list;
-}
-
-/*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
-{
-  xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
-  msg_host_t h = NULL;
-  int i = 1;
-  
-  for (; i < argc; i++) {
-    XBT_INFO("host%d = %s", i, argv[i]);
-    h = MSG_get_host_by_name(argv[i]);
-    if (h == NULL) {
-      XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
-      abort();
-    } else {
-      xbt_dynar_push(host_list, &(argv[i]));
-    }
-  }
-  return host_list;
-}*/
-
-void delete_hostlist(xbt_dynar_t h)
-{
-  xbt_dynar_free(&h);
-}
-
-int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
-{
-  xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
-  msg_task_t task = NULL;
-  char **cur = (char**)xbt_dynar_iterator_next(it);
-  const char *me = MSG_host_get_name(MSG_host_self());
-  const char *current_host = NULL;
-  const char *prev = NULL;
-  const char *next = NULL;
-  const char *last = NULL;
-
-  /* Build the chain if there's at least one peer */
-  if (cur != NULL) {
-    /* init: prev=NULL, host=current cur, next=next cur */
-    next = *cur;
-    *first = next;
-
-    /* This iterator iterates one step ahead: cur is current iterated element, 
-       but it's actually the next one in the chain */
-    do {
-      /* following steps: prev=last, host=next, next=cur */
-      cur = (char**)xbt_dynar_iterator_next(it);
-      prev = last;
-      current_host = next;
-      if (cur != NULL)
-        next = *cur;
-      else
-        next = NULL;
-      //XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
-    
-      /* Send message to current peer */
-      task = task_message_chain_new(me, current_host, prev, next);
-      //MSG_task_set_category(task, current_host);
-      MSG_task_send(task, current_host);
-
-      last = current_host;
-    } while (cur != NULL);
-  }
-  xbt_dynar_iterator_delete(it);
-
-  return MSG_OK;
-}
-
-int broadcaster_send_file(const char *first)
-{
-  const char *me = MSG_host_get_name(MSG_host_self());
-  msg_task_t task = NULL;
-  msg_comm_t comm = NULL;
-  int status;
-
-  int piece_count = PIECE_COUNT;
-  int cur = 0;
-
-  for (; cur < piece_count; cur++) {
-    task = task_message_data_new(me, first, NULL, 0);
-    XBT_INFO("Sending (send) from %s into mailbox %s", me, first);
-    status = MSG_task_send(task, first);
-   
-    xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
-  }
-
-  return MSG_OK;
-}
-
-/* FIXME: I should iterate nodes in the same order as the one used to build the chain */
-int broadcaster_finish(xbt_dynar_t host_list)
-{
-  xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
-  msg_task_t task = NULL;
-  const char *me = MSG_host_get_name(MSG_host_self());
-  const char *current_host = NULL;
-  char **cur = NULL;
-
-  /* Send goodbye message to every peer */
-  for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
-    /* Send message to current peer */
-    current_host = *cur;
-    task = task_message_end_data_new(me, current_host);
-    //MSG_task_set_category(task, current_host);
-    MSG_task_send(task, current_host);
-  }
-
-  return MSG_OK;
-}
-
-
-/** Emitter function  */
-int broadcaster(int argc, char *argv[])
-{
-  xbt_dynar_t host_list = NULL;
-  const char *first = NULL;
-  int status = !MSG_OK;
-
-  XBT_INFO("broadcaster");
-
-  /* Check that every host given by the hostcount in argv[1] exists and add it
-     to a dynamic array */
-  host_list = build_hostlist_from_hostcount(atoi(argv[1]));
-  /*host_list = build_hostlist_from_argv(argc, argv);*/
-  
-  /* TODO: Error checking */
-  status = broadcaster_build_chain(&first, host_list);
-  status = broadcaster_send_file(first);
-  status = broadcaster_finish(host_list);
-
-  delete_hostlist(host_list);
-
-  return status;
-}
-
-/*******************************************************
- *                     Peer                            *
- *******************************************************/
-
-void peer_init_chain(peer_t peer, message_t msg)
-{
-  peer->prev = msg->prev_hostname;
-  peer->next = msg->next_hostname;
-  peer->init = 1;
-}
-
-void peer_forward_msg(peer_t peer, message_t msg)
-{
-  int status;
-  msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0);
-  msg_comm_t comm = NULL;
-  XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
-  comm = MSG_task_isend(task, peer->next);
-  queue_pending_connection(comm, peer->pending_sends);
-}
-
-int peer_execute_task(peer_t peer, msg_task_t task)
-{
-  int done = 0;
-  message_t msg = MSG_task_get_data(task);
-  
-  //XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type);
-  switch (msg->type) {
-    case MESSAGE_BUILD_CHAIN:
-      peer_init_chain(peer, msg);
-      break;
-    case MESSAGE_SEND_DATA:
-      xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
-      if (peer->next != NULL)
-        peer_forward_msg(peer, msg);
-      peer->pieces++;
-      break;
-    case MESSAGE_END_DATA:
-      xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
-      done = 1;
-      XBT_INFO("%d pieces receieved", peer->pieces);
-      break;
-  }
-
-  MSG_task_execute(task);
-
-  return done;
-}
-
-msg_error_t peer_wait_for_message(peer_t peer)
-{
-  msg_error_t status;
-  msg_comm_t comm = NULL;
-  msg_task_t task = NULL;
-  int done = 0;
-
-  while (!done) {
-    if (comm == NULL)
-      comm = MSG_task_irecv(&task, peer->me);
-
-    if (MSG_comm_test(comm)) {
-      status = MSG_comm_get_status(comm);
-      //XBT_INFO("peer_wait_for_message: error code = %d", status);
-      xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
-      MSG_comm_destroy(comm);
-      comm = NULL;
-      done = peer_execute_task(peer, task);
-      task_message_delete(task);
-      task = NULL;
-    } else {
-      process_pending_connections(peer->pending_sends);
-      MSG_process_sleep(0.1);
-    }
-  }
-
-  return status;
-}
-
-void peer_init(peer_t p)
-{
-  p->init = 0;
-  p->prev = NULL;
-  p->next = NULL;
-  p->pieces = 0;
-  p->close_asap = 0;
-  p->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
-  p->me = MSG_host_get_name(MSG_host_self());
-}
-
-void peer_shutdown(peer_t p)
-{
-  float start_time = MSG_get_clock();
-  float end_time = start_time + PEER_SHUTDOWN_DEADLINE;
-
-  XBT_INFO("Waiting for sends to finish before shutdown...");
-  while (xbt_dynar_length(p->pending_sends) && MSG_get_clock() < end_time) {
-    process_pending_connections(p->pending_sends);
-    MSG_process_sleep(0.1);
-  }
-
-  xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline");
-  xbt_dynar_free(&p->pending_sends);
-
-  xbt_free(p);
-}
-
-/** Peer function  */
-int peer(int argc, char *argv[])
-{
-  peer_t p = xbt_new(s_peer_t, 1);
-  msg_error_t status;
-
-  XBT_INFO("peer");
-
-  peer_init(p);
-  status = peer_wait_for_message(p);
-  peer_shutdown(p);
-
-  return MSG_OK;
-}                               /* end_of_receiver */
-
 
 /** Test function */
 msg_error_t test_all(const char *platform_file,
 
 /** Test function */
 msg_error_t test_all(const char *platform_file,
diff --git a/examples/msg/kadeploy/messages.c b/examples/msg/kadeploy/messages.c
new file mode 100644 (file)
index 0000000..7b661a3
--- /dev/null
@@ -0,0 +1,45 @@
+#include "messages.h"
+
+msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox)
+{
+  message_t msg = xbt_new(s_message_t, 1);
+  msg->type = type;
+  msg->issuer_hostname = issuer_hostname;
+  msg->mailbox = mailbox;
+  msg_task_t task = MSG_task_create(NULL, 0, MESSAGE_SIZE, msg); 
+
+  return task;
+}
+
+msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
+{
+  msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, issuer_hostname, mailbox);
+  message_t msg = MSG_task_get_data(task);
+  msg->prev_hostname = prev;
+  msg->next_hostname = next;
+
+  return task;
+}
+
+msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
+{
+  msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
+  if (strcmp(mailbox, "host4") == 0) MSG_task_set_category(task, mailbox);
+  message_t msg = MSG_task_get_data(task);
+  msg->data_block = block;
+  msg->data_length = len;
+
+  return task;
+}
+
+msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
+{
+  return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
+}
+
+void task_message_delete(void *task)
+{
+  message_t msg = MSG_task_get_data(task);
+  xbt_free(msg);
+  MSG_task_destroy(task);
+}
diff --git a/examples/msg/kadeploy/messages.h b/examples/msg/kadeploy/messages.h
new file mode 100644 (file)
index 0000000..8e89fd6
--- /dev/null
@@ -0,0 +1,34 @@
+#ifndef KADEPLOY_MESSAGES_H
+#define KADEPLOY_MESSAGES_H
+
+#include "msg/msg.h"
+#include "xbt/sysdep.h"
+
+#define MESSAGE_SIZE 1
+
+/* Messages enum */
+typedef enum {
+  MESSAGE_BUILD_CHAIN = 0,
+  MESSAGE_SEND_DATA,
+  MESSAGE_END_DATA
+} e_message_type;
+
+/* Message struct */
+typedef struct s_message {
+  e_message_type type;
+  const char *issuer_hostname;
+  const char *mailbox;
+  const char *prev_hostname;
+  const char *next_hostname;
+  const char *data_block;
+  unsigned int data_length;
+} s_message_t, *message_t;
+
+/* Message methods */
+msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
+msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
+msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
+msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
+void task_message_delete(void *);
+
+#endif /* KADEPLOY_MESSAGES_H */
diff --git a/examples/msg/kadeploy/peer.c b/examples/msg/kadeploy/peer.c
new file mode 100644 (file)
index 0000000..dcb8329
--- /dev/null
@@ -0,0 +1,127 @@
+#include "peer.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peer,
+                             "Messages specific for kadeploy");
+
+/*******************************************************
+ *                     Peer                            *
+ *******************************************************/
+
+void peer_init_chain(peer_t peer, message_t msg)
+{
+  peer->prev = msg->prev_hostname;
+  peer->next = msg->next_hostname;
+  peer->init = 1;
+}
+
+void peer_forward_msg(peer_t peer, message_t msg)
+{
+  int status;
+  msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0);
+  msg_comm_t comm = NULL;
+  XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
+  comm = MSG_task_isend(task, peer->next);
+  queue_pending_connection(comm, peer->pending_sends);
+}
+
+int peer_execute_task(peer_t peer, msg_task_t task)
+{
+  int done = 0;
+  message_t msg = MSG_task_get_data(task);
+  
+  //XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type);
+  switch (msg->type) {
+    case MESSAGE_BUILD_CHAIN:
+      peer_init_chain(peer, msg);
+      break;
+    case MESSAGE_SEND_DATA:
+      xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
+      if (peer->next != NULL)
+        peer_forward_msg(peer, msg);
+      peer->pieces++;
+      break;
+    case MESSAGE_END_DATA:
+      xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
+      done = 1;
+      XBT_INFO("%d pieces receieved", peer->pieces);
+      break;
+  }
+
+  MSG_task_execute(task);
+
+  return done;
+}
+
+msg_error_t peer_wait_for_message(peer_t peer)
+{
+  msg_error_t status;
+  msg_comm_t comm = NULL;
+  msg_task_t task = NULL;
+  int done = 0;
+
+  while (!done) {
+    if (comm == NULL)
+      comm = MSG_task_irecv(&task, peer->me);
+
+    if (MSG_comm_test(comm)) {
+      status = MSG_comm_get_status(comm);
+      //XBT_INFO("peer_wait_for_message: error code = %d", status);
+      xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
+      MSG_comm_destroy(comm);
+      comm = NULL;
+      done = peer_execute_task(peer, task);
+      task_message_delete(task);
+      task = NULL;
+    } else {
+      process_pending_connections(peer->pending_sends);
+      MSG_process_sleep(0.1);
+    }
+  }
+
+  return status;
+}
+
+void peer_init(peer_t p)
+{
+  p->init = 0;
+  p->prev = NULL;
+  p->next = NULL;
+  p->pieces = 0;
+  p->close_asap = 0;
+  p->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
+  p->me = MSG_host_get_name(MSG_host_self());
+}
+
+void peer_shutdown(peer_t p)
+{
+  float start_time = MSG_get_clock();
+  float end_time = start_time + PEER_SHUTDOWN_DEADLINE;
+
+  XBT_INFO("Waiting for sends to finish before shutdown...");
+  while (xbt_dynar_length(p->pending_sends) && MSG_get_clock() < end_time) {
+    process_pending_connections(p->pending_sends);
+    MSG_process_sleep(0.1);
+  }
+
+  xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline");
+  xbt_dynar_free(&p->pending_sends);
+
+  xbt_free(p);
+}
+
+/** Peer function  */
+int peer(int argc, char *argv[])
+{
+  peer_t p = xbt_new(s_peer_t, 1);
+  msg_error_t status;
+
+  XBT_INFO("peer");
+
+  peer_init(p);
+  status = peer_wait_for_message(p);
+  peer_shutdown(p);
+
+  return MSG_OK;
+}                               /* end_of_receiver */
+
+
diff --git a/examples/msg/kadeploy/peer.h b/examples/msg/kadeploy/peer.h
new file mode 100644 (file)
index 0000000..b7a9529
--- /dev/null
@@ -0,0 +1,31 @@
+#ifndef KADEPLOY_PEER_H
+#define KADEPLOY_PEER_H
+
+#include "msg/msg.h"
+#include "xbt/sysdep.h"
+
+#include "messages.h"
+
+#define PEER_SHUTDOWN_DEADLINE 6000
+
+/* Peer struct */
+typedef struct s_peer {
+  int init;
+  const char *prev;
+  const char *next;
+  const char *me;
+  int pieces;
+  xbt_dynar_t pending_sends;
+  int close_asap; /* TODO: unused */
+} s_peer_t, *peer_t;
+
+/* Peer: helper functions */
+msg_error_t peer_wait_for_message(peer_t peer);
+int peer_execute_task(peer_t peer, msg_task_t task);
+void peer_init_chain(peer_t peer, message_t msg);
+void peer_shutdown(peer_t p);
+void peer_init(peer_t p);
+
+int peer(int argc, char *argv[]);
+
+#endif /* KADEPLOY_PEER_H */