From: thiery Date: Wed, 12 Jan 2011 14:20:20 +0000 (+0000) Subject: Don't try forever to join the Chord ring X-Git-Tag: v3.6_beta2~530 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/b02ac1c736917a89bec0baa4de9afc17cd29777c?hp=9cc96b0e9d8ac721f1ad89c67827fa62a05726f1 Don't try forever to join the Chord ring git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@9398 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/examples/msg/chord/chord.c b/examples/msg/chord/chord.c index d0b2e825ff..fe9ab9d15a 100644 --- a/examples/msg/chord/chord.c +++ b/examples/msg/chord/chord.c @@ -81,7 +81,7 @@ static void handle_task(node_t node, m_task_t task); // Chord core static void create(node_t node); -static void join(node_t node, int known_id); +static int join(node_t node, int known_id); static void leave(node_t node); static int find_successor(node_t node, int id); static int remote_find_successor(node_t node, int ask_to_id, int id); @@ -197,7 +197,7 @@ static void set_finger(node_t node, int finger_index, int id) node->fingers[finger_index].id = id; xbt_free(node->fingers[finger_index].mailbox); node->fingers[finger_index].mailbox = get_mailbox(id); - INFO2("My new finger #%d is %d", finger_index, id); + DEBUG2("My new finger #%d is %d", finger_index, id); } /** @@ -214,7 +214,7 @@ static void set_predecessor(node_t node, int predecessor_id) node->pred_mailbox = get_mailbox(predecessor_id); } - INFO1("My new predecessor is %d", predecessor_id); + DEBUG1("My new predecessor is %d", predecessor_id); } /** @@ -228,9 +228,11 @@ int node(int argc, char *argv[]) { double init_time = MSG_get_clock(); m_task_t task = NULL; + m_task_t task_received = NULL; msg_comm_t comm_send = NULL; int i; int index; + int join_success = 0; double deadline; double next_stabilize_date = init_time + 10; double next_fix_fingers_date = init_time + 10; @@ -252,6 +254,7 @@ int node(int argc, char *argv[]) if (argc == 3) { // first ring deadline = atof(argv[2]); create(&node); + join_success = 1; } else { int known_id = atoi(argv[2]); @@ -259,81 +262,85 @@ int node(int argc, char *argv[]) deadline = atof(argv[4]); // sleep before starting - INFO1("Let's sleep during %f", sleep_time); + DEBUG1("Let's sleep during %f", sleep_time); MSG_process_sleep(sleep_time); - INFO0("Hey! Let's join the system."); + DEBUG0("Hey! Let's join the system."); - join(&node, known_id); + join_success = join(&node, known_id); } - while (MSG_get_clock() < init_time + deadline) { + if (join_success) { + while (MSG_get_clock() < init_time + deadline) { - if (node.comm_receive == NULL) { - task = NULL; - node.comm_receive = MSG_task_irecv(&task, node.mailbox); - // FIXME: do not make MSG_task_irecv() calls from several functions - } + if (node.comm_receive == NULL) { + task_received = NULL; + node.comm_receive = MSG_task_irecv(&task_received, node.mailbox); + // FIXME: do not make MSG_task_irecv() calls from several functions + } - if (!MSG_comm_test(node.comm_receive)) { + if (!MSG_comm_test(node.comm_receive)) { - // no task was received: make some periodic calls - if (MSG_get_clock() >= next_stabilize_date) { - stabilize(&node); - next_stabilize_date = MSG_get_clock() + 10; - } - else if (MSG_get_clock() >= next_fix_fingers_date) { - fix_fingers(&node); - next_fix_fingers_date = MSG_get_clock() + 10; - } - else if (MSG_get_clock() >= next_check_predecessor_date) { - check_predecessor(&node); - next_check_predecessor_date = MSG_get_clock() + 10; + // no task was received: make some periodic calls + if (MSG_get_clock() >= next_stabilize_date) { + stabilize(&node); + next_stabilize_date = MSG_get_clock() + 10; + } + else if (MSG_get_clock() >= next_fix_fingers_date) { + fix_fingers(&node); + next_fix_fingers_date = MSG_get_clock() + 10; + } + else if (MSG_get_clock() >= next_check_predecessor_date) { + check_predecessor(&node); + next_check_predecessor_date = MSG_get_clock() + 10; + } + else { + // nothing to do: sleep for a while + MSG_process_sleep(5); + } } else { - // nothing to do: sleep for a while - MSG_process_sleep(5); - } - } - else { - // a transfer has occured + // a transfer has occured - MSG_error_t status = MSG_comm_get_status(node.comm_receive); - MSG_comm_destroy(node.comm_receive); - node.comm_receive = NULL; + MSG_error_t status = MSG_comm_get_status(node.comm_receive); + MSG_comm_destroy(node.comm_receive); + node.comm_receive = NULL; - if (status != MSG_OK) { - INFO0("Failed to receive a task. Nevermind."); + if (status != MSG_OK) { + DEBUG0("Failed to receive a task. Nevermind."); + } + else { + // the task was successfully received + handle_task(&node, task_received); + } } - else { - // the task was successfully received - handle_task(&node, task); + + // see if some communications are finished + while ((index = MSG_comm_testany(node.comms)) != -1) { + comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t); + MSG_error_t status = MSG_comm_get_status(comm_send); + xbt_dynar_remove_at(node.comms, index, &comm_send); + DEBUG3("Communication %p is finished with status %d, dynar size is now %lu", + comm_send, status, xbt_dynar_length(node.comms)); + MSG_comm_destroy(comm_send); } } - // see if some communications are finished - while ((index = MSG_comm_testany(node.comms)) != -1) { - comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t); - MSG_error_t status = MSG_comm_get_status(comm_send); - xbt_dynar_remove_at(node.comms, index, &comm_send); - DEBUG3("Communication %p is finished with status %d, dynar size is now %lu", - comm_send, status, xbt_dynar_length(node.comms)); + // clean unfinished comms sent + unsigned int cursor; + xbt_dynar_foreach(node.comms, cursor, comm_send) { + task = MSG_comm_get_task(comm_send); + MSG_task_cancel(task); + xbt_free(MSG_task_get_data(task)); + MSG_task_destroy(task); MSG_comm_destroy(comm_send); + // FIXME: the task is actually not destroyed because MSG thinks that the other side (whose process is dead) is still using it } - } - // clean unfinished comms sent - unsigned int cursor; - xbt_dynar_foreach(node.comms, cursor, comm_send) { - task = MSG_comm_get_task(comm_send); - MSG_task_cancel(task); - xbt_free(MSG_task_get_data(task)); - MSG_task_destroy(task); - MSG_comm_destroy(comm_send); - // FIXME: the task is actually not destroyed because MSG thinks that the other side (whose process is dead) is still using it + // leave the ring + leave(&node); } - // leave the ring and stop the simulation - leave(&node); + // stop the simulation xbt_dynar_free(&node.comms); xbt_free(node.mailbox); xbt_free(node.pred_mailbox); @@ -351,6 +358,7 @@ int node(int argc, char *argv[]) */ static void handle_task(node_t node, m_task_t task) { + DEBUG1("Handling task %p", task); msg_comm_t comm = NULL; char* mailbox = NULL; task_data_t task_data = (task_data_t) MSG_task_get_data(task); @@ -359,7 +367,7 @@ static void handle_task(node_t node, m_task_t task) { switch (type) { case TASK_FIND_SUCCESSOR: - INFO2("Receiving a 'Find Successor' request from %s for id %d", + DEBUG2("Receiving a 'Find Successor' request from %s for id %d", task_data->issuer_host_name, task_data->request_id); // is my successor the successor? if (is_in_interval(task_data->request_id, node->id + 1, node->fingers[0].id)) { @@ -367,14 +375,14 @@ static void handle_task(node_t node, m_task_t task) { task_data->answer_id = node->fingers[0].id; comm = MSG_task_isend(task, task_data->answer_to); xbt_dynar_push(node->comms, &comm); - INFO3("Sending back a 'Find Successor Answer' to %s: the successor of %d is %d", + DEBUG3("Sending back a 'Find Successor Answer' to %s: the successor of %d is %d", task_data->issuer_host_name, task_data->request_id, task_data->answer_id); } else { // otherwise, forward the request to the closest preceding finger in my table int closest = closest_preceding_node(node, task_data->request_id); - INFO2("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d", + DEBUG2("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d", task_data->request_id, closest); mailbox = get_mailbox(closest); comm = MSG_task_isend(task, mailbox); @@ -384,19 +392,19 @@ static void handle_task(node_t node, m_task_t task) { break; case TASK_GET_PREDECESSOR: - INFO1("Receiving a 'Get Predecessor' request from %s", task_data->issuer_host_name); + DEBUG1("Receiving a 'Get Predecessor' request from %s", task_data->issuer_host_name); task_data->type = TASK_GET_PREDECESSOR_ANSWER; task_data->answer_id = node->pred_id; comm = MSG_task_isend(task, task_data->answer_to); xbt_dynar_push(node->comms, &comm); - INFO3("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d", + DEBUG3("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d", task_data->issuer_host_name, task_data->answer_to, task_data->answer_id); break; case TASK_NOTIFY: // someone is telling me that he may be my new predecessor - INFO1("Receiving a 'Notify' request from %s", task_data->issuer_host_name); + DEBUG1("Receiving a 'Notify' request from %s", task_data->issuer_host_name); notify(node, task_data->request_id); xbt_free(task_data); MSG_task_destroy(task); @@ -404,7 +412,7 @@ static void handle_task(node_t node, m_task_t task) { case TASK_PREDECESSOR_LEAVING: // my predecessor is about to quit - INFO1("Receiving a 'Predecessor Leaving' message from %s", task_data->issuer_host_name); + DEBUG1("Receiving a 'Predecessor Leaving' message from %s", task_data->issuer_host_name); // modify my predecessor set_predecessor(node, task_data->request_id); xbt_free(task_data); @@ -417,7 +425,7 @@ static void handle_task(node_t node, m_task_t task) { case TASK_SUCCESSOR_LEAVING: // my successor is about to quit - INFO1("Receiving a 'Successor Leaving' message from %s", task_data->issuer_host_name); + DEBUG1("Receiving a 'Successor Leaving' message from %s", task_data->issuer_host_name); // modify my successor FIXME : this should be implicit ? set_finger(node, 0, task_data->request_id); xbt_free(task_data); @@ -429,7 +437,7 @@ static void handle_task(node_t node, m_task_t task) { default: CRITICAL1("Received an unexpected task: %d", type); - xbt_abort(); + //xbt_abort(); } } @@ -439,7 +447,7 @@ static void handle_task(node_t node, m_task_t task) { */ static void create(node_t node) { - INFO0("Create a new Chord ring..."); + DEBUG0("Create a new Chord ring..."); set_predecessor(node, -1); // -1 means that I have no predecessor print_finger_table(node); } @@ -449,22 +457,23 @@ static void create(node_t node) * already in the ring * \param node the current node * \param known_id id of a node already in the ring + * \return 1 if the join operation succeeded, 0 otherwise */ -static void join(node_t node, int known_id) +static int join(node_t node, int known_id) { INFO2("Joining the ring with id %d, knowing node %d", node->id, known_id); set_predecessor(node, -1); // no predecessor (yet) - int successor_id; - do { - successor_id = remote_find_successor(node, known_id, node->id); - if (successor_id == -1) { - INFO0("I really want to join the ring. Let's try again."); - } - } while (successor_id == -1); + int successor_id = remote_find_successor(node, known_id, node->id); + if (successor_id == -1) { + DEBUG0("Cannot join the ring."); + } + else { + set_finger(node, 0, successor_id); + print_finger_table(node); + } - set_finger(node, 0, successor_id); - print_finger_table(node); + return successor_id != -1; } /** @@ -473,7 +482,7 @@ static void join(node_t node, int known_id) */ static void leave(node_t node) { - INFO0("Well Guys! I Think it's time for me to quit ;)"); + DEBUG0("Well Guys! I Think it's time for me to quit ;)"); quit_notify(node, 1); // notify to my successor ( >>> 1 ); quit_notify(node, -1); // notify my predecessor ( >>> -1); // TODO ... @@ -557,19 +566,20 @@ static int remote_find_successor(node_t node, int ask_to, int id) req_data->issuer_host_name = MSG_host_get_name(MSG_host_self()); // send a "Find Successor" request to ask_to_id - INFO2("Sending a 'Find Successor' request to %d for id %d", ask_to, id); m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data); + DEBUG3("Sending a 'Find Successor' request (task %p) to %d for id %d", task, ask_to, id); MSG_error_t res = MSG_task_send_with_timeout(task, mailbox, 50); if (res != MSG_OK) { - INFO2("Failed to send the 'Find Successor' request to %d for id %d", ask_to, id); + DEBUG2("Failed to send the 'Find Successor' request to %d for id %d", ask_to, id); MSG_task_destroy(task); xbt_free(req_data); } else { // receive the answer - DEBUG2("Sent a 'Find Successor' request to %d for key %d, waiting for the answer", ask_to, id); + DEBUG3("Sent a 'Find Successor' request (task %p) to %d for key %d, waiting for the answer", + task, ask_to, id); do { if (node->comm_receive == NULL) { @@ -580,7 +590,7 @@ static int remote_find_successor(node_t node, int ask_to, int id) res = MSG_comm_wait(node->comm_receive, 50); if (res != MSG_OK) { - INFO1("Failed to receive the answer to my 'Find Successor' request: %d", res); + DEBUG2("Failed to receive the answer to my 'Find Successor' request (task %p): %d", task, res); stop = 1; // MSG_comm_destroy(node->comm_receive); } @@ -592,7 +602,8 @@ static int remote_find_successor(node_t node, int ask_to, int id) handle_task(node, task); } else { - INFO2("Received the answer to my 'Find Successor' request: the successor of key %d is %d", id, successor); + DEBUG3("Received the answer to my 'Find Successor' request (task %p): the successor of key %d is %d", + task, id, successor); successor = ans_data->answer_id; stop = 1; MSG_task_destroy(task); @@ -625,12 +636,12 @@ static int remote_get_predecessor(node_t node, int ask_to) req_data->issuer_host_name = MSG_host_get_name(MSG_host_self()); // send a "Get Predecessor" request to ask_to_id - INFO1("Sending a 'Get Predecessor' request to %d", ask_to); + DEBUG1("Sending a 'Get Predecessor' request to %d", ask_to); m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data); MSG_error_t res = MSG_task_send_with_timeout(task, mailbox, 50); if (res != MSG_OK) { - INFO1("Failed to send the 'Get Predecessor' request to %d", ask_to); + DEBUG1("Failed to send the 'Get Predecessor' request to %d", ask_to); MSG_task_destroy(task); xbt_free(req_data); } @@ -648,7 +659,7 @@ static int remote_get_predecessor(node_t node, int ask_to) res = MSG_comm_wait(node->comm_receive, 50); if (res != MSG_OK) { - INFO1("Failed to receive the answer to my 'Get Predecessor' request: %d", res); + DEBUG1("Failed to receive the answer to my 'Get Predecessor' request: %d", res); stop = 1; // MSG_comm_destroy(node->comm_receive); } @@ -660,7 +671,7 @@ static int remote_get_predecessor(node_t node, int ask_to) handle_task(node, task); } else { - INFO2("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, predecessor_id); + DEBUG2("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, predecessor_id); predecessor_id = ans_data->answer_id; stop = 1; MSG_task_destroy(task); @@ -700,7 +711,7 @@ int closest_preceding_node(node_t node, int id) */ static void stabilize(node_t node) { - INFO0("Stabilizing node"); + DEBUG0("Stabilizing node"); // get the predecessor of my immediate successor int candidate_id; @@ -736,7 +747,7 @@ static void notify(node_t node, int predecessor_candidate_id) { print_finger_table(node); } else { - INFO1("I don't have to change my predecessor to %d", predecessor_candidate_id); + DEBUG1("I don't have to change my predecessor to %d", predecessor_candidate_id); } } @@ -754,7 +765,7 @@ static void remote_notify(node_t node, int notify_id, int predecessor_candidate_ req_data->issuer_host_name = MSG_host_get_name(MSG_host_self()); // send a "Notify" request to notify_id - INFO1("Sending a 'Notify' request to %d", notify_id); + DEBUG1("Sending a 'Notify' request to %d", notify_id); m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data); char* mailbox = get_mailbox(notify_id); msg_comm_t comm = MSG_task_isend(task, mailbox); @@ -769,7 +780,7 @@ static void remote_notify(node_t node, int notify_id, int predecessor_candidate_ */ static void fix_fingers(node_t node) { - INFO0("Fixing fingers"); + DEBUG0("Fixing fingers"); int i = node->next_finger_to_fix; int id = find_successor(node, node->id + powers2[i]); if (id != -1) { @@ -789,7 +800,7 @@ static void fix_fingers(node_t node) { */ static void check_predecessor(node_t node) { - INFO0("Checking whether my predecessor is alive"); + DEBUG0("Checking whether my predecessor is alive"); // TODO }