// Chord core
// Forward declarations of the Chord operations used throughout this file.
static void create(node_t node);
// join() switches from void to int: it now reports whether the join succeeded
// (1) or failed (0) instead of retrying until it succeeds.
-static void join(node_t node, int known_id);
+static int join(node_t node, int known_id);
static void leave(node_t node);
static int find_successor(node_t node, int id);
static int remote_find_successor(node_t node, int ask_to_id, int id);
node->fingers[finger_index].id = id;
xbt_free(node->fingers[finger_index].mailbox);
node->fingers[finger_index].mailbox = get_mailbox(id);
- INFO2("My new finger #%d is %d", finger_index, id);
+ DEBUG2("My new finger #%d is %d", finger_index, id);
}
/**
node->pred_mailbox = get_mailbox(predecessor_id);
}
- INFO1("My new predecessor is %d", predecessor_id);
+ DEBUG1("My new predecessor is %d", predecessor_id);
}
/**
{
double init_time = MSG_get_clock();
m_task_t task = NULL;
+ m_task_t task_received = NULL;
msg_comm_t comm_send = NULL;
int i;
int index;
+ int join_success = 0;
double deadline;
double next_stabilize_date = init_time + 10;
double next_fix_fingers_date = init_time + 10;
if (argc == 3) { // first ring
deadline = atof(argv[2]);
create(&node);
+ join_success = 1;
}
else {
int known_id = atoi(argv[2]);
deadline = atof(argv[4]);
// sleep before starting
- INFO1("Let's sleep during %f", sleep_time);
+ DEBUG1("Let's sleep during %f", sleep_time);
MSG_process_sleep(sleep_time);
- INFO0("Hey! Let's join the system.");
+ DEBUG0("Hey! Let's join the system.");
- join(&node, known_id);
+ join_success = join(&node, known_id);
}
- while (MSG_get_clock() < init_time + deadline) {
+ if (join_success) {
+ while (MSG_get_clock() < init_time + deadline) {
- if (node.comm_receive == NULL) {
- task = NULL;
- node.comm_receive = MSG_task_irecv(&task, node.mailbox);
- // FIXME: do not make MSG_task_irecv() calls from several functions
- }
+ if (node.comm_receive == NULL) {
+ task_received = NULL;
+ node.comm_receive = MSG_task_irecv(&task_received, node.mailbox);
+ // FIXME: do not make MSG_task_irecv() calls from several functions
+ }
- if (!MSG_comm_test(node.comm_receive)) {
+ if (!MSG_comm_test(node.comm_receive)) {
- // no task was received: make some periodic calls
- if (MSG_get_clock() >= next_stabilize_date) {
- stabilize(&node);
- next_stabilize_date = MSG_get_clock() + 10;
- }
- else if (MSG_get_clock() >= next_fix_fingers_date) {
- fix_fingers(&node);
- next_fix_fingers_date = MSG_get_clock() + 10;
- }
- else if (MSG_get_clock() >= next_check_predecessor_date) {
- check_predecessor(&node);
- next_check_predecessor_date = MSG_get_clock() + 10;
+ // no task was received: make some periodic calls
+ if (MSG_get_clock() >= next_stabilize_date) {
+ stabilize(&node);
+ next_stabilize_date = MSG_get_clock() + 10;
+ }
+ else if (MSG_get_clock() >= next_fix_fingers_date) {
+ fix_fingers(&node);
+ next_fix_fingers_date = MSG_get_clock() + 10;
+ }
+ else if (MSG_get_clock() >= next_check_predecessor_date) {
+ check_predecessor(&node);
+ next_check_predecessor_date = MSG_get_clock() + 10;
+ }
+ else {
+ // nothing to do: sleep for a while
+ MSG_process_sleep(5);
+ }
}
else {
- // nothing to do: sleep for a while
- MSG_process_sleep(5);
- }
- }
- else {
- // a transfer has occured
+ // a transfer has occurred
- MSG_error_t status = MSG_comm_get_status(node.comm_receive);
- MSG_comm_destroy(node.comm_receive);
- node.comm_receive = NULL;
+ MSG_error_t status = MSG_comm_get_status(node.comm_receive);
+ MSG_comm_destroy(node.comm_receive);
+ node.comm_receive = NULL;
- if (status != MSG_OK) {
- INFO0("Failed to receive a task. Nevermind.");
+ if (status != MSG_OK) {
+ DEBUG0("Failed to receive a task. Nevermind.");
+ }
+ else {
+ // the task was successfully received
+ handle_task(&node, task_received);
+ }
}
- else {
- // the task was successfully received
- handle_task(&node, task);
+
+ // see if some communications are finished
+ while ((index = MSG_comm_testany(node.comms)) != -1) {
+ comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t);
+ MSG_error_t status = MSG_comm_get_status(comm_send);
+ xbt_dynar_remove_at(node.comms, index, &comm_send);
+ DEBUG3("Communication %p is finished with status %d, dynar size is now %lu",
+ comm_send, status, xbt_dynar_length(node.comms));
+ MSG_comm_destroy(comm_send);
}
}
- // see if some communications are finished
- while ((index = MSG_comm_testany(node.comms)) != -1) {
- comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t);
- MSG_error_t status = MSG_comm_get_status(comm_send);
- xbt_dynar_remove_at(node.comms, index, &comm_send);
- DEBUG3("Communication %p is finished with status %d, dynar size is now %lu",
- comm_send, status, xbt_dynar_length(node.comms));
+ // clean unfinished comms sent
+ unsigned int cursor;
+ xbt_dynar_foreach(node.comms, cursor, comm_send) {
+ task = MSG_comm_get_task(comm_send);
+ MSG_task_cancel(task);
+ xbt_free(MSG_task_get_data(task));
+ MSG_task_destroy(task);
MSG_comm_destroy(comm_send);
+ // FIXME: the task is actually not destroyed because MSG thinks that the other side (whose process is dead) is still using it
}
- }
- // clean unfinished comms sent
- unsigned int cursor;
- xbt_dynar_foreach(node.comms, cursor, comm_send) {
- task = MSG_comm_get_task(comm_send);
- MSG_task_cancel(task);
- xbt_free(MSG_task_get_data(task));
- MSG_task_destroy(task);
- MSG_comm_destroy(comm_send);
- // FIXME: the task is actually not destroyed because MSG thinks that the other side (whose process is dead) is still using it
+ // leave the ring
+ leave(&node);
}
- // leave the ring and stop the simulation
- leave(&node);
+ // stop the simulation
xbt_dynar_free(&node.comms);
xbt_free(node.mailbox);
xbt_free(node.pred_mailbox);
*/
static void handle_task(node_t node, m_task_t task) {
+ DEBUG1("Handling task %p", task);
msg_comm_t comm = NULL;
char* mailbox = NULL;
task_data_t task_data = (task_data_t) MSG_task_get_data(task);
switch (type) {
case TASK_FIND_SUCCESSOR:
- INFO2("Receiving a 'Find Successor' request from %s for id %d",
+ DEBUG2("Receiving a 'Find Successor' request from %s for id %d",
task_data->issuer_host_name, task_data->request_id);
// is my successor the successor?
if (is_in_interval(task_data->request_id, node->id + 1, node->fingers[0].id)) {
task_data->answer_id = node->fingers[0].id;
comm = MSG_task_isend(task, task_data->answer_to);
xbt_dynar_push(node->comms, &comm);
- INFO3("Sending back a 'Find Successor Answer' to %s: the successor of %d is %d",
+ DEBUG3("Sending back a 'Find Successor Answer' to %s: the successor of %d is %d",
task_data->issuer_host_name,
task_data->request_id, task_data->answer_id);
}
else {
// otherwise, forward the request to the closest preceding finger in my table
int closest = closest_preceding_node(node, task_data->request_id);
- INFO2("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
+ DEBUG2("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
task_data->request_id, closest);
mailbox = get_mailbox(closest);
comm = MSG_task_isend(task, mailbox);
break;
case TASK_GET_PREDECESSOR:
- INFO1("Receiving a 'Get Predecessor' request from %s", task_data->issuer_host_name);
+ DEBUG1("Receiving a 'Get Predecessor' request from %s", task_data->issuer_host_name);
task_data->type = TASK_GET_PREDECESSOR_ANSWER;
task_data->answer_id = node->pred_id;
comm = MSG_task_isend(task, task_data->answer_to);
xbt_dynar_push(node->comms, &comm);
- INFO3("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d",
+ DEBUG3("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d",
task_data->issuer_host_name,
task_data->answer_to, task_data->answer_id);
break;
case TASK_NOTIFY:
// someone is telling me that he may be my new predecessor
- INFO1("Receiving a 'Notify' request from %s", task_data->issuer_host_name);
+ DEBUG1("Receiving a 'Notify' request from %s", task_data->issuer_host_name);
notify(node, task_data->request_id);
xbt_free(task_data);
MSG_task_destroy(task);
case TASK_PREDECESSOR_LEAVING:
// my predecessor is about to quit
- INFO1("Receiving a 'Predecessor Leaving' message from %s", task_data->issuer_host_name);
+ DEBUG1("Receiving a 'Predecessor Leaving' message from %s", task_data->issuer_host_name);
// modify my predecessor
set_predecessor(node, task_data->request_id);
xbt_free(task_data);
case TASK_SUCCESSOR_LEAVING:
// my successor is about to quit
- INFO1("Receiving a 'Successor Leaving' message from %s", task_data->issuer_host_name);
+ DEBUG1("Receiving a 'Successor Leaving' message from %s", task_data->issuer_host_name);
// modify my successor FIXME : this should be implicit ?
set_finger(node, 0, task_data->request_id);
xbt_free(task_data);
default:
CRITICAL1("Received an unexpected task: %d", type);
- xbt_abort();
+ //xbt_abort();
}
}
*/
static void create(node_t node)
{
// Sets up `node` as the first member of a new ring: it starts without a
// predecessor, and its finger table is printed afterwards.
// The start-up message is demoted from INFO to DEBUG by this patch.
- INFO0("Create a new Chord ring...");
+ DEBUG0("Create a new Chord ring...");
set_predecessor(node, -1); // -1 means that I have no predecessor
print_finger_table(node);
}
* already in the ring
* \param node the current node
* \param known_id id of a node already in the ring
+ * \return 1 if the join operation succeeded, 0 otherwise
*/
-static void join(node_t node, int known_id)
+static int join(node_t node, int known_id)
{
// Asks the already-known node `known_id` for the successor of this node's id
// and, on success, installs that successor as finger #0.
INFO2("Joining the ring with id %d, knowing node %d", node->id, known_id);
set_predecessor(node, -1); // no predecessor (yet)
// Old behaviour (removed): keep asking until remote_find_successor() stops
// returning -1, so the function could only ever return after a success.
- int successor_id;
- do {
- successor_id = remote_find_successor(node, known_id, node->id);
- if (successor_id == -1) {
- INFO0("I really want to join the ring. Let's try again.");
- }
- } while (successor_id == -1);
// New behaviour (added): a single attempt; -1 is treated as a failed join and
// the finger table is left untouched in that case.
+ int successor_id = remote_find_successor(node, known_id, node->id);
+ if (successor_id == -1) {
+ DEBUG0("Cannot join the ring.");
+ }
+ else {
+ set_finger(node, 0, successor_id);
+ print_finger_table(node);
+ }
- set_finger(node, 0, successor_id);
- print_finger_table(node);
// 1 when a successor was found, 0 otherwise; the caller uses this value to
// decide whether to enter its main loop.
+ return successor_id != -1;
}
/**
*/
static void leave(node_t node)
{
- INFO0("Well Guys! I Think it's time for me to quit ;)");
+ DEBUG0("Well Guys! I Think it's time for me to quit ;)");
quit_notify(node, 1); // notify to my successor ( >>> 1 );
quit_notify(node, -1); // notify my predecessor ( >>> -1);
// TODO ...
req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
// send a "Find Successor" request to ask_to_id
- INFO2("Sending a 'Find Successor' request to %d for id %d", ask_to, id);
m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
+ DEBUG3("Sending a 'Find Successor' request (task %p) to %d for id %d", task, ask_to, id);
MSG_error_t res = MSG_task_send_with_timeout(task, mailbox, 50);
if (res != MSG_OK) {
- INFO2("Failed to send the 'Find Successor' request to %d for id %d", ask_to, id);
+ DEBUG2("Failed to send the 'Find Successor' request to %d for id %d", ask_to, id);
MSG_task_destroy(task);
xbt_free(req_data);
}
else {
// receive the answer
- DEBUG2("Sent a 'Find Successor' request to %d for key %d, waiting for the answer", ask_to, id);
+ DEBUG3("Sent a 'Find Successor' request (task %p) to %d for key %d, waiting for the answer",
+ task, ask_to, id);
do {
if (node->comm_receive == NULL) {
res = MSG_comm_wait(node->comm_receive, 50);
if (res != MSG_OK) {
- INFO1("Failed to receive the answer to my 'Find Successor' request: %d", res);
+ DEBUG2("Failed to receive the answer to my 'Find Successor' request (task %p): %d", task, res);
stop = 1;
// MSG_comm_destroy(node->comm_receive);
}
handle_task(node, task);
}
else {
- INFO2("Received the answer to my 'Find Successor' request: the successor of key %d is %d", id, successor);
+ DEBUG3("Received the answer to my 'Find Successor' request (task %p): the successor of key %d is %d",
+ task, id, successor);
successor = ans_data->answer_id;
stop = 1;
MSG_task_destroy(task);
req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
// send a "Get Predecessor" request to ask_to_id
- INFO1("Sending a 'Get Predecessor' request to %d", ask_to);
+ DEBUG1("Sending a 'Get Predecessor' request to %d", ask_to);
m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
MSG_error_t res = MSG_task_send_with_timeout(task, mailbox, 50);
if (res != MSG_OK) {
- INFO1("Failed to send the 'Get Predecessor' request to %d", ask_to);
+ DEBUG1("Failed to send the 'Get Predecessor' request to %d", ask_to);
MSG_task_destroy(task);
xbt_free(req_data);
}
res = MSG_comm_wait(node->comm_receive, 50);
if (res != MSG_OK) {
- INFO1("Failed to receive the answer to my 'Get Predecessor' request: %d", res);
+ DEBUG1("Failed to receive the answer to my 'Get Predecessor' request: %d", res);
stop = 1;
// MSG_comm_destroy(node->comm_receive);
}
handle_task(node, task);
}
else {
- INFO2("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, predecessor_id);
+ DEBUG2("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, predecessor_id);
predecessor_id = ans_data->answer_id;
stop = 1;
MSG_task_destroy(task);
*/
static void stabilize(node_t node)
{
- INFO0("Stabilizing node");
+ DEBUG0("Stabilizing node");
// get the predecessor of my immediate successor
int candidate_id;
print_finger_table(node);
}
else {
- INFO1("I don't have to change my predecessor to %d", predecessor_candidate_id);
+ DEBUG1("I don't have to change my predecessor to %d", predecessor_candidate_id);
}
}
req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
// send a "Notify" request to notify_id
- INFO1("Sending a 'Notify' request to %d", notify_id);
+ DEBUG1("Sending a 'Notify' request to %d", notify_id);
m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
char* mailbox = get_mailbox(notify_id);
msg_comm_t comm = MSG_task_isend(task, mailbox);
*/
static void fix_fingers(node_t node) {
- INFO0("Fixing fingers");
+ DEBUG0("Fixing fingers");
int i = node->next_finger_to_fix;
int id = find_successor(node, node->id + powers2[i]);
if (id != -1) {
*/
static void check_predecessor(node_t node)
{
// Placeholder: only emits a log message for now; the liveness check itself is
// not implemented yet (see the TODO below). `node` is currently unused.
// The log message is demoted from INFO to DEBUG by this patch.
- INFO0("Checking whether my predecessor is alive");
+ DEBUG0("Checking whether my predecessor is alive");
// TODO
}