From 7b28e9e1bfe2348d1a820e78d8867ba325dcd03b Mon Sep 17 00:00:00 2001 From: thiery Date: Thu, 16 Dec 2010 10:23:37 +0000 Subject: [PATCH] Chord: adding timeouts to avoid waiting forever git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@9269 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- examples/msg/chord/chord.c | 148 +++++++++++++++++++++++-------------- 1 file changed, 92 insertions(+), 56 deletions(-) diff --git a/examples/msg/chord/chord.c b/examples/msg/chord/chord.c index d3484f12e1..7b9bf0eed9 100644 --- a/examples/msg/chord/chord.c +++ b/examples/msg/chord/chord.c @@ -227,7 +227,9 @@ static void set_predecessor(node_t node, int predecessor_id) int node(int argc, char *argv[]) { double init_time = MSG_get_clock(); - msg_comm_t comm = NULL; + m_task_t task = NULL; + msg_comm_t comm_send = NULL; + msg_comm_t comm_receive = NULL; int i; char* mailbox = NULL; double deadline; @@ -267,35 +269,44 @@ int node(int argc, char *argv[]) while (MSG_get_clock() < init_time + deadline) { - unsigned int cursor; - comm = NULL; - xbt_dynar_foreach(node.comms, cursor, comm) { - if (MSG_comm_test(comm)) { // FIXME: try with MSG_comm_testany instead - xbt_dynar_cursor_rm(node.comms, &cursor); - MSG_comm_destroy(comm); - } + // see if some communications are finished + int index; + while ((index = MSG_comm_testany(node.comms)) != -1) { + comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t); + xbt_dynar_remove_at(node.comms, index, &comm_send); + MSG_comm_destroy(comm_send); } - // periodic calls - if (MSG_get_clock() >= next_stabilize_date) { - stabilize(&node); - next_stabilize_date = MSG_get_clock() + 10; - } - if (MSG_get_clock() >= next_fix_fingers_date) { - fix_fingers(&node); - next_fix_fingers_date = MSG_get_clock() + 10; - } - if (MSG_get_clock() >= next_check_predecessor_date) { - check_predecessor(&node); - next_check_predecessor_date = MSG_get_clock() + 10; + if (comm_receive == NULL) { + task = NULL; + comm_receive = MSG_task_irecv(&task, node.mailbox); } - m_task_t task = NULL; - MSG_error_t res = MSG_task_receive_with_timeout(&task, node.mailbox, 45); // FIXME >> find the right timeout !! + if (!MSG_comm_test(comm_receive)) { - if (res == MSG_OK) { // else check deadline condition and keep waiting for a task + // no task was received: make some periodic calls + if (MSG_get_clock() >= next_stabilize_date) { + stabilize(&node); + next_stabilize_date = MSG_get_clock() + 10; + } + else if (MSG_get_clock() >= next_fix_fingers_date) { + fix_fingers(&node); + next_fix_fingers_date = MSG_get_clock() + 10; + } + else if (MSG_get_clock() >= next_check_predecessor_date) { + check_predecessor(&node); + next_check_predecessor_date = MSG_get_clock() + 10; + } + else { + // nothing to do: sleep for a while + MSG_process_sleep(5); + } + } + else { + // a task was received - // a task was received, get the data + MSG_comm_destroy(comm_receive); + comm_receive = NULL; const char* task_name = MSG_task_get_name(task); task_data_t task_data = (task_data_t) MSG_task_get_data(task); @@ -308,8 +319,8 @@ int node(int argc, char *argv[]) MSG_task_set_name(task, "Find Successor Answer"); INFO3("Sending back a 'Find Successor Answer' to %s: the successor of %d is %d", task_data->issuer_host_name, task_data->request_id, task_data->answer_id); - comm = MSG_task_isend(task, task_data->answer_to); - xbt_dynar_push(node.comms, &comm); + comm_send = MSG_task_isend(task, task_data->answer_to); + xbt_dynar_push(node.comms, &comm_send); } else { // otherwise, forward the request to the closest preceding finger in my table @@ -317,8 +328,8 @@ int node(int argc, char *argv[]) INFO2("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d", task_data->request_id, closest); mailbox = get_mailbox(closest); - comm = MSG_task_isend(task, mailbox); - xbt_dynar_push(node.comms, &comm); + comm_send = MSG_task_isend(task, mailbox); + xbt_dynar_push(node.comms, &comm_send); xbt_free(mailbox); } } @@ -329,8 +340,8 @@ int node(int argc, char *argv[]) MSG_task_set_name(task, "Get Predecessor Answer"); INFO2("Sending back a 'Get Predecessor Answer' to %s: my predecessor is %d", task_data->issuer_host_name, task_data->answer_id); - comm = MSG_task_isend(task, task_data->answer_to); - xbt_dynar_push(node.comms, &comm); + comm_send = MSG_task_isend(task, task_data->answer_to); + xbt_dynar_push(node.comms, &comm_send); } /* else if (!strcmp(task_name, "Find Predecessor")) { @@ -426,7 +437,13 @@ static void join(node_t node, int known_id) { INFO2("Joining the ring with id %d, knowing node %d", node->id, known_id); set_predecessor(node, -1); // no predecessor (yet) - int successor_id = remote_find_successor(node, known_id, node->id); + + int successor_id; + do { + successor_id = remote_find_successor(node, known_id, node->id); + INFO0("My 'Find Successor' request has failed, let's try again"); + } while (successor_id == -1); + set_finger(node, 0, successor_id); print_finger_table(node); } @@ -676,7 +693,7 @@ static void remote_update_finger_table(node_t node, int ask_to_id, int candidate * \brief Makes the current node find the successor node of an id. * \param node the current node * \param id the id to find - * \return the id of the successor node + * \return the id of the successor node, or -1 if the request failed */ static int find_successor(node_t node, int id) { @@ -695,10 +712,11 @@ static int find_successor(node_t node, int id) * \param node the current node * \param ask_to the node to ask to * \param id the id to find - * \return the id of the successor node + * \return the id of the successor node, or -1 if the request failed */ static int remote_find_successor(node_t node, int ask_to, int id) { + int successor = -1; s_task_data_t req_data; char* mailbox = bprintf("%s Find Successor", node->mailbox); req_data.request_id = id; @@ -708,17 +726,23 @@ static int remote_find_successor(node_t node, int ask_to, int id) // send a "Find Successor" request to ask_to_id INFO2("Sending a 'Find Successor' request to %d for key %d", ask_to, id); m_task_t task = MSG_task_create("Find Successor", 1000, 5000, &req_data); - MSG_task_send(task, get_mailbox(ask_to)); + MSG_error_t res = MSG_task_send_with_timeout(task, get_mailbox(ask_to), 20); + if (res == MSG_OK) { - // receive the answer - task = NULL; - MSG_task_receive(&task, req_data.answer_to); - task_data_t ans_data; - ans_data = MSG_task_get_data(task); - int successor = ans_data->answer_id; - xbt_free(mailbox); - INFO2("Received the answer to my 'Find Successor' request: the successor of key %d is %d", id, successor); + // receive the answer + task = NULL; + res = MSG_task_receive_with_timeout(&task, req_data.answer_to, 20); + if (res == MSG_OK) { + + task_data_t ans_data; + ans_data = MSG_task_get_data(task); + successor = ans_data->answer_id; + INFO2("Received the answer to my 'Find Successor' request: the successor of key %d is %d", id, successor); + } + } + + xbt_free(mailbox); return successor; } @@ -726,10 +750,12 @@ static int remote_find_successor(node_t node, int ask_to, int id) * \brief Asks another node its predecessor. * \param node the current node * \param ask_to the node to ask to - * \return the id of its predecessor node + * \return the id of its predecessor node, or -1 if the request failed + * (or if the node does not know its predecessor) */ static int remote_get_predecessor(node_t node, int ask_to) { + int predecessor_id = -1; s_task_data_t req_data; char* mailbox = bprintf("%s Get Predecessor", node->mailbox); req_data.answer_to = mailbox; @@ -738,17 +764,24 @@ static int remote_get_predecessor(node_t node, int ask_to) // send a "Get Predecessor" request to ask_to_id INFO1("Sending a 'Get Predecessor' request to %d", ask_to); m_task_t task = MSG_task_create("Get Predecessor", 1000, 5000, &req_data); - MSG_task_send(task, get_mailbox(ask_to)); + MSG_error_t res = MSG_task_send_with_timeout(task, get_mailbox(ask_to), 20); - // receive the answer - task = NULL; - MSG_task_receive(&task, req_data.answer_to); - task_data_t ans_data; - ans_data = MSG_task_get_data(task); - int predecessor_id = ans_data->answer_id; - xbt_free(mailbox); - INFO2("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, predecessor_id); + if (res == MSG_OK) { + + // receive the answer + task = NULL; + res = MSG_task_receive_with_timeout(&task, req_data.answer_to, 20); + + if (res == MSG_OK) { + + task_data_t ans_data; + ans_data = MSG_task_get_data(task); + predecessor_id = ans_data->answer_id; + INFO2("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, predecessor_id); + } + } + xbt_free(mailbox); return predecessor_id; } @@ -904,11 +937,14 @@ static void fix_fingers(node_t node) { INFO0("Fixing fingers"); int i = node->next_finger_to_fix; int id = find_successor(node, node->id + powers2[i]); - if (id != -1 && id != node->fingers[i].id) { - set_finger(node, i, id); - print_finger_table(node); + if (id != -1) { + + if (id != node->fingers[i].id) { + set_finger(node, i, id); + print_finger_table(node); + } + node->next_finger_to_fix = (i + 1) % NB_BITS; } - node->next_finger_to_fix = (i + 1) % NB_BITS; } /** -- 2.20.1