Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
First working version with 8 peers I have not evaluated how it works yet, only that...
[simgrid.git] / examples / msg / kadeploy / kadeploy.c
index ae5dc08..93e6998 100644 (file)
@@ -25,8 +25,11 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy,
                              "Messages specific for kadeploy");
 
 #define MESSAGE_SIZE 1
+#define PIECE_COUNT 100
 #define HOSTNAME_LENGTH 20
 
+#define PEER_SHUTDOWN_DEADLINE 600
+
 /*
  Data structures
  */
@@ -66,6 +69,8 @@ typedef struct s_peer {
   const char *next;
   const char *me;
   int pieces;
+  xbt_dynar_t pending_sends;
+  int close_asap; /* TODO: unused */
 } s_peer_t, *peer_t;
 
 /* Iterator methods */
@@ -94,9 +99,11 @@ int broadcaster_send_file(const char *first);
 int broadcaster_finish(xbt_dynar_t host_list);
 
 /* Peer: helper functions */
-msg_error_t peer_wait_for_message();
+msg_error_t peer_wait_for_message(peer_t peer);
 int peer_execute_task(peer_t peer, msg_task_t task);
 void peer_init_chain(peer_t peer, message_t msg);
+void peer_shutdown(peer_t p);
+void peer_init(peer_t p);
 
 /* Initialization stuff */
 msg_error_t test_all(const char *platform_file,
@@ -120,9 +127,9 @@ xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)
 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
 {
   int next = it->criteria_fn((xbt_dynar_iterator_t)it);
-  XBT_INFO("%d current\n", next);
+  //XBT_INFO("%d current\n", next);
   if (next < 0) {
-    XBT_INFO("Nothing to return!\n");
+    //XBT_INFO("Nothing to return!\n");
     return NULL;
   } else {
     xbt_dynar_push(it->indices_list, &next);
@@ -176,6 +183,7 @@ msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailb
 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
 {
   msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
+  if (strcmp(mailbox, "host4") == 0) MSG_task_set_category(task, mailbox);
   message_t msg = MSG_task_get_data(task);
   msg->data_block = block;
   msg->data_length = len;
@@ -188,7 +196,6 @@ msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *ma
   return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
 }
 
-
 void task_message_delete(void *task)
 {
   message_t msg = MSG_task_get_data(task);
@@ -196,6 +203,31 @@ void task_message_delete(void *task)
   MSG_task_destroy(task);
 }
 
+void queue_pending_connection(msg_comm_t comm, xbt_dynar_t q)
+{
+  xbt_dynar_push(q, &comm);
+}
+
+int process_pending_connections(xbt_dynar_t q)
+{
+  unsigned int iter;
+  int status;
+  int empty = 0;
+  msg_comm_t comm;
+
+  xbt_dynar_foreach(q, iter, comm) {
+    empty = 1;
+    if (MSG_comm_test(comm)) {
+      MSG_comm_destroy(comm);
+      status = MSG_comm_get_status(comm);
+      xbt_assert(status == MSG_OK, __FILE__ ": process_pending_connections() failed");
+      xbt_dynar_cursor_rm(q, &iter);
+      empty = 0;
+    }
+  }
+  return empty;
+}
+
 xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
 {
   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
@@ -206,7 +238,7 @@ xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
   for (; i < hostcount+1; i++) {
     hostname = xbt_new(char, HOSTNAME_LENGTH);
     snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
-    XBT_INFO("%s", hostname);
+    //XBT_INFO("%s", hostname);
     h = MSG_get_host_by_name(hostname);
     if (h == NULL) {
       XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
@@ -270,10 +302,11 @@ int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
         next = *cur;
       else
         next = NULL;
-      XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
+      //XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
     
       /* Send message to current peer */
       task = task_message_chain_new(me, current_host, prev, next);
+      //MSG_task_set_category(task, current_host);
       MSG_task_send(task, current_host);
 
       last = current_host;
@@ -291,19 +324,19 @@ int broadcaster_send_file(const char *first)
   msg_comm_t comm = NULL;
   int status;
 
-  int piece_count = 10;
+  int piece_count = PIECE_COUNT;
   int cur = 0;
 
   for (; cur < piece_count; cur++) {
     /* TODO: stub */
     task = task_message_data_new(me, first, NULL, 0);
-    XBT_INFO("Sending (isend) from %s into mailbox %s", me, first);
+    //XBT_INFO("Sending (isend) from %s into mailbox %s", me, first);
     //comm = MSG_task_isend(task, first);
-    //status = 
-    MSG_task_dsend(task, first, task_message_delete);
+    status = MSG_task_send(task, first);
+    //MSG_task_dsend(task, first, task_message_delete);
    
     //status = MSG_comm_wait(comm, -1);
-    //xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
+    xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
     //MSG_comm_destroy(comm);
   }
 
@@ -323,6 +356,7 @@ int broadcaster_finish(xbt_dynar_t host_list)
     /* Send message to current peer */
     current_host = *cur;
     task = task_message_end_data_new(me, current_host);
+    //MSG_task_set_category(task, current_host);
     MSG_task_send(task, current_host);
   }
 
@@ -365,28 +399,22 @@ void peer_init_chain(peer_t peer, message_t msg)
   peer->init = 1;
 }
 
-/* TODO: error checking */
 void peer_forward_msg(peer_t peer, message_t msg)
 {
   int status;
   msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0);
   msg_comm_t comm = NULL;
-  XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
-  //comm =
-  //status = 
-  MSG_task_dsend(task, peer->next, task_message_delete);
-   
-  //status = MSG_comm_wait(comm, -1);
-  xbt_assert(status == MSG_OK, __FILE__ ": peer_forward_msg() failed");
-  //MSG_comm_destroy(comm);
+  //XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
+  comm = MSG_task_isend(task, peer->next);
+  queue_pending_connection(comm, peer->pending_sends);
 }
 
 int peer_execute_task(peer_t peer, msg_task_t task)
 {
-  int done = 0, init = 0;
+  int done = 0;
   message_t msg = MSG_task_get_data(task);
   
-  XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type);
+  //XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type);
   switch (msg->type) {
     case MESSAGE_BUILD_CHAIN:
       peer_init_chain(peer, msg);
@@ -416,14 +444,13 @@ msg_error_t peer_wait_for_message(peer_t peer)
   msg_task_t task = NULL;
   int done = 0;
 
-
-  /* TODO: Error checking is not correct */
   while (!done) {
     if (comm == NULL)
       comm = MSG_task_irecv(&task, peer->me);
 
     if (MSG_comm_test(comm)) {
       status = MSG_comm_get_status(comm);
+      //XBT_INFO("peer_wait_for_message: error code = %d", status);
       xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
       MSG_comm_destroy(comm);
       comm = NULL;
@@ -431,6 +458,7 @@ msg_error_t peer_wait_for_message(peer_t peer)
       task_message_delete(task);
       task = NULL;
     } else {
+      process_pending_connections(peer->pending_sends);
       MSG_process_sleep(0.01);
     }
   }
@@ -444,9 +472,28 @@ void peer_init(peer_t p)
   p->prev = NULL;
   p->next = NULL;
   p->pieces = 0;
+  p->close_asap = 0;
+  p->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
   p->me = MSG_host_get_name(MSG_host_self());
 }
 
+void peer_shutdown(peer_t p)
+{
+  float start_time = MSG_get_clock();
+  float end_time = start_time + PEER_SHUTDOWN_DEADLINE;
+
+  XBT_INFO("Waiting for sends to finish before shutdown...");
+  while (xbt_dynar_length(p->pending_sends) && MSG_get_clock() < end_time) {
+    process_pending_connections(p->pending_sends);
+    MSG_process_sleep(0.1);
+  }
+
+  xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline");
+  xbt_dynar_free(&p->pending_sends);
+
+  xbt_free(p);
+}
+
 /** Peer function  */
 int peer(int argc, char *argv[])
 {
@@ -457,8 +504,7 @@ int peer(int argc, char *argv[])
 
   peer_init(p);
   status = peer_wait_for_message(p);
-
-  xbt_free(p);
+  peer_shutdown(p);
 
   return MSG_OK;
 }                               /* end_of_receiver */
@@ -478,6 +524,14 @@ msg_error_t test_all(const char *platform_file,
   /*  Simulation setting */
   MSG_create_environment(platform_file);
 
+  /* Trace categories */
+  TRACE_category_with_color("host0", "0 0 1");
+  TRACE_category_with_color("host1", "0 1 0");
+  TRACE_category_with_color("host2", "0 1 1");
+  TRACE_category_with_color("host3", "1 0 0");
+  TRACE_category_with_color("host4", "1 0 1");
+  TRACE_category_with_color("host5", "1 1 0");
+
   /*   Application deployment */
   MSG_function_register("broadcaster", broadcaster);
   MSG_function_register("peer", peer);
@@ -502,7 +556,6 @@ int main(int argc, char *argv[])
 
   MSG_init(&argc, argv);
 
-
   /*if (argc <= 3) {
     XBT_CRITICAL("Usage: %s platform_file deployment_file <model>\n",
               argv[0]);