Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Refactored the xbt_dynar_iterator_t implementation Added reverse and random iterators
[simgrid.git] / examples / msg / kadeploy / kadeploy.c
index 93e6998..171f3d1 100644 (file)
@@ -25,10 +25,10 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy,
                              "Messages specific for kadeploy");
 
 #define MESSAGE_SIZE 1
-#define PIECE_COUNT 100
+#define PIECE_COUNT 1000
 #define HOSTNAME_LENGTH 20
 
-#define PEER_SHUTDOWN_DEADLINE 600
+#define PEER_SHUTDOWN_DEADLINE 6000
 
 /*
  Data structures
@@ -40,7 +40,7 @@ typedef struct xbt_dynar_iterator_struct {
   xbt_dynar_t indices_list;
   int current;
   unsigned long length;
-  int (*criteria_fn)(void* it);
+  xbt_dynar_t (*criteria_fn)(int size);
 } *xbt_dynar_iterator_t;
 typedef struct xbt_dynar_iterator_struct xbt_dynar_iterator_s;
 
@@ -74,10 +74,14 @@ typedef struct s_peer {
 } s_peer_t, *peer_t;
 
 /* Iterator methods */
-xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*));
+xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, xbt_dynar_t (*criteria_fn)(int));
 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it);
-int xbt_dynar_iterator_forward_criteria(void *p);
+
+/* Iterator generators */
+xbt_dynar_t forward_indices_list(int size);
+xbt_dynar_t reverse_indices_list(int size);
+xbt_dynar_t random_indices_list(int size);
 
 /* Message methods */
 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
@@ -109,31 +113,79 @@ void peer_init(peer_t p);
 msg_error_t test_all(const char *platform_file,
                      const char *application_file);
 
+/* Shuffle */
+/**************************************/
+static int rand_int(int n);
+void xbt_dynar_shuffle_in_place(xbt_dynar_t indices_list);
+
+#define xbt_dynar_swap_elements(d, type, i, j) \
+  type tmp; \
+  tmp = xbt_dynar_get_as(indices_list, (unsigned int)j, type); \
+  xbt_dynar_set_as(indices_list, (unsigned int)j, type, \
+    xbt_dynar_get_as(indices_list, (unsigned int)i, type)); \
+  xbt_dynar_set_as(indices_list, (unsigned int)i, type, tmp);
+
+/* http://stackoverflow.com/a/3348142 */
+static int rand_int(int n)
+{
+  int limit = RAND_MAX - RAND_MAX % n;
+  int rnd;
+
+  do {
+    rnd = rand();
+  } while (rnd >= limit);
+  
+  return rnd % n;
+}
+
+void xbt_dynar_shuffle_in_place(xbt_dynar_t indices_list)
+{
+  int i, j;
+
+  for (i = xbt_dynar_length(indices_list) - 1; i > 0; i--) {
+    j = rand_int(i + 1);
+    xbt_dynar_swap_elements(indices_list, int, i, j);
+  }
+}
+/**************************************/
+
 /* Allocates and initializes a new xbt_dynar iterator for list, using criteria_fn as iteration criteria
-   criteria_fn: given an iterator, it must update the iterator and give the next element's index, 
-   less than 0 otherwise*/
-xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*))
+   criteria_fn: given an array size, it must generate a list containing the indices of every item in some order */
+xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, xbt_dynar_t (*criteria_fn)(int))
 {
   xbt_dynar_iterator_t it = xbt_new(xbt_dynar_iterator_s, 1);
   
   it->list = list;
   it->length = xbt_dynar_length(list);
-  it->indices_list = xbt_dynar_new(sizeof(int), NULL);
+  it->indices_list = criteria_fn(it->length); //xbt_dynar_new(sizeof(int), NULL);
   it->criteria_fn = criteria_fn;
-  it->current = -1;
+  it->current = 0;
+}
+
+void xbt_dynar_iterator_reset(xbt_dynar_iterator_t it)
+{
+  xbt_dynar_free_container(&(it->indices_list));
+  it->indices_list = it->criteria_fn(it->length);
+  it->current = 0;
+}
+
+void xbt_dynar_iterator_seek(xbt_dynar_iterator_t it, int pos)
+{
+  it->current = pos;
 }
 
 /* Returns the next element iterated by iterator it, NULL if there are no more elements */
 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
 {
-  int next = it->criteria_fn((xbt_dynar_iterator_t)it);
+  int *next;
   //XBT_INFO("%d current\n", next);
-  if (next < 0) {
+  if (it->current >= it->length) {
     //XBT_INFO("Nothing to return!\n");
     return NULL;
   } else {
-    xbt_dynar_push(it->indices_list, &next);
-    return xbt_dynar_get_ptr(it->list, next);
+    next = xbt_dynar_get_ptr(it->indices_list, it->current);
+    it->current++;
+    return xbt_dynar_get_ptr(it->list, *next);
   }
 }
 
@@ -143,22 +195,32 @@ void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it)
   xbt_free_ref(&it);
 }
 
