m_task_t task = NULL;
msg_comm_t comm_send = NULL;
int i;
+ int index;
double deadline;
double next_stabilize_date = init_time + 10;
double next_fix_fingers_date = init_time + 10;
while (MSG_get_clock() < init_time + deadline) {
- // see if some communications are finished
- int index;
- while ((index = MSG_comm_testany(node.comms)) != -1) {
- comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t);
- xbt_dynar_remove_at(node.comms, index, &comm_send);
- DEBUG3("Communication %p is finished with status %d, dynar size is now %lu",
- comm_send, MSG_comm_get_status(comm_send), xbt_dynar_length(node.comms));
- MSG_comm_destroy(comm_send);
- }
-
if (node.comm_receive == NULL) {
task = NULL;
node.comm_receive = MSG_task_irecv(&task, node.mailbox);
}
else {
// nothing to do: sleep for a while
- MSG_process_sleep(1);
+ MSG_process_sleep(5);
}
}
else {
handle_task(&node, task);
}
}
+
+ // see if some communications are finished
+ while ((index = MSG_comm_testany(node.comms)) != -1) {
+ comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t);
+ xbt_dynar_remove_at(node.comms, index, &comm_send);
+ DEBUG3("Communication %p is finished with status %d, dynar size is now %lu",
+ comm_send, MSG_comm_get_status(comm_send), xbt_dynar_length(node.comms));
+ MSG_comm_destroy(comm_send);
+ }
+ }
+
+ // cancel and clean up the sent communications that are still unfinished
+ unsigned int cursor;
+ xbt_dynar_foreach(node.comms, cursor, comm_send) {
+ task = MSG_comm_get_task(comm_send);
+ MSG_task_cancel(task);
+ xbt_free(MSG_task_get_data(task));
+ MSG_task_destroy(task);
+ MSG_comm_destroy(comm_send);
}
// leave the ring and stop the simulation
/**
* \brief This function is called when the current node receives a task.
* \param node the current node
- * \param task the task to handle
+ * \param task the task to handle (do not use it after this call:
+ * it will be destroyed, reused or forwarded)
*/
static void handle_task(node_t node, m_task_t task) {
// someone is telling me that he may be my new predecessor
INFO1("Receiving a 'Notify' request from %s", task_data->issuer_host_name);
notify(node, task_data->request_id);
+ xbt_free(task_data);
+ MSG_task_destroy(task);
}
else if (!strcmp(task_name, "Predecessor Leaving")) {
INFO1("Receiving a 'Predecessor Leaving' message from %s", task_data->issuer_host_name);
// modify my predecessor
set_predecessor(node, task_data->pred_id);
+ xbt_free(task_data);
+ MSG_task_destroy(task);
/*TODO :
>> notify my new predecessor
>> send a notify_predecessors !!
INFO1("Receiving a 'Successor Leaving' message from %s", task_data->issuer_host_name);
// modify my successor FIXME : this should be implicit ?
set_finger(node, 0, task_data->successor_id);
+ xbt_free(task_data);
+ MSG_task_destroy(task);
/* TODO
>> notify my new successor
>> update my table & predecessors table */
*/
static void quit_notify(node_t node, int to)
{
+ /* TODO
task_data_t req_data = xbt_new0(s_task_data_t, 1);
req_data->request_id = node->id;
req_data->successor_id = node->fingers[0].id;
req_data->pred_id = node->pred_id;
req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
- const char *task_name = NULL;
+ const char* task_name = NULL;
const char* to_mailbox = NULL;
- if (to == 1) // notify my successor
- {
+ if (to == 1) { // notify my successor
to_mailbox = node->fingers[0].mailbox;
INFO2("Telling my Successor %d about my departure via mailbox %s",
node->fingers[0].id, to_mailbox);
task_name = "Predecessor Leaving";
-
}
- else if (to == -1) // notify my predecessor
- {
+ else if (to == -1) { // notify my predecessor
+
if (node->pred_id == -1) {
return;
}
to_mailbox = node->pred_mailbox;
INFO2("Telling my Predecessor %d about my departure via mailbox %s",
node->pred_id, to_mailbox);
- task_name = "Predecessor Leaving";
-
+ task_name = "Successor Leaving";
}
m_task_t task = MSG_task_create(task_name, COMP_SIZE, COMM_SIZE, req_data);
//char* mailbox = get_mailbox(to_mailbox);
msg_comm_t comm = MSG_task_isend(task, to_mailbox);
xbt_dynar_push(node->comms, &comm);
+ */
}
/**
static int remote_find_successor(node_t node, int ask_to, int id)
{
int successor = -1;
- int received_answer = 0;
+ int stop = 0;
+ char* mailbox = get_mailbox(ask_to);
task_data_t req_data = xbt_new0(s_task_data_t, 1);
req_data->request_id = id;
req_data->answer_to = node->mailbox;
req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
// send a "Find Successor" request to ask_to_id
- INFO2("Sending a 'Find Successor' request to %d for key %d", ask_to, id);
+ INFO2("Sending a 'Find Successor' request to %d for id %d", ask_to, id);
m_task_t task = MSG_task_create("Find Successor", COMP_SIZE, COMM_SIZE, req_data);
- MSG_error_t res = MSG_task_send_with_timeout(task, get_mailbox(ask_to), 200);
+ MSG_error_t res = MSG_task_send_with_timeout(task, mailbox, 50);
- if (res == MSG_OK) {
+ if (res != MSG_OK) {
+ INFO2("Failed to send the 'Find Successor' request to %d for id %d", ask_to, id);
+ MSG_task_destroy(task);
+ xbt_free(req_data);
+ }
+ else {
// receive the answer
INFO2("Sent a 'Find Successor' request to %d for key %d, waiting for the answer", ask_to, id);
res = MSG_comm_wait(node->comm_receive, 50);
if (res != MSG_OK) {
- INFO1("Failed to receive the answer: %d", res);
+ INFO1("Failed to receive the answer to my 'Find Successor' request: %d", res);
+ stop = 1;
+// MSG_comm_destroy(node->comm_receive);
}
else {
task = MSG_comm_get_task(node->comm_receive);
handle_task(node, task);
}
else {
task_data_t ans_data = MSG_task_get_data(task);
successor = ans_data->answer_id;
- received_answer = 1;
+ // log only after reading the answer, so the message shows the received successor and not the initial -1
INFO2("This is the answer to my 'Find Successor' request: the successor of key %d is %d", id, successor);
+ stop = 1;
+ MSG_task_destroy(task);
+ xbt_free(req_data);
}
- MSG_comm_destroy(node->comm_receive);
- node->comm_receive = NULL;
-// MSG_task_destroy(task);
}
- } while (!received_answer);
+ node->comm_receive = NULL;
+ } while (!stop);
}
+ xbt_free(mailbox);
return successor;
}
static int remote_get_predecessor(node_t node, int ask_to)
{
int predecessor_id = -1;
- int received_answer = 0;
+ int stop = 0;
+ char* mailbox = get_mailbox(ask_to);
task_data_t req_data = xbt_new0(s_task_data_t, 1);
req_data->answer_to = node->mailbox;
req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
// send a "Get Predecessor" request to ask_to_id
INFO1("Sending a 'Get Predecessor' request to %d", ask_to);
m_task_t task = MSG_task_create("Get Predecessor", COMP_SIZE, COMM_SIZE, req_data);
- MSG_error_t res = MSG_task_send_with_timeout(task, get_mailbox(ask_to), 200);
+ MSG_error_t res = MSG_task_send_with_timeout(task, mailbox, 50);
- if (res == MSG_OK) {
+ if (res != MSG_OK) {
+ INFO1("Failed to send the 'Get Predecessor' request to %d", ask_to);
+ MSG_task_destroy(task);
+ xbt_free(req_data);
+ }
+ else {
// receive the answer
INFO2("Sent 'Get Predecessor' request to %d, waiting for the answer on my mailbox '%s'", ask_to, req_data->answer_to);
res = MSG_comm_wait(node->comm_receive, 50);
if (res != MSG_OK) {
- INFO1("Failed to receive the answer: %d", res);
+ INFO1("Failed to receive the answer to my 'Get Predecessor' request: %d", res);
+ stop = 1;
+// MSG_comm_destroy(node->comm_receive);
}
else {
task = MSG_comm_get_task(node->comm_receive);
handle_task(node, task);
}
else {
task_data_t ans_data = MSG_task_get_data(task);
predecessor_id = ans_data->answer_id;
- received_answer = 1;
+ // log only after reading the answer, so the message shows the received predecessor and not the initial -1
INFO2("This is the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, predecessor_id);
+ stop = 1;
+ MSG_task_destroy(task);
+ xbt_free(req_data);
}
- MSG_comm_destroy(node->comm_receive);
- node->comm_receive = NULL;
-// MSG_task_destroy(task);
}
- } while (!received_answer);
+ node->comm_receive = NULL;
+ } while (!stop);
}
+ xbt_free(mailbox);
return predecessor_id;
}
<argument value="48"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="400"/> <!-- time to sleep before it starts-->
- <argument value ="5000"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
<process host="McGee" function="node">
<argument value="42"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="300"/> <!-- time to sleep before it starts-->
- <argument value ="5000"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
<process host="iRMX" function="node">
<argument value="38"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="200"/> <!-- time to sleep before it starts-->
- <argument value ="5000"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
<process host="Geoff" function="node">
<argument value="32"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="100"/> <!-- time to sleep before it starts-->
- <argument value ="5000"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
<process host="TeX" function="node">
<argument value="21"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="40"/> <!-- time to sleep before it starts-->
- <argument value ="5000"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
<process host="Jean_Yves" function="node">
<argument value="14"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="16"/> <!-- time to sleep before it starts-->
- <argument value ="5000"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
<process host="Boivin" function="node">
<argument value="8"/> <!-- my id -->
<argument value="1"/> <!-- known id -->
<argument value="1"/> <!-- time to sleep before it starts-->
- <argument value ="5000"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
<process host="Jacquelin" function="node">
<argument value="1"/> <!-- my id -->
- <argument value ="5000"/> <!-- deadline -->
+ <argument value ="600"/> <!-- deadline -->
</process>
</platform>