Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Chord: adding timeouts to avoid waiting forever
authorthiery <thiery@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 16 Dec 2010 10:23:37 +0000 (10:23 +0000)
committerthiery <thiery@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 16 Dec 2010 10:23:37 +0000 (10:23 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@9269 48e7efb5-ca39-0410-a469-dd3cf9ba447f

examples/msg/chord/chord.c

index d3484f1..7b9bf0e 100644 (file)
@@ -227,7 +227,9 @@ static void set_predecessor(node_t node, int predecessor_id)
 int node(int argc, char *argv[])
 {
   double init_time = MSG_get_clock();
-  msg_comm_t comm = NULL;
+  m_task_t task = NULL;
+  msg_comm_t comm_send = NULL;
+  msg_comm_t comm_receive = NULL;
   int i;
   char* mailbox = NULL;
   double deadline;
@@ -267,35 +269,44 @@ int node(int argc, char *argv[])
 
   while (MSG_get_clock() < init_time + deadline) {
 
-    unsigned int cursor;
-    comm = NULL;
-    xbt_dynar_foreach(node.comms, cursor, comm) {
-      if (MSG_comm_test(comm)) { // FIXME: try with MSG_comm_testany instead
-        xbt_dynar_cursor_rm(node.comms, &cursor);
-       MSG_comm_destroy(comm);
-      }
+    // see if some communications are finished
+    int index;
+    while ((index = MSG_comm_testany(node.comms)) != -1) {
+      comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t);
+      xbt_dynar_remove_at(node.comms, index, &comm_send);
+      MSG_comm_destroy(comm_send);
     }
 
-    // periodic calls
-    if (MSG_get_clock() >= next_stabilize_date) {
-      stabilize(&node);
-      next_stabilize_date = MSG_get_clock() + 10;
-    }
-    if (MSG_get_clock() >= next_fix_fingers_date) {
-      fix_fingers(&node);
-      next_fix_fingers_date = MSG_get_clock() + 10;
-    }
-    if (MSG_get_clock() >= next_check_predecessor_date) {
-      check_predecessor(&node);
-      next_check_predecessor_date = MSG_get_clock() + 10;
+    if (comm_receive == NULL) {
+      task = NULL;
+      comm_receive = MSG_task_irecv(&task, node.mailbox);
     }
 
-    m_task_t task = NULL;
-    MSG_error_t res = MSG_task_receive_with_timeout(&task, node.mailbox, 45); // FIXME >> find the right timeout !!
+    if (!MSG_comm_test(comm_receive)) {
 
-    if (res == MSG_OK) {                        // else check deadline condition and keep waiting for a task
+      // no task was received: make some periodic calls
+      if (MSG_get_clock() >= next_stabilize_date) {
+        stabilize(&node);
+        next_stabilize_date = MSG_get_clock() + 10;
+      }
+      else if (MSG_get_clock() >= next_fix_fingers_date) {
+        fix_fingers(&node);
+        next_fix_fingers_date = MSG_get_clock() + 10;
+      }
+      else if (MSG_get_clock() >= next_check_predecessor_date) {
+        check_predecessor(&node);
+        next_check_predecessor_date = MSG_get_clock() + 10;
+      }
+      else {
+       // nothing to do: sleep for a while
+        MSG_process_sleep(5);
+      }
+    }
+    else {
+      // a task was received
 
-      // a task was received, get the data
+      MSG_comm_destroy(comm_receive);
+      comm_receive = NULL;
       const char* task_name = MSG_task_get_name(task);
       task_data_t task_data = (task_data_t) MSG_task_get_data(task);
 
@@ -308,8 +319,8 @@ int node(int argc, char *argv[])
          MSG_task_set_name(task, "Find Successor Answer");
          INFO3("Sending back a 'Find Successor Answer' to %s: the successor of %d is %d",
                task_data->issuer_host_name, task_data->request_id, task_data->answer_id);
-         comm = MSG_task_isend(task, task_data->answer_to);
-         xbt_dynar_push(node.comms, &comm);
+         comm_send = MSG_task_isend(task, task_data->answer_to);
+         xbt_dynar_push(node.comms, &comm_send);
        }
        else {
          // otherwise, forward the request to the closest preceding finger in my table
@@ -317,8 +328,8 @@ int node(int argc, char *argv[])
          INFO2("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
                task_data->request_id, closest);
          mailbox = get_mailbox(closest);
-         comm = MSG_task_isend(task, mailbox);
-         xbt_dynar_push(node.comms, &comm);
+         comm_send = MSG_task_isend(task, mailbox);
+         xbt_dynar_push(node.comms, &comm_send);
          xbt_free(mailbox);
        }
       }
@@ -329,8 +340,8 @@ int node(int argc, char *argv[])
        MSG_task_set_name(task, "Get Predecessor Answer");
        INFO2("Sending back a 'Get Predecessor Answer' to %s: my predecessor is %d",
              task_data->issuer_host_name, task_data->answer_id);
-       comm = MSG_task_isend(task, task_data->answer_to);
-       xbt_dynar_push(node.comms, &comm);
+       comm_send = MSG_task_isend(task, task_data->answer_to);
+       xbt_dynar_push(node.comms, &comm_send);
       }
       /*
       else if (!strcmp(task_name, "Find Predecessor")) {
@@ -426,7 +437,13 @@ static void join(node_t node, int known_id)
 {
   INFO2("Joining the ring with id %d, knowing node %d", node->id, known_id);
   set_predecessor(node, -1); // no predecessor (yet)
-  int successor_id = remote_find_successor(node, known_id, node->id);
+
+  int successor_id;
+  do {
+    successor_id = remote_find_successor(node, known_id, node->id);
+    INFO0("My 'Find Successor' request has failed, let's try again");
+  } while (successor_id == -1);
+
   set_finger(node, 0, successor_id);
   print_finger_table(node);
 }
@@ -676,7 +693,7 @@ static void remote_update_finger_table(node_t node, int ask_to_id, int candidate
  * \brief Makes the current node find the successor node of an id.
  * \param node the current node
  * \param id the id to find
- * \return the id of the successor node
+ * \return the id of the successor node, or -1 if the request failed
  */
 static int find_successor(node_t node, int id)
 {
@@ -695,10 +712,11 @@ static int find_successor(node_t node, int id)
  * \param node the current node
  * \param ask_to the node to ask to
  * \param id the id to find
- * \return the id of the successor node
+ * \return the id of the successor node, or -1 if the request failed
  */
 static int remote_find_successor(node_t node, int ask_to, int id)
 {
+  int successor = -1;
   s_task_data_t req_data;
   char* mailbox = bprintf("%s Find Successor", node->mailbox);
   req_data.request_id = id;
@@ -708,17 +726,23 @@ static int remote_find_successor(node_t node, int ask_to, int id)
   // send a "Find Successor" request to ask_to_id
   INFO2("Sending a 'Find Successor' request to %d for key %d", ask_to, id);
   m_task_t task = MSG_task_create("Find Successor", 1000, 5000, &req_data);
-  MSG_task_send(task, get_mailbox(ask_to));
+  MSG_error_t res = MSG_task_send_with_timeout(task, get_mailbox(ask_to), 20);
+  if (res == MSG_OK) {
 
-  // receive the answer
-  task = NULL;
-  MSG_task_receive(&task, req_data.answer_to);
-  task_data_t ans_data;
-  ans_data = MSG_task_get_data(task);
-  int successor = ans_data->answer_id;
-  xbt_free(mailbox);
-  INFO2("Received the answer to my 'Find Successor' request: the successor of key %d is %d", id, successor);
+    // receive the answer
+    task = NULL;
+    res = MSG_task_receive_with_timeout(&task, req_data.answer_to, 20);
 
+    if (res == MSG_OK) {
+
+      task_data_t ans_data;
+      ans_data = MSG_task_get_data(task);
+      successor = ans_data->answer_id;
+      INFO2("Received the answer to my 'Find Successor' request: the successor of key %d is %d", id, successor);
+    }
+  }
+
+  xbt_free(mailbox);
   return successor;
 }
 
@@ -726,10 +750,12 @@ static int remote_find_successor(node_t node, int ask_to, int id)
  * \brief Asks another node its predecessor.
  * \param node the current node
  * \param ask_to the node to ask to
- * \return the id of its predecessor node
+ * \return the id of its predecessor node, or -1 if the request failed
+ * (or if the node does not know its predecessor)
  */
 static int remote_get_predecessor(node_t node, int ask_to)
 {
+  int predecessor_id = -1;
   s_task_data_t req_data;
   char* mailbox = bprintf("%s Get Predecessor", node->mailbox);
   req_data.answer_to = mailbox;
@@ -738,17 +764,24 @@ static int remote_get_predecessor(node_t node, int ask_to)
   // send a "Get Predecessor" request to ask_to_id
   INFO1("Sending a 'Get Predecessor' request to %d", ask_to);
   m_task_t task = MSG_task_create("Get Predecessor", 1000, 5000, &req_data);
-  MSG_task_send(task, get_mailbox(ask_to));
+  MSG_error_t res = MSG_task_send_with_timeout(task, get_mailbox(ask_to), 20);
 
-  // receive the answer
-  task = NULL;
-  MSG_task_receive(&task, req_data.answer_to);
-  task_data_t ans_data;
-  ans_data = MSG_task_get_data(task);
-  int predecessor_id = ans_data->answer_id;
-  xbt_free(mailbox);
-  INFO2("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, predecessor_id);
+  if (res == MSG_OK) {
+
+    // receive the answer
+    task = NULL;
+    res = MSG_task_receive_with_timeout(&task, req_data.answer_to, 20);
+
+    if (res == MSG_OK) {
+
+      task_data_t ans_data;
+      ans_data = MSG_task_get_data(task);
+      predecessor_id = ans_data->answer_id;
+      INFO2("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, predecessor_id);
+    }
+  }
 
+  xbt_free(mailbox);
   return predecessor_id;
 }
 
@@ -904,11 +937,14 @@ static void fix_fingers(node_t node) {
   INFO0("Fixing fingers");
   int i = node->next_finger_to_fix;
   int id = find_successor(node, node->id + powers2[i]);
-  if (id != -1 && id != node->fingers[i].id) {
-    set_finger(node, i, id);
-    print_finger_table(node);
+  if (id != -1) {
+
+    if (id != node->fingers[i].id) {
+      set_finger(node, i, id);
+      print_finger_table(node);
+    }
+    node->next_finger_to_fix = (i + 1) % NB_BITS;
   }
-  node->next_finger_to_fix = (i + 1) % NB_BITS;
 }
 
 /**