int node(int argc, char *argv[])
{
double init_time = MSG_get_clock();
- msg_comm_t comm = NULL;
+ m_task_t task = NULL;
+ msg_comm_t comm_send = NULL;
+ msg_comm_t comm_receive = NULL;
int i;
char* mailbox = NULL;
double deadline;
while (MSG_get_clock() < init_time + deadline) {
- unsigned int cursor;
- comm = NULL;
- xbt_dynar_foreach(node.comms, cursor, comm) {
- if (MSG_comm_test(comm)) { // FIXME: try with MSG_comm_testany instead
- xbt_dynar_cursor_rm(node.comms, &cursor);
- MSG_comm_destroy(comm);
- }
+ // see if some communications are finished
+ int index;
+ while ((index = MSG_comm_testany(node.comms)) != -1) {
+ comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t);
+ xbt_dynar_remove_at(node.comms, index, &comm_send);
+ MSG_comm_destroy(comm_send);
}
- // periodic calls
- if (MSG_get_clock() >= next_stabilize_date) {
- stabilize(&node);
- next_stabilize_date = MSG_get_clock() + 10;
- }
- if (MSG_get_clock() >= next_fix_fingers_date) {
- fix_fingers(&node);
- next_fix_fingers_date = MSG_get_clock() + 10;
- }
- if (MSG_get_clock() >= next_check_predecessor_date) {
- check_predecessor(&node);
- next_check_predecessor_date = MSG_get_clock() + 10;
+ if (comm_receive == NULL) {
+ task = NULL;
+ comm_receive = MSG_task_irecv(&task, node.mailbox);
}
- m_task_t task = NULL;
- MSG_error_t res = MSG_task_receive_with_timeout(&task, node.mailbox, 45); // FIXME >> find the right timeout !!
+ if (!MSG_comm_test(comm_receive)) {
- if (res == MSG_OK) { // else check deadline condition and keep waiting for a task
+ // no task was received: make some periodic calls
+ if (MSG_get_clock() >= next_stabilize_date) {
+ stabilize(&node);
+ next_stabilize_date = MSG_get_clock() + 10;
+ }
+ else if (MSG_get_clock() >= next_fix_fingers_date) {
+ fix_fingers(&node);
+ next_fix_fingers_date = MSG_get_clock() + 10;
+ }
+ else if (MSG_get_clock() >= next_check_predecessor_date) {
+ check_predecessor(&node);
+ next_check_predecessor_date = MSG_get_clock() + 10;
+ }
+ else {
+ // nothing to do: sleep for a while
+ MSG_process_sleep(5);
+ }
+ }
+ else {
+ // a task was received
- // a task was received, get the data
+ MSG_comm_destroy(comm_receive);
+ comm_receive = NULL;
const char* task_name = MSG_task_get_name(task);
task_data_t task_data = (task_data_t) MSG_task_get_data(task);
MSG_task_set_name(task, "Find Successor Answer");
INFO3("Sending back a 'Find Successor Answer' to %s: the successor of %d is %d",
task_data->issuer_host_name, task_data->request_id, task_data->answer_id);
- comm = MSG_task_isend(task, task_data->answer_to);
- xbt_dynar_push(node.comms, &comm);
+ comm_send = MSG_task_isend(task, task_data->answer_to);
+ xbt_dynar_push(node.comms, &comm_send);
}
else {
// otherwise, forward the request to the closest preceding finger in my table
INFO2("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
task_data->request_id, closest);
mailbox = get_mailbox(closest);
- comm = MSG_task_isend(task, mailbox);
- xbt_dynar_push(node.comms, &comm);
+ comm_send = MSG_task_isend(task, mailbox);
+ xbt_dynar_push(node.comms, &comm_send);
xbt_free(mailbox);
}
}
MSG_task_set_name(task, "Get Predecessor Answer");
INFO2("Sending back a 'Get Predecessor Answer' to %s: my predecessor is %d",
task_data->issuer_host_name, task_data->answer_id);
- comm = MSG_task_isend(task, task_data->answer_to);
- xbt_dynar_push(node.comms, &comm);
+ comm_send = MSG_task_isend(task, task_data->answer_to);
+ xbt_dynar_push(node.comms, &comm_send);
}
/*
else if (!strcmp(task_name, "Find Predecessor")) {
{
INFO2("Joining the ring with id %d, knowing node %d", node->id, known_id);
set_predecessor(node, -1); // no predecessor (yet)
- int successor_id = remote_find_successor(node, known_id, node->id);
+
+ // Keep asking known_id until a 'Find Successor' request succeeds
+ // (remote_find_successor returns -1 when the request failed).
+ int successor_id;
+ do {
+   successor_id = remote_find_successor(node, known_id, node->id);
+   if (successor_id == -1) {
+     // only report a failure when the request actually failed
+     INFO0("My 'Find Successor' request has failed, let's try again");
+   }
+ } while (successor_id == -1);
+
set_finger(node, 0, successor_id);
print_finger_table(node);
}
* \brief Makes the current node find the successor node of an id.
* \param node the current node
* \param id the id to find
- * \return the id of the successor node
+ * \return the id of the successor node, or -1 if the request failed
*/
static int find_successor(node_t node, int id)
{
* \param node the current node
* \param ask_to the node to ask to
* \param id the id to find
- * \return the id of the successor node
+ * \return the id of the successor node, or -1 if the request failed
*/
static int remote_find_successor(node_t node, int ask_to, int id)
{
+ int successor = -1;
s_task_data_t req_data;
char* mailbox = bprintf("%s Find Successor", node->mailbox);
req_data.request_id = id;
// send a "Find Successor" request to ask_to_id
INFO2("Sending a 'Find Successor' request to %d for key %d", ask_to, id);
m_task_t task = MSG_task_create("Find Successor", 1000, 5000, &req_data);
- MSG_task_send(task, get_mailbox(ask_to));
+ MSG_error_t res = MSG_task_send_with_timeout(task, get_mailbox(ask_to), 20);
+ if (res == MSG_OK) {
- // receive the answer
- task = NULL;
- MSG_task_receive(&task, req_data.answer_to);
- task_data_t ans_data;
- ans_data = MSG_task_get_data(task);
- int successor = ans_data->answer_id;
- xbt_free(mailbox);
- INFO2("Received the answer to my 'Find Successor' request: the successor of key %d is %d", id, successor);
+ // receive the answer
+ task = NULL;
+ res = MSG_task_receive_with_timeout(&task, req_data.answer_to, 20);
+ if (res == MSG_OK) {
+
+ task_data_t ans_data;
+ ans_data = MSG_task_get_data(task);
+ successor = ans_data->answer_id;
+ INFO2("Received the answer to my 'Find Successor' request: the successor of key %d is %d", id, successor);
+ }
+ }
+
+ xbt_free(mailbox);
return successor;
}
* \brief Asks another node its predecessor.
* \param node the current node
* \param ask_to the node to ask to
- * \return the id of its predecessor node
+ * \return the id of its predecessor node, or -1 if the request failed
+ * (or if the node does not know its predecessor)
*/
static int remote_get_predecessor(node_t node, int ask_to)
{
+ int predecessor_id = -1;
s_task_data_t req_data;
char* mailbox = bprintf("%s Get Predecessor", node->mailbox);
req_data.answer_to = mailbox;
// send a "Get Predecessor" request to ask_to_id
INFO1("Sending a 'Get Predecessor' request to %d", ask_to);
m_task_t task = MSG_task_create("Get Predecessor", 1000, 5000, &req_data);
- MSG_task_send(task, get_mailbox(ask_to));
+ MSG_error_t res = MSG_task_send_with_timeout(task, get_mailbox(ask_to), 20);
- // receive the answer
- task = NULL;
- MSG_task_receive(&task, req_data.answer_to);
- task_data_t ans_data;
- ans_data = MSG_task_get_data(task);
- int predecessor_id = ans_data->answer_id;
- xbt_free(mailbox);
- INFO2("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, predecessor_id);
+ if (res == MSG_OK) {
+
+ // receive the answer
+ task = NULL;
+ res = MSG_task_receive_with_timeout(&task, req_data.answer_to, 20);
+
+ if (res == MSG_OK) {
+
+ task_data_t ans_data;
+ ans_data = MSG_task_get_data(task);
+ predecessor_id = ans_data->answer_id;
+ INFO2("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, predecessor_id);
+ }
+ }
+ xbt_free(mailbox);
return predecessor_id;
}
INFO0("Fixing fingers");
int i = node->next_finger_to_fix;
int id = find_successor(node, node->id + powers2[i]);
- if (id != -1 && id != node->fingers[i].id) {
- set_finger(node, i, id);
- print_finger_table(node);
+ if (id != -1) {
+
+ if (id != node->fingers[i].id) {
+ set_finger(node, i, id);
+ print_finger_table(node);
+ }
+ node->next_finger_to_fix = (i + 1) % NB_BITS;
}
- node->next_finger_to_fix = (i + 1) % NB_BITS;
}
/**