From b10b1f5a348a190fe1e1809ee6d65a03c7b1df06 Mon Sep 17 00:00:00 2001 From: thiery Date: Wed, 15 Dec 2010 16:10:53 +0000 Subject: [PATCH] Implementing concurrent join in Chord git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@9262 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- examples/msg/chord/chord.c | 231 ++++++++++++++++++++++++++++--------- 1 file changed, 175 insertions(+), 56 deletions(-) diff --git a/examples/msg/chord/chord.c b/examples/msg/chord/chord.c index 8718f6687f..b8e224c8a0 100644 --- a/examples/msg/chord/chord.c +++ b/examples/msg/chord/chord.c @@ -28,11 +28,11 @@ typedef struct finger { */ typedef struct node { int id; // my id - char* mailbox; + char* mailbox; // my usual mailbox name s_finger_t fingers[NB_BITS]; // finger table (fingers[0] is my successor) int pred_id; // predecessor id - char* pred_mailbox; - xbt_dynar_t comms; // current communications to finish + char* pred_mailbox; // predecessor's mailbox name + xbt_dynar_t comms; // current communications pending } s_node_t, *node_t; /** @@ -53,14 +53,16 @@ static int normalize(int id); static int is_in_interval(int id, int start, int end); static char* get_mailbox(int host_id); static void print_finger_table(node_t node); +static void set_finger(node_t node, int finger_index, int id); +static void set_predecessor(node_t node, int predecessor_id); // process functions static int node(int argc, char *argv[]); // initialization static void initialize_first_node(node_t node); -static void initialize_finger_table(node_t data, int known_id); static void join(node_t node, int known_id); +static void bootstrap(node_t node, int known_id); static void leave(node_t node); // Chord core @@ -70,22 +72,29 @@ static int find_predecessor(node_t node, int id); static int remote_find_predecessor(node_t node, int ask_to_id, int id); static int closest_preceding_finger(node_t node, int id); static int remote_closest_preceding_finger(int ask_to_id, int id); -static void notify_predecessors(node_t node); +static void notify(node_t node, int new_node_id); +static void remote_notify(node_t node, int notify_id); +static void stabilize(node_t node); +static void quit_notify(node_t node, int to); + +// not implemented yet static void remote_move_keys(node_t node, int take_from_id); + +// deprecated +static void notify_predecessors(node_t node); +static void initialize_finger_table(node_t data, int known_id); +static void notify_predecessor_changed(node_t node, int predecessor_candidate_id); +static void remote_notify_predecessor_changed(node_t node, int notify_to, int predecessor_candidate_id); static void update_finger_table(node_t node, int candidate_id, int finger_index); static void remote_update_finger_table(node_t node, int ask_to_id, int candidate_id, int finger_index); -static void notify(node_t node, int predecessor_candidate_id); -static void remote_notify(node_t node, int notify_to, int predecessor_candidate_id); -static void stabilize(node_t node); -static void quit_notify(node_t node, int to); /** * \brief Turns an id into an equivalent id in [0, NB_KEYS[ * \param id an id * \return the corresponding normalized id */ -static int normalize(int id) { - +static int normalize(int id) +{ // make sure id >= 0 while (id < 0) { id += NB_KEYS; @@ -112,8 +121,8 @@ static int normalize(int id) { * \param end upper bound * \return a non-zero value if id in in [start, end] */ -static int is_in_interval(int id, int start, int end) { - +static int is_in_interval(int id, int start, int end) +{ id = normalize(id); start = normalize(start); end = normalize(end); @@ -136,8 +145,8 @@ static int is_in_interval(int id, int start, int end) { * \return the name of its mailbox * FIXME: free the memory */ -static char* get_mailbox(int node_id) { - +static char* get_mailbox(int node_id) +{ return bprintf("mailbox%d", node_id); } @@ -145,8 +154,8 @@ static char* get_mailbox(int node_id) { * \brief Displays the finger table of a node. * \param node a node */ -static void print_finger_table(node_t node) { - +static void print_finger_table(node_t node) +{ int i; int pow = 1; INFO0("My finger table:"); @@ -158,6 +167,33 @@ static void print_finger_table(node_t node) { INFO1("Predecessor: %d", node->pred_id); } +/** + * \brief Sets a finger of the current node. + * \param node the current node + * \param finger_index index of the finger to set (0 to NB_BITS - 1) + * \param id the id to set for this finger + */ +static void set_finger(node_t node, int finger_index, int id) +{ + node->fingers[finger_index].id = id; + xbt_free(node->fingers[finger_index].mailbox); + node->fingers[finger_index].mailbox = get_mailbox(id); + INFO2("My new finger #%d is %d", finger_index, id); +} + +/** + * \brief Sets the predecessor of the current node. + * \param node the current node + * \param id the id to predecessor + */ +static void set_predecessor(node_t node, int predecessor_id) +{ + node->pred_id = predecessor_id; + xbt_free(node->pred_mailbox); + node->pred_mailbox = get_mailbox(predecessor_id); + INFO1("My new predecessor is %d", predecessor_id); +} + /** * \brief Node Function * Arguments: @@ -172,6 +208,7 @@ int node(int argc, char *argv[]) int i; char* mailbox; double deadline; + double next_stabilize_date = init_time + 50; xbt_assert0(argc == 3 || argc == 5, "Wrong number of arguments for this node"); @@ -181,7 +218,6 @@ int node(int argc, char *argv[]) node.mailbox = get_mailbox(node.id); node.comms = xbt_dynar_new(sizeof(msg_comm_t), NULL); - if (argc == 3) { // first ring initialize_first_node(&node); deadline = atof(argv[2]); @@ -199,7 +235,7 @@ int node(int argc, char *argv[]) join(&node, known_id); } - while ((MSG_get_clock( )- init_time) < deadline) { + while ((MSG_get_clock() - init_time) < deadline) { unsigned int cursor; comm = NULL; @@ -210,10 +246,15 @@ int node(int argc, char *argv[]) } } + if (MSG_get_clock() >= next_stabilize_date) { + stabilize(&node); + next_stabilize_date = MSG_get_clock() + 50; + } + m_task_t task = NULL; - MSG_error_t res = MSG_task_receive_with_timeout(&task, node.mailbox,45); // FIXME >> find the right timeout !! + MSG_error_t res = MSG_task_receive_with_timeout(&task, node.mailbox, 45); // FIXME >> find the right timeout !! - if(res == MSG_OK) // else check deadline condition and keep waiting for a task + if (res == MSG_OK) // else check deadline condition and keep waiting for a task { // get data @@ -262,15 +303,20 @@ int node(int argc, char *argv[]) } } + else if (!strcmp(task_name, "Notify")) { + // someone may be my new neighboor + INFO1("Receiving a 'Notify' request from %s", task_data->issuer_host_name); + notify(&node, task_data->request_id); + } else if (!strcmp(task_name, "Update Finger")) { // someone is telling me that he may be my new finger INFO1("Receiving an 'Update Finger' request from %s", task_data->issuer_host_name); update_finger_table(&node, task_data->request_id, task_data->request_finger); } - else if (!strcmp(task_name, "Notify")) { + else if (!strcmp(task_name, "Notify Predecessor Changed")) { // someone is telling me that he may be my new predecessor - INFO1("Receiving a 'Notify' request from %s", task_data->issuer_host_name); - notify(&node, task_data->request_id); + INFO1("Receiving a 'Notify Predecessor Changed' request from %s", task_data->issuer_host_name); + notify_predecessor_changed(&node, task_data->request_id); } else if (!strcmp(task_name, "Predecessor Leaving")) { // my predecessor is about quitting @@ -315,28 +361,103 @@ static void initialize_first_node(node_t node) { INFO0("Create a new Chord ring..."); - // I am my own successor and predecessor + // I am my own successor, predecessor and fingers int i; for (i = 0; i < NB_BITS; i++) { - node->fingers[i].id = node->id; - node->fingers[i].mailbox = xbt_strdup(node->mailbox); + set_finger(node, i, node->id); } - node->pred_id = node->id; - node->pred_mailbox = node->mailbox; + set_predecessor(node, node->id); print_finger_table(node); } /** - * \brief Makes the current node join the system, knowing the id of a node already in the system + * \brief Makes the current node join the system, knowing the id of a node + * already in the system * \param node the current node * \param known_id id of a node already in the system */ static void join(node_t node, int known_id) { - initialize_finger_table(node, known_id); // determine my fingers, asking to known_id - remote_notify(node, node->fingers[0].id, node->id); // tell my successor that I'm his new predecessor - notify_predecessors(node); // tell others that I may have became their finger - remote_move_keys(node, node->fingers[0].id); // take some key-value pairs from my successor + INFO0("Joining the system"); + // find my predecessor and successor + int pred; + int suc = remote_find_predecessor(node, known_id, node->id); + do { + pred = suc; + suc = remote_find_successor(node, pred, pred); + } while (!is_in_interval(node->id, pred + 1, suc)); + + set_finger(node, 0, suc); + set_predecessor(node, pred); + remote_notify(node, pred); + remote_notify(node, suc); + bootstrap(node, suc); +} + +/** + * \brief Notifies the current node that a node has just joined the network. + * \param node the current node + * \param new_node_id id of the new node in the network + */ +static void notify(node_t node, int new_node_id) +{ + INFO1("A new node %d has joined the network, let's see if it is a neighboor", new_node_id); + if (is_in_interval(new_node_id, node->id + 1, node->fingers[0].id - 1)) { + // new_node_id is my new successor + set_finger(node, 0, new_node_id); + bootstrap(node, new_node_id); + } + + if (is_in_interval(new_node_id, node->pred_id + 1, node->id - 1)) { + // new_node_id is my new predecessor + set_predecessor(node, new_node_id); + bootstrap(node, new_node_id); + } +} + +/** + * \brief Notifies a remote node that the current node has just joined + * the network. + * \param node the current node + * \param notify_id id of the remote node to notify + */ +static void remote_notify(node_t node, int notify_id) +{ + task_data_t req_data = xbt_new0(s_task_data_t, 1); + req_data->request_id = node->id; + req_data->issuer_host_name = MSG_host_get_name(MSG_host_self()); + + // send a "Notify" request to notify_id + INFO2("Sending a 'Notify' request to %d because I have joined the network with id %d", notify_id, node->id); + m_task_t task = MSG_task_create("Notify", 1000, 5000, req_data); + char* mailbox = get_mailbox(notify_id); + msg_comm_t comm = MSG_task_isend(task, mailbox); + xbt_dynar_push(node->comms, &comm); + xbt_free(mailbox); +} + +/** +* \brief Let the current node send queries to fill in its own finger table. +* \param node the current node +* \param ask_to_id id of a node to send queries to +*/ +static void bootstrap(node_t node, int ask_to_id) +{ + INFO0("Filling my finger table"); + int i, pred_id, succ_id; + int pow = 1; + + for (i = 0; i < NB_BITS; i++) + { + pred_id = remote_find_successor(node, ask_to_id, pow); + do { + succ_id = pred_id; + pred_id = remote_find_predecessor(node, pred_id, pred_id); + } while (pred_id >= pow); + + pow = pow << 1; + set_finger(node, i, succ_id); + } } /** @@ -457,10 +578,7 @@ static void update_finger_table(node_t node, int candidate_id, int finger_index) if (is_in_interval(candidate_id, node->id, node->fingers[finger_index].id - 1)) { // INFO3("Candidate %d is between %d and %d!", candidate_id, node->id + pow, node->fingers[finger_index].id - 1); // candidate_id is my new finger - xbt_free(node->fingers[finger_index].mailbox); - node->fingers[finger_index].id = candidate_id; - node->fingers[finger_index].mailbox = get_mailbox(candidate_id); - INFO2("My new finger #%d is %d", finger_index, candidate_id); + set_finger(node, finger_index, candidate_id); print_finger_table(node); if (node->pred_id != node->id) { // FIXME: is this necessary? @@ -542,7 +660,6 @@ static int remote_find_successor(node_t node, int ask_to, int id) return successor; } - /** * \brief Makes the current node find the predecessor node of an id. * \param node the current node @@ -615,18 +732,23 @@ int closest_preceding_finger(node_t node, int id) /** * \brief This function is called periodically. It checks the immediate - * successor of the current node. + * successor and predecessor of the current node. * \param node the current node */ -static void stabilize(node_t node) { - - int x = find_predecessor(node, node->fingers[0].id); - if (is_in_interval(x, node->id + 1, node->fingers[0].id)) { - xbt_free(node->fingers[0].mailbox); - node->fingers[0].id = x; - node->fingers[0].mailbox = get_mailbox(x); +static void stabilize(node_t node) +{ + INFO0("Stabilizing node"); + int succ_id; + succ_id = node->pred_id; + succ_id = remote_find_successor(node, succ_id, succ_id); + if (is_in_interval(succ_id, node->pred_id + 1, node->id - 1)) { + set_predecessor(node, succ_id); + } + succ_id = node->fingers[0].id; + succ_id = remote_find_predecessor(node, succ_id, succ_id); + if (is_in_interval(succ_id, node->id + 1, node->fingers[0].id - 1)) { + set_finger(node, 0, succ_id); } - remote_notify(node, node->fingers[0].id, node->id); } /** @@ -634,15 +756,12 @@ static void stabilize(node_t node) { * \param node the current node * \param candidate_id the possible new predecessor */ -static void notify(node_t node, int predecessor_candidate_id) { +static void notify_predecessor_changed(node_t node, int predecessor_candidate_id) { if (node->pred_id == node->id || is_in_interval(predecessor_candidate_id, node->pred_id, node->id)) { - node->pred_id = predecessor_candidate_id; - node->pred_mailbox = get_mailbox(predecessor_candidate_id); - - INFO1("My new predecessor is %d", predecessor_candidate_id); + set_predecessor(node, predecessor_candidate_id); print_finger_table(node); } else { @@ -656,15 +775,15 @@ static void notify(node_t node, int predecessor_candidate_id) { * \param notify_id id of the node to notify * \param candidate_id the possible new predecessor */ -static void remote_notify(node_t node, int notify_id, int predecessor_candidate_id) { +static void remote_notify_predecessor_changed(node_t node, int notify_id, int predecessor_candidate_id) { task_data_t req_data = xbt_new0(s_task_data_t, 1); req_data->request_id = predecessor_candidate_id; req_data->issuer_host_name = MSG_host_get_name(MSG_host_self()); // send a "Notify" request to notify_id - INFO1("Sending a 'Notify' request to %d", notify_id); - m_task_t task = MSG_task_create("Notify", 1000, 5000, req_data); + INFO1("Sending a 'Notify Predecessor Changed' request to %d", notify_id); + m_task_t task = MSG_task_create("Notify Predecessor Changed", 1000, 5000, req_data); char* mailbox = get_mailbox(notify_id); msg_comm_t comm = MSG_task_isend(task, mailbox); xbt_dynar_push(node->comms, &comm); -- 2.20.1