Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
chainsend: change message sizes to more realistic values, avoid passing too many...
authorMaximiliano Geier <maximiliano.geier@loria.fr>
Fri, 30 Nov 2012 10:11:05 +0000 (11:11 +0100)
committerMaximiliano Geier <maximiliano.geier@loria.fr>
Fri, 30 Nov 2012 10:37:42 +0000 (11:37 +0100)
Signed-off-by: Maximiliano Geier <maximiliano.geier@loria.fr>
examples/msg/chainsend/broadcaster.c
examples/msg/chainsend/broadcaster.h
examples/msg/chainsend/chainsend.tesh
examples/msg/chainsend/messages.c
examples/msg/chainsend/messages.h
examples/msg/chainsend/peer.c
examples/msg/chainsend/peer.h

index c953e36..e311175 100644 (file)
@@ -18,15 +18,10 @@ xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
   return host_list;
 }
 
   return host_list;
 }
 
-static void delete_hostlist(xbt_dynar_t h)
-{
-  xbt_dynar_free(&h);
-}
-
-int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar_iterator_t it)
+int broadcaster_build_chain(broadcaster_t bc)
 {
   msg_task_t task = NULL;
 {
   msg_task_t task = NULL;
-  char **cur = (char**)xbt_dynar_iterator_next(it);
+  char **cur = (char**)xbt_dynar_iterator_next(bc->it);
   const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
   const char *current_host = NULL;
   const char *prev = NULL;
   const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
   const char *current_host = NULL;
   const char *prev = NULL;
@@ -37,13 +32,13 @@ int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar
   if (cur != NULL) {
     /* init: prev=NULL, host=current cur, next=next cur */
     next = *cur;
   if (cur != NULL) {
     /* init: prev=NULL, host=current cur, next=next cur */
     next = *cur;
-    *first = next;
+    bc->first = next;
 
     /* This iterator iterates one step ahead: cur is current iterated element, 
        but it's actually the next one in the chain */
     do {
       /* following steps: prev=last, host=next, next=cur */
 
     /* This iterator iterates one step ahead: cur is current iterated element, 
        but it's actually the next one in the chain */
     do {
       /* following steps: prev=last, host=next, next=cur */
-      cur = (char**)xbt_dynar_iterator_next(it);
+      cur = (char**)xbt_dynar_iterator_next(bc->it);
       prev = last;
       current_host = next;
       if (cur != NULL)
       prev = last;
       current_host = next;
       if (cur != NULL)
@@ -64,37 +59,42 @@ int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar
   return MSG_OK;
 }
 
   return MSG_OK;
 }
 
-int broadcaster_send_file(const char *first)
+int broadcaster_send_file(broadcaster_t bc)
 {
 {
-  const char *me = MSG_host_get_name(MSG_host_self());
+  const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
+  msg_comm_t comm = NULL;
   msg_task_t task = NULL;
   int status;
 
   msg_task_t task = NULL;
   int status;
 
-  int piece_count = PIECE_COUNT;
-  int cur = 0;
-
-  for (; cur < piece_count; cur++) {
-    task = task_message_data_new(me, first, NULL, 0);
-    XBT_DEBUG("Sending (send) from %s into mailbox %s", me, first);
-    status = MSG_task_send(task, first);
-   
-    xbt_assert(status == MSG_OK, "broadcaster_send_file() failed");
+  bc->current_piece = 0;
+
+  while (bc->current_piece < bc->piece_count) {
+    if (xbt_dynar_length(bc->pending_sends) < bc->max_pending_sends) {
+      task = task_message_data_new(me, bc->first, NULL, PIECE_SIZE);
+      XBT_DEBUG("Sending (isend) piece %d from %s into mailbox %s (current pending %d)", bc->current_piece, me, bc->first, xbt_dynar_length(bc->pending_sends));
+      comm = MSG_task_isend(task, bc->first);
+      queue_pending_connection(comm, bc->pending_sends);
+      bc->current_piece++;
+    } else {
+      MSG_process_sleep(0.01);
+    }
+    process_pending_connections(bc->pending_sends);
   }
 
   return MSG_OK;
 }
 
   }
 
   return MSG_OK;
 }
 
-int broadcaster_finish(xbt_dynar_iterator_t it)
+int broadcaster_finish(broadcaster_t bc)
 {
   msg_task_t task = NULL;
   const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
   const char *current_host = NULL;
   char **cur = NULL;
 
 {
   msg_task_t task = NULL;
   const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
   const char *current_host = NULL;
   char **cur = NULL;
 
-  xbt_dynar_iterator_seek(it, 0);
+  xbt_dynar_iterator_seek(bc->it, 0);
 
   /* Send goodbye message to every peer in the order generated by iterator it */
 
   /* Send goodbye message to every peer in the order generated by iterator it */
-  for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
+  for (cur = (char**)xbt_dynar_iterator_next(bc->it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(bc->it)) {
     /* Send message to current peer */
     current_host = *cur;
     task = task_message_end_data_new(me, current_host);
     /* Send message to current peer */
     current_host = *cur;
     task = task_message_end_data_new(me, current_host);
@@ -105,31 +105,52 @@ int broadcaster_finish(xbt_dynar_iterator_t it)
   return MSG_OK;
 }
 
   return MSG_OK;
 }
 
+broadcaster_t broadcaster_init(xbt_dynar_t host_list)
+{
+  int status;
+  broadcaster_t bc = xbt_new(s_broadcaster_t, 1);
+
+  bc->piece_count = PIECE_COUNT;
+  bc->current_piece = 0;
+  bc->host_list = host_list;
+  bc->it = xbt_dynar_iterator_new(bc->host_list, forward_indices_list);
+  bc->max_pending_sends = MAX_PENDING_SENDS;
+  bc->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
+
+  status = broadcaster_build_chain(bc);
+  xbt_assert(status == MSG_OK, "Chain initialization failed");
+
+  return bc;
+}
+
+static void broadcaster_destroy(broadcaster_t bc)
+{
+  /* Destroy iterator and hostlist */
+  xbt_dynar_iterator_delete(bc->it);
+  xbt_dynar_free(&bc->pending_sends);
+  xbt_dynar_free(&bc->host_list);
+}
 
 /** Emitter function  */
 int broadcaster(int argc, char *argv[])
 {
 
 /** Emitter function  */
 int broadcaster(int argc, char *argv[])
 {
+  broadcaster_t bc = NULL;
   xbt_dynar_t host_list = NULL;
   const char *first = NULL;
   xbt_dynar_t host_list = NULL;
   const char *first = NULL;
-  int status = !MSG_OK;
+  int status;
 
   XBT_INFO("broadcaster");
 
   /* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
   host_list = build_hostlist_from_hostcount(atoi(argv[1]));
 
   XBT_INFO("broadcaster");
 
   /* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
   host_list = build_hostlist_from_hostcount(atoi(argv[1]));
-  /*host_list = build_hostlist_from_argv(argc, argv);*/
   
   
-  /* Initialize iterator */
-  xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
+  bc = broadcaster_init(host_list);
 
   /* TODO: Error checking */
 
   /* TODO: Error checking */
-  status = broadcaster_build_chain(&first, host_list, it);
-  status = broadcaster_send_file(first);
-  status = broadcaster_finish(it);
+  status = broadcaster_send_file(bc);
+  status = broadcaster_finish(bc);
 
 
-  /* Destroy iterator and hostlist */
-  xbt_dynar_iterator_delete(it);
-  delete_hostlist(host_list);
+  broadcaster_destroy(bc);
 
   return status;
 }
 
   return status;
 }
index 7d69cea..e0c026c 100644 (file)
 #include "iterator.h"
 #include "common.h"
 
 #include "iterator.h"
 #include "common.h"
 
+/* Connection parameters */
+#define MAX_PENDING_SENDS 10
+
+/* ``File'' details */
+#define PIECE_SIZE 16384
 #define PIECE_COUNT 50
 
 #define PIECE_COUNT 50
 
+/* Broadcaster struct */
+typedef struct s_broadcaster {
+  const char *first;
+  int piece_count;
+  int current_piece;
+  xbt_dynar_t host_list;
+  xbt_dynar_iterator_t it;
+  int max_pending_sends;
+  xbt_dynar_t pending_sends;
+} s_broadcaster_t, *broadcaster_t;
+
 xbt_dynar_t build_hostlist_from_hostcount(int hostcount); 
 
 /* Broadcaster: helper functions */
 xbt_dynar_t build_hostlist_from_hostcount(int hostcount); 
 
 /* Broadcaster: helper functions */
-int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar_iterator_t it);
-int broadcaster_send_file(const char *first);
-int broadcaster_finish(xbt_dynar_iterator_t it);
+broadcaster_t broadcaster_init(xbt_dynar_t host_list);
+int broadcaster_build_chain(broadcaster_t bc);
+int broadcaster_send_file(broadcaster_t bc);
+int broadcaster_finish(broadcaster_t bc);
+static void broadcaster_destroy(broadcaster_t bc);
 
 /* Tasks */
 int broadcaster(int argc, char *argv[]);
 
 /* Tasks */
 int broadcaster(int argc, char *argv[]);
index fc4c599..a78a40a 100644 (file)
@@ -15,12 +15,12 @@ $ $SG_TEST_EXENV ${bindir:=.}/chainsend ${srcdir:=.}/platform_chainsend.xml ${sr
 > [    0.000000] (7:peer@host6) peer
 > [    0.000000] (8:peer@host7) peer
 > [    0.000000] (9:peer@host8) peer
 > [    0.000000] (7:peer@host6) peer
 > [    0.000000] (8:peer@host7) peer
 > [    0.000000] (9:peer@host8) peer
-> [   93.000000] (2:peer@host1) Waiting for sends to finish before shutdown...
-> [  225.700000] (3:peer@host2) Waiting for sends to finish before shutdown...
-> [  294.700000] (4:peer@host3) Waiting for sends to finish before shutdown...
-> [  298.600000] (5:peer@host4) Waiting for sends to finish before shutdown...
-> [  309.100000] (6:peer@host5) Waiting for sends to finish before shutdown...
-> [  314.300000] (7:peer@host6) Waiting for sends to finish before shutdown...
-> [  318.300000] (8:peer@host7) Waiting for sends to finish before shutdown...
-> [  318.400000] (0:@) Total simulation time: 3.184000e+02
-> [  318.400000] (9:peer@host8) Waiting for sends to finish before shutdown...
+> [   88.950000] (2:peer@host1) Waiting for sends to finish before shutdown...
+> [  221.070000] (3:peer@host2) Waiting for sends to finish before shutdown...
+> [  289.980000] (4:peer@host3) Waiting for sends to finish before shutdown...
+> [  293.890000] (5:peer@host4) Waiting for sends to finish before shutdown...
+> [  304.300000] (6:peer@host5) Waiting for sends to finish before shutdown...
+> [  310.940000] (7:peer@host6) Waiting for sends to finish before shutdown...
+> [  314.850000] (8:peer@host7) Waiting for sends to finish before shutdown...
+> [  314.930000] (9:peer@host8) Waiting for sends to finish before shutdown...
+> [  316.850000] (0:@) Total simulation time: 3.168500e+02
index 177b4e8..0bc23cd 100644 (file)
@@ -1,19 +1,19 @@
 #include "messages.h"
 
 #include "messages.h"
 
-msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox)
+msg_task_t task_message_new(e_message_type type, unsigned int len, const char *issuer_hostname, const char *mailbox)
 {
   message_t msg = xbt_new(s_message_t, 1);
   msg->type = type;
   msg->issuer_hostname = issuer_hostname;
   msg->mailbox = mailbox;
 {
   message_t msg = xbt_new(s_message_t, 1);
   msg->type = type;
   msg->issuer_hostname = issuer_hostname;
   msg->mailbox = mailbox;
-  msg_task_t task = MSG_task_create(NULL, 0, MESSAGE_SIZE, msg); 
+  msg_task_t task = MSG_task_create(NULL, 0, len, msg);
 
   return task;
 }
 
 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
 {
 
   return task;
 }
 
 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
 {
-  msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, issuer_hostname, mailbox);
+  msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, MESSAGE_BUILD_CHAIN_SIZE, issuer_hostname, mailbox);
   message_t msg = MSG_task_get_data(task);
   msg->prev_hostname = prev;
   msg->next_hostname = next;
   message_t msg = MSG_task_get_data(task);
   msg->prev_hostname = prev;
   msg->next_hostname = next;
@@ -23,7 +23,7 @@ msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailb
 
 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
 {
 
 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
 {
-  msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
+  msg_task_t task = task_message_new(MESSAGE_SEND_DATA, MESSAGE_SEND_DATA_HEADER_SIZE + len, issuer_hostname, mailbox);
   //if (strcmp(mailbox, "host4") == 0) 
   //MSG_task_set_category(task, mailbox);
   message_t msg = MSG_task_get_data(task);
   //if (strcmp(mailbox, "host4") == 0) 
   //MSG_task_set_category(task, mailbox);
   message_t msg = MSG_task_get_data(task);
@@ -35,7 +35,7 @@ msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbo
 
 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
 {
 
 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
 {
-  return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
+  return task_message_new(MESSAGE_END_DATA, MESSAGE_END_DATA_SIZE, issuer_hostname, mailbox);
 }
 
 void task_message_delete(void *task)
 }
 
 void task_message_delete(void *task)
index 8e89fd6..4421305 100644 (file)
@@ -4,7 +4,9 @@
 #include "msg/msg.h"
 #include "xbt/sysdep.h"
 
 #include "msg/msg.h"
 #include "xbt/sysdep.h"
 
-#define MESSAGE_SIZE 1
+#define MESSAGE_BUILD_CHAIN_SIZE 40
+#define MESSAGE_SEND_DATA_HEADER_SIZE 10
+#define MESSAGE_END_DATA_SIZE 20
 
 /* Messages enum */
 typedef enum {
 
 /* Messages enum */
 typedef enum {
@@ -25,7 +27,7 @@ typedef struct s_message {
 } s_message_t, *message_t;
 
 /* Message methods */
 } s_message_t, *message_t;
 
 /* Message methods */
-msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
+msg_task_t task_message_new(e_message_type type, unsigned int len, const char *issuer_hostname, const char *mailbox);
 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
index fa91a7b..5605c88 100644 (file)
@@ -73,7 +73,7 @@ msg_error_t peer_wait_for_message(peer_t peer)
       task = NULL;
     } else {
       process_pending_connections(peer->pending_sends);
       task = NULL;
     } else {
       process_pending_connections(peer->pending_sends);
-      MSG_process_sleep(0.1);
+      MSG_process_sleep(0.01);
     }
   }
 
     }
   }
 
@@ -105,7 +105,7 @@ void peer_shutdown(peer_t p)
   XBT_INFO("Waiting for sends to finish before shutdown...");
   while (xbt_dynar_length(p->pending_sends) && MSG_get_clock() < end_time) {
     process_pending_connections(p->pending_sends);
   XBT_INFO("Waiting for sends to finish before shutdown...");
   while (xbt_dynar_length(p->pending_sends) && MSG_get_clock() < end_time) {
     process_pending_connections(p->pending_sends);
-    MSG_process_sleep(0.1);
+    MSG_process_sleep(1);
   }
 
   xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline");
   }
 
   xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline");
index 5827862..a5d99d3 100644 (file)
@@ -7,7 +7,7 @@
 #include "messages.h"
 #include "common.h"
 
 #include "messages.h"
 #include "common.h"
 
-#define PEER_SHUTDOWN_DEADLINE 6000
+#define PEER_SHUTDOWN_DEADLINE 60000
 
 /* Peer struct */
 typedef struct s_peer {
 
 /* Peer struct */
 typedef struct s_peer {