-int xbt_dynar_iterator_forward_criteria(void *p)
+xbt_dynar_t forward_indices_list(int size)
 {
-  xbt_dynar_iterator_t it = (xbt_dynar_iterator_t)p;
-  int r = -1;
-  if (it->current == -1) {
-    /* iterator initialization */
-    it->current = 0;
-  }
-  if (it->current < it->length) {
-    r = it->current;
-    it->current++;
-  }
+  xbt_dynar_t indices_list = xbt_dynar_new(sizeof(int), NULL);
+  int i;
+  for (i = 0; i < size; i++)
+    xbt_dynar_push_as(indices_list, int, i);
+  return indices_list;
+}
 
-  return r;
+xbt_dynar_t reverse_indices_list(int size)
+{
+  xbt_dynar_t indices_list = xbt_dynar_new(sizeof(int), NULL);
+  int i;
+  for (i = size-1; i >= 0; i--)
+    xbt_dynar_push_as(indices_list, int, i);
+  return indices_list;
 }
 
+xbt_dynar_t random_indices_list(int size)
+{
+  xbt_dynar_t indices_list = forward_indices_list(size);
+  xbt_dynar_shuffle_in_place(indices_list);
+  return indices_list;
+}
+
+
 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox)
 {
   message_t msg = xbt_new(s_message_t, 1);
@@ -203,7 +265,7 @@ void task_message_delete(void *task)
   MSG_task_destroy(task);
 }
 
-void queue_pending_connection(msg_comm_t comm, xbt_dynar_t q)
+inline void queue_pending_connection(msg_comm_t comm, xbt_dynar_t q)
 {
   xbt_dynar_push(q, &comm);
 }
@@ -276,7 +338,7 @@ void delete_hostlist(xbt_dynar_t h)
 
 int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
 {
-  xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
+  xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
   msg_task_t task = NULL;
   char **cur = (char**)xbt_dynar_iterator_next(it);
   const char *me = MSG_host_get_name(MSG_host_self());
@@ -328,24 +390,20 @@ int broadcaster_send_file(const char *first)
   int cur = 0;
 
   for (; cur < piece_count; cur++) {
-    /* TODO: stub */
     task = task_message_data_new(me, first, NULL, 0);
-    //XBT_INFO("Sending (isend) from %s into mailbox %s", me, first);
-    //comm = MSG_task_isend(task, first);
+    XBT_INFO("Sending (send) from %s into mailbox %s", me, first);
     status = MSG_task_send(task, first);
-    //MSG_task_dsend(task, first, task_message_delete);
    
-    //status = MSG_comm_wait(comm, -1);
     xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
-    //MSG_comm_destroy(comm);
   }
 
   return MSG_OK;
 }
 
+/* FIXME: I should iterate nodes in the same order as the one used to build the chain */
 int broadcaster_finish(xbt_dynar_t host_list)
 {
-  xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
+  xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
   msg_task_t task = NULL;
   const char *me = MSG_host_get_name(MSG_host_self());
   const char *current_host = NULL;
@@ -404,7 +462,7 @@ void peer_forward_msg(peer_t peer, message_t msg)
   int status;
   msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0);
   msg_comm_t comm = NULL;
-  //XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
+  XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
   comm = MSG_task_isend(task, peer->next);
   queue_pending_connection(comm, peer->pending_sends);
 }
@@ -459,7 +517,7 @@ msg_error_t peer_wait_for_message(peer_t peer)
       task = NULL;
     } else {
       process_pending_connections(peer->pending_sends);
-      MSG_process_sleep(0.01);
+      MSG_process_sleep(0.1);
     }
   }