X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/3ebaa9f6d0886b21e5b01506f8136900f78e008e..b02ac1c736917a89bec0baa4de9afc17cd29777c:/examples/msg/chord/chord.c diff --git a/examples/msg/chord/chord.c b/examples/msg/chord/chord.c index a505f6c8c2..fe9ab9d15a 100644 --- a/examples/msg/chord/chord.c +++ b/examples/msg/chord/chord.c @@ -40,17 +40,28 @@ typedef struct node { } s_node_t, *node_t; /** - * Task data + * Types of tasks exchanged between nodes. + */ +typedef enum { + TASK_FIND_SUCCESSOR, + TASK_FIND_SUCCESSOR_ANSWER, + TASK_GET_PREDECESSOR, + TASK_GET_PREDECESSOR_ANSWER, + TASK_NOTIFY, + TASK_SUCCESSOR_LEAVING, + TASK_PREDECESSOR_LEAVING +} e_task_type_t; + +/** + * Data attached with the tasks sent and received */ typedef struct task_data { - int request_id; - int request_finger; - int answer_id; - const char* answer_to; - const char* issuer_host_name; // used for logging - int successor_id; // used when quitting - int pred_id; // used when quitting - // FIXME: remove successor_id and pred_id, request_id is enough + e_task_type_t type; // type of task + int request_id; // id paramater (used by some types of tasks) + int request_finger; // finger parameter (used by some types of tasks) + int answer_id; // answer (used by some types of tasks) + const char* answer_to; // mailbox to send an answer to + const char* issuer_host_name; // used for logging } s_task_data_t, *task_data_t; static int powers2[NB_BITS]; @@ -70,7 +81,7 @@ static void handle_task(node_t node, m_task_t task); // Chord core static void create(node_t node); -static void join(node_t node, int known_id); +static int join(node_t node, int known_id); static void leave(node_t node); static int find_successor(node_t node, int id); static int remote_find_successor(node_t node, int ask_to_id, int id); @@ -83,6 +94,9 @@ static void fix_fingers(node_t node); static void check_predecessor(node_t node); static void quit_notify(node_t node, int to); +/** + * \brief Global initialization of the Chord simulation. + */ static void chord_initialize(void) { // compute the powers of 2 once for all @@ -95,7 +109,7 @@ static void chord_initialize(void) } /** - * \brief Turns an id into an equivalent id in [0, NB_KEYS[ + * \brief Turns an id into an equivalent id in [0, NB_KEYS). * \param id an id * \return the corresponding normalized id */ @@ -149,7 +163,6 @@ static int is_in_interval(int id, int start, int end) * \brief Gets the mailbox name of a host given its chord id. * \param node_id id of a node * \return the name of its mailbox - * FIXME: free the memory */ static char* get_mailbox(int node_id) { @@ -184,7 +197,7 @@ static void set_finger(node_t node, int finger_index, int id) node->fingers[finger_index].id = id; xbt_free(node->fingers[finger_index].mailbox); node->fingers[finger_index].mailbox = get_mailbox(id); - INFO2("My new finger #%d is %d", finger_index, id); + DEBUG2("My new finger #%d is %d", finger_index, id); } /** @@ -201,7 +214,7 @@ static void set_predecessor(node_t node, int predecessor_id) node->pred_mailbox = get_mailbox(predecessor_id); } - INFO1("My new predecessor is %d", predecessor_id); + DEBUG1("My new predecessor is %d", predecessor_id); } /** @@ -215,9 +228,11 @@ int node(int argc, char *argv[]) { double init_time = MSG_get_clock(); m_task_t task = NULL; + m_task_t task_received = NULL; msg_comm_t comm_send = NULL; int i; int index; + int join_success = 0; double deadline; double next_stabilize_date = init_time + 10; double next_fix_fingers_date = init_time + 10; @@ -239,6 +254,7 @@ int node(int argc, char *argv[]) if (argc == 3) { // first ring deadline = atof(argv[2]); create(&node); + join_success = 1; } else { int known_id = atoi(argv[2]); @@ -246,79 +262,85 @@ int node(int argc, char *argv[]) deadline = atof(argv[4]); // sleep before starting - INFO1("Let's sleep during %f", sleep_time); + DEBUG1("Let's sleep during %f", sleep_time); MSG_process_sleep(sleep_time); - INFO0("Hey! Let's join the system."); + DEBUG0("Hey! Let's join the system."); - join(&node, known_id); + join_success = join(&node, known_id); } - while (MSG_get_clock() < init_time + deadline) { + if (join_success) { + while (MSG_get_clock() < init_time + deadline) { - if (node.comm_receive == NULL) { - task = NULL; - node.comm_receive = MSG_task_irecv(&task, node.mailbox); - // FIXME: do not make MSG_task_irecv() calls from several functions - } + if (node.comm_receive == NULL) { + task_received = NULL; + node.comm_receive = MSG_task_irecv(&task_received, node.mailbox); + // FIXME: do not make MSG_task_irecv() calls from several functions + } - if (!MSG_comm_test(node.comm_receive)) { + if (!MSG_comm_test(node.comm_receive)) { - // no task was received: make some periodic calls - if (MSG_get_clock() >= next_stabilize_date) { - stabilize(&node); - next_stabilize_date = MSG_get_clock() + 10; - } - else if (MSG_get_clock() >= next_fix_fingers_date) { - fix_fingers(&node); - next_fix_fingers_date = MSG_get_clock() + 10; - } - else if (MSG_get_clock() >= next_check_predecessor_date) { - check_predecessor(&node); - next_check_predecessor_date = MSG_get_clock() + 10; + // no task was received: make some periodic calls + if (MSG_get_clock() >= next_stabilize_date) { + stabilize(&node); + next_stabilize_date = MSG_get_clock() + 10; + } + else if (MSG_get_clock() >= next_fix_fingers_date) { + fix_fingers(&node); + next_fix_fingers_date = MSG_get_clock() + 10; + } + else if (MSG_get_clock() >= next_check_predecessor_date) { + check_predecessor(&node); + next_check_predecessor_date = MSG_get_clock() + 10; + } + else { + // nothing to do: sleep for a while + MSG_process_sleep(5); + } } else { - // nothing to do: sleep for a while - MSG_process_sleep(5); - } - } - else { - // a transfer has occured + // a transfer has occured - MSG_error_t status = MSG_comm_get_status(node.comm_receive); - MSG_comm_destroy(node.comm_receive); - node.comm_receive = NULL; + MSG_error_t status = MSG_comm_get_status(node.comm_receive); + MSG_comm_destroy(node.comm_receive); + node.comm_receive = NULL; - if (status != MSG_OK) { - INFO0("Failed to receive a task. Nevermind."); + if (status != MSG_OK) { + DEBUG0("Failed to receive a task. Nevermind."); + } + else { + // the task was successfully received + handle_task(&node, task_received); + } } - else { - // the task was successfully received - handle_task(&node, task); + + // see if some communications are finished + while ((index = MSG_comm_testany(node.comms)) != -1) { + comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t); + MSG_error_t status = MSG_comm_get_status(comm_send); + xbt_dynar_remove_at(node.comms, index, &comm_send); + DEBUG3("Communication %p is finished with status %d, dynar size is now %lu", + comm_send, status, xbt_dynar_length(node.comms)); + MSG_comm_destroy(comm_send); } } - // see if some communications are finished - while ((index = MSG_comm_testany(node.comms)) != -1) { - comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t); - xbt_dynar_remove_at(node.comms, index, &comm_send); - DEBUG3("Communication %p is finished with status %d, dynar size is now %lu", - comm_send, MSG_comm_get_status(comm_send), xbt_dynar_length(node.comms)); + // clean unfinished comms sent + unsigned int cursor; + xbt_dynar_foreach(node.comms, cursor, comm_send) { + task = MSG_comm_get_task(comm_send); + MSG_task_cancel(task); + xbt_free(MSG_task_get_data(task)); + MSG_task_destroy(task); MSG_comm_destroy(comm_send); + // FIXME: the task is actually not destroyed because MSG thinks that the other side (whose process is dead) is still using it } - } - // clean unfinished comms sent - unsigned int cursor; - xbt_dynar_foreach(node.comms, cursor, comm_send) { - task = MSG_comm_get_task(comm_send); - MSG_task_cancel(task); - xbt_free(MSG_task_get_data(task)); - MSG_task_destroy(task); - MSG_comm_destroy(comm_send); + // leave the ring + leave(&node); } - // leave the ring and stop the simulation - leave(&node); + // stop the simulation xbt_dynar_free(&node.comms); xbt_free(node.mailbox); xbt_free(node.pred_mailbox); @@ -336,78 +358,86 @@ int node(int argc, char *argv[]) */ static void handle_task(node_t node, m_task_t task) { + DEBUG1("Handling task %p", task); msg_comm_t comm = NULL; char* mailbox = NULL; - const char* task_name = MSG_task_get_name(task); task_data_t task_data = (task_data_t) MSG_task_get_data(task); - - if (!strcmp(task_name, "Find Successor")) { - INFO2("Receiving a 'Find Successor' request from %s for id %d", - task_data->issuer_host_name, task_data->request_id); - // is my successor the successor? - if (is_in_interval(task_data->request_id, node->id + 1, node->fingers[0].id)) { - task_data->answer_id = node->fingers[0].id; - MSG_task_set_name(task, "Find Successor Answer"); - comm = MSG_task_isend(task, task_data->answer_to); - xbt_dynar_push(node->comms, &comm); - INFO3("Sending back a 'Find Successor Answer' to %s: the successor of %d is %d", + e_task_type_t type = task_data->type; + + switch (type) { + + case TASK_FIND_SUCCESSOR: + DEBUG2("Receiving a 'Find Successor' request from %s for id %d", + task_data->issuer_host_name, task_data->request_id); + // is my successor the successor? + if (is_in_interval(task_data->request_id, node->id + 1, node->fingers[0].id)) { + task_data->type = TASK_FIND_SUCCESSOR_ANSWER; + task_data->answer_id = node->fingers[0].id; + comm = MSG_task_isend(task, task_data->answer_to); + xbt_dynar_push(node->comms, &comm); + DEBUG3("Sending back a 'Find Successor Answer' to %s: the successor of %d is %d", task_data->issuer_host_name, task_data->request_id, task_data->answer_id); - } - else { - // otherwise, forward the request to the closest preceding finger in my table - int closest = closest_preceding_node(node, task_data->request_id); - INFO2("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d", - task_data->request_id, closest); - mailbox = get_mailbox(closest); - comm = MSG_task_isend(task, mailbox); - xbt_dynar_push(node->comms, &comm); - xbt_free(mailbox); - } - } + } + else { + // otherwise, forward the request to the closest preceding finger in my table + int closest = closest_preceding_node(node, task_data->request_id); + DEBUG2("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d", + task_data->request_id, closest); + mailbox = get_mailbox(closest); + comm = MSG_task_isend(task, mailbox); + xbt_dynar_push(node->comms, &comm); + xbt_free(mailbox); + } + break; - else if (!strcmp(task_name, "Get Predecessor")) { - INFO1("Receiving a 'Get Predecessor' request from %s", task_data->issuer_host_name); - task_data->answer_id = node->pred_id; - MSG_task_set_name(task, "Get Predecessor Answer"); - comm = MSG_task_isend(task, task_data->answer_to); - xbt_dynar_push(node->comms, &comm); - INFO3("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d", + case TASK_GET_PREDECESSOR: + DEBUG1("Receiving a 'Get Predecessor' request from %s", task_data->issuer_host_name); + task_data->type = TASK_GET_PREDECESSOR_ANSWER; + task_data->answer_id = node->pred_id; + comm = MSG_task_isend(task, task_data->answer_to); + xbt_dynar_push(node->comms, &comm); + DEBUG3("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d", task_data->issuer_host_name, task_data->answer_to, task_data->answer_id); - } - - else if (!strcmp(task_name, "Notify")) { - // someone is telling me that he may be my new predecessor - INFO1("Receiving a 'Notify' request from %s", task_data->issuer_host_name); - notify(node, task_data->request_id); - xbt_free(task_data); - MSG_task_destroy(task); - } - - else if (!strcmp(task_name, "Predecessor Leaving")) { - // my predecessor is about to quit - INFO1("Receiving a 'Predecessor Leaving' message from %s", task_data->issuer_host_name); - // modify my predecessor - set_predecessor(node, task_data->pred_id); - xbt_free(task_data); - MSG_task_destroy(task); - /*TODO : - >> notify my new predecessor - >> send a notify_predecessors !! - */ - } - - else if (!strcmp(task_name, "Successor Leaving")) { - // my successor is about to quit - INFO1("Receiving a 'Successor Leaving' message from %s", task_data->issuer_host_name); - // modify my successor FIXME : this should be implicit ? - set_finger(node, 0, task_data->successor_id); - xbt_free(task_data); - MSG_task_destroy(task); - /* TODO - >> notify my new successor - >> update my table & predecessors table */ + break; + + case TASK_NOTIFY: + // someone is telling me that he may be my new predecessor + DEBUG1("Receiving a 'Notify' request from %s", task_data->issuer_host_name); + notify(node, task_data->request_id); + xbt_free(task_data); + MSG_task_destroy(task); + break; + + case TASK_PREDECESSOR_LEAVING: + // my predecessor is about to quit + DEBUG1("Receiving a 'Predecessor Leaving' message from %s", task_data->issuer_host_name); + // modify my predecessor + set_predecessor(node, task_data->request_id); + xbt_free(task_data); + MSG_task_destroy(task); + /*TODO : + >> notify my new predecessor + >> send a notify_predecessors !! + */ + break; + + case TASK_SUCCESSOR_LEAVING: + // my successor is about to quit + DEBUG1("Receiving a 'Successor Leaving' message from %s", task_data->issuer_host_name); + // modify my successor FIXME : this should be implicit ? + set_finger(node, 0, task_data->request_id); + xbt_free(task_data); + MSG_task_destroy(task); + /* TODO + >> notify my new successor + >> update my table & predecessors table */ + break; + + default: + CRITICAL1("Received an unexpected task: %d", type); + //xbt_abort(); } } @@ -417,7 +447,7 @@ static void handle_task(node_t node, m_task_t task) { */ static void create(node_t node) { - INFO0("Create a new Chord ring..."); + DEBUG0("Create a new Chord ring..."); set_predecessor(node, -1); // -1 means that I have no predecessor print_finger_table(node); } @@ -427,22 +457,23 @@ static void create(node_t node) * already in the ring * \param node the current node * \param known_id id of a node already in the ring + * \return 1 if the join operation succeeded, 0 otherwise */ -static void join(node_t node, int known_id) +static int join(node_t node, int known_id) { INFO2("Joining the ring with id %d, knowing node %d", node->id, known_id); set_predecessor(node, -1); // no predecessor (yet) - int successor_id; - do { - successor_id = remote_find_successor(node, known_id, node->id); - if (successor_id == -1) { - INFO0("I really want to join the ring. Let's try again."); - } - } while (successor_id == -1); + int successor_id = remote_find_successor(node, known_id, node->id); + if (successor_id == -1) { + DEBUG0("Cannot join the ring."); + } + else { + set_finger(node, 0, successor_id); + print_finger_table(node); + } - set_finger(node, 0, successor_id); - print_finger_table(node); + return successor_id != -1; } /** @@ -451,7 +482,7 @@ static void join(node_t node, int known_id) */ static void leave(node_t node) { - INFO0("Well Guys! I Think it's time for me to quit ;)"); + DEBUG0("Well Guys! I Think it's time for me to quit ;)"); quit_notify(node, 1); // notify to my successor ( >>> 1 ); quit_notify(node, -1); // notify my predecessor ( >>> -1); // TODO ... @@ -478,7 +509,7 @@ static void quit_notify(node_t node, int to) to_mailbox = node->fingers[0].mailbox; INFO2("Telling my Successor %d about my departure via mailbox %s", node->fingers[0].id, to_mailbox); - task_name = "Predecessor Leaving"; + req_data->type = TASK_PREDECESSOR_LEAVING; } else if (to == -1) { // notify my predecessor @@ -489,9 +520,9 @@ static void quit_notify(node_t node, int to) to_mailbox = node->pred_mailbox; INFO2("Telling my Predecessor %d about my departure via mailbox %s", node->pred_id, to_mailbox); - task_name = "Successor Leaving"; + req_data->type = TASK_SUCCESSOR_LEAVING; } - m_task_t task = MSG_task_create(task_name, COMP_SIZE, COMM_SIZE, req_data); + m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data); //char* mailbox = get_mailbox(to_mailbox); msg_comm_t comm = MSG_task_isend(task, to_mailbox); xbt_dynar_push(node->comms, &comm); @@ -529,24 +560,26 @@ static int remote_find_successor(node_t node, int ask_to, int id) int stop = 0; char* mailbox = get_mailbox(ask_to); task_data_t req_data = xbt_new0(s_task_data_t, 1); + req_data->type = TASK_FIND_SUCCESSOR; req_data->request_id = id; req_data->answer_to = node->mailbox; req_data->issuer_host_name = MSG_host_get_name(MSG_host_self()); // send a "Find Successor" request to ask_to_id - INFO2("Sending a 'Find Successor' request to %d for id %d", ask_to, id); - m_task_t task = MSG_task_create("Find Successor", COMP_SIZE, COMM_SIZE, req_data); + m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data); + DEBUG3("Sending a 'Find Successor' request (task %p) to %d for id %d", task, ask_to, id); MSG_error_t res = MSG_task_send_with_timeout(task, mailbox, 50); if (res != MSG_OK) { - INFO2("Failed to send the 'Find Successor' request to %d for id %d", ask_to, id); + DEBUG2("Failed to send the 'Find Successor' request to %d for id %d", ask_to, id); MSG_task_destroy(task); xbt_free(req_data); } else { // receive the answer - INFO2("Sent a 'Find Successor' request to %d for key %d, waiting for the answer", ask_to, id); + DEBUG3("Sent a 'Find Successor' request (task %p) to %d for key %d, waiting for the answer", + task, ask_to, id); do { if (node->comm_receive == NULL) { @@ -557,20 +590,20 @@ static int remote_find_successor(node_t node, int ask_to, int id) res = MSG_comm_wait(node->comm_receive, 50); if (res != MSG_OK) { - INFO1("Failed to receive the answer to my 'Find Successor' request: %d", res); + DEBUG2("Failed to receive the answer to my 'Find Successor' request (task %p): %d", task, res); stop = 1; // MSG_comm_destroy(node->comm_receive); } else { task = MSG_comm_get_task(node->comm_receive); - INFO1("Received a task '%s'", MSG_task_get_name(task)); + task_data_t ans_data = MSG_task_get_data(task); - if (strcmp(MSG_task_get_name(task), "Find Successor Answer")) { + if (ans_data->type != TASK_FIND_SUCCESSOR_ANSWER) { handle_task(node, task); } else { - INFO2("This is the answer to my 'Find Successor' request: the successor of key %d is %d", id, successor); - task_data_t ans_data = MSG_task_get_data(task); + DEBUG3("Received the answer to my 'Find Successor' request (task %p): the successor of key %d is %d", + task, id, successor); successor = ans_data->answer_id; stop = 1; MSG_task_destroy(task); @@ -598,23 +631,24 @@ static int remote_get_predecessor(node_t node, int ask_to) int stop = 0; char* mailbox = get_mailbox(ask_to); task_data_t req_data = xbt_new0(s_task_data_t, 1); + req_data->type = TASK_GET_PREDECESSOR; req_data->answer_to = node->mailbox; req_data->issuer_host_name = MSG_host_get_name(MSG_host_self()); // send a "Get Predecessor" request to ask_to_id - INFO1("Sending a 'Get Predecessor' request to %d", ask_to); - m_task_t task = MSG_task_create("Get Predecessor", COMP_SIZE, COMM_SIZE, req_data); + DEBUG1("Sending a 'Get Predecessor' request to %d", ask_to); + m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data); MSG_error_t res = MSG_task_send_with_timeout(task, mailbox, 50); if (res != MSG_OK) { - INFO1("Failed to send the 'Get Predecessor' request to %d", ask_to); + DEBUG1("Failed to send the 'Get Predecessor' request to %d", ask_to); MSG_task_destroy(task); xbt_free(req_data); } else { // receive the answer - INFO2("Sent 'Get Predecessor' request to %d, waiting for the answer on my mailbox '%s'", ask_to, req_data->answer_to); + DEBUG2("Sent 'Get Predecessor' request to %d, waiting for the answer on my mailbox '%s'", ask_to, req_data->answer_to); do { if (node->comm_receive == NULL) { // FIXME simplify this @@ -625,20 +659,19 @@ static int remote_get_predecessor(node_t node, int ask_to) res = MSG_comm_wait(node->comm_receive, 50); if (res != MSG_OK) { - INFO1("Failed to receive the answer to my 'Get Predecessor' request: %d", res); + DEBUG1("Failed to receive the answer to my 'Get Predecessor' request: %d", res); stop = 1; // MSG_comm_destroy(node->comm_receive); } else { task = MSG_comm_get_task(node->comm_receive); - INFO1("Received a task '%s'", MSG_task_get_name(task)); + task_data_t ans_data = MSG_task_get_data(task); - if (strcmp(MSG_task_get_name(task), "Get Predecessor Answer")) { + if (ans_data->type != TASK_GET_PREDECESSOR_ANSWER) { handle_task(node, task); } else { - INFO2("This is the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, predecessor_id); - task_data_t ans_data = MSG_task_get_data(task); + DEBUG2("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, predecessor_id); predecessor_id = ans_data->answer_id; stop = 1; MSG_task_destroy(task); @@ -678,7 +711,7 @@ int closest_preceding_node(node_t node, int id) */ static void stabilize(node_t node) { - INFO0("Stabilizing node"); + DEBUG0("Stabilizing node"); // get the predecessor of my immediate successor int candidate_id; @@ -714,7 +747,7 @@ static void notify(node_t node, int predecessor_candidate_id) { print_finger_table(node); } else { - INFO1("I don't have to change my predecessor to %d", predecessor_candidate_id); + DEBUG1("I don't have to change my predecessor to %d", predecessor_candidate_id); } } @@ -727,12 +760,13 @@ static void notify(node_t node, int predecessor_candidate_id) { static void remote_notify(node_t node, int notify_id, int predecessor_candidate_id) { task_data_t req_data = xbt_new0(s_task_data_t, 1); + req_data->type = TASK_NOTIFY; req_data->request_id = predecessor_candidate_id; req_data->issuer_host_name = MSG_host_get_name(MSG_host_self()); // send a "Notify" request to notify_id - INFO1("Sending a 'Notify' request to %d", notify_id); - m_task_t task = MSG_task_create("Notify", COMP_SIZE, COMM_SIZE, req_data); + DEBUG1("Sending a 'Notify' request to %d", notify_id); + m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data); char* mailbox = get_mailbox(notify_id); msg_comm_t comm = MSG_task_isend(task, mailbox); xbt_dynar_push(node->comms, &comm); @@ -746,7 +780,7 @@ static void remote_notify(node_t node, int notify_id, int predecessor_candidate_ */ static void fix_fingers(node_t node) { - INFO0("Fixing fingers"); + DEBUG0("Fixing fingers"); int i = node->next_finger_to_fix; int id = find_successor(node, node->id + powers2[i]); if (id != -1) { @@ -766,7 +800,7 @@ static void fix_fingers(node_t node) { */ static void check_predecessor(node_t node) { - INFO0("Checking whether my predecessor is alive"); + DEBUG0("Checking whether my predecessor is alive"); // TODO }