XBT_LOG_NEW_DEFAULT_CATEGORY(msg_chord,
"Messages specific for this msg example");
-#define NB_BITS 16
-#define NB_KEYS 65536
#define COMM_SIZE 10
#define COMP_SIZE 0
-#define TIMEOUT 50
+
+static int nb_bits = 16;
+static int nb_keys = 0;
+static int timeout = 50;
+static int max_simulation_time = 1000;
+static int periodic_stabilize_delay = 20;
+static int periodic_fix_fingers_delay = 120;
+static int periodic_check_predecessor_delay = 120;
/**
* Finger element.
typedef struct node {
int id; // my id
char* mailbox; // my usual mailbox name
- s_finger_t fingers[NB_BITS]; // finger table (fingers[0] is my successor)
+ s_finger_t *fingers; // finger table, of size nb_bits (fingers[0] is my successor)
int pred_id; // predecessor id
char* pred_mailbox; // predecessor's mailbox name
int next_finger_to_fix; // index of the next finger to fix in fix_fingers()
msg_comm_t comm_receive; // current communication to receive
xbt_dynar_t comms; // current communications being sent
+ double last_change_date; // last time I changed a finger or my predecessor
} s_node_t, *node_t;
/**
const char* issuer_host_name; // used for logging
} s_task_data_t, *task_data_t;
-static int powers2[NB_BITS];
+static int *powers2;
// utility functions
static void chord_initialize(void);
static void chord_initialize(void)
{
// compute the powers of 2 once for all
+ powers2 = xbt_new(int, nb_bits);
int pow = 1;
int i;
- for (i = 0; i < NB_BITS; i++) {
+ for (i = 0; i < nb_bits; i++) {
powers2[i] = pow;
pow = pow << 1;
}
+ nb_keys = pow;
+ DEBUG1("Sets nb_keys to %d", nb_keys);
}
/**
- * \brief Turns an id into an equivalent id in [0, NB_KEYS).
+ * \brief Turns an id into an equivalent id in [0, nb_keys).
* \param id an id
* \return the corresponding normalized id
*/
{
// make sure id >= 0
while (id < 0) {
- id += NB_KEYS;
+ id += nb_keys;
}
- // make sure id < NB_KEYS
- id = id % NB_KEYS;
+ // make sure id < nb_keys
+ id = id % nb_keys;
return id;
}
/**
* \brief Returns whether a id belongs to the interval [start, end].
*
- * The parameters are noramlized to make sure they are between 0 and CHORD_NB_KEYS - 1).
+ * The parameters are noramlized to make sure they are between 0 and nb_keys - 1).
* 1 belongs to [62, 3]
* 1 does not belong to [3, 62]
* 63 belongs to [62, 3]
// make sure end >= start and id >= start
if (end < start) {
- end += NB_KEYS;
+ end += nb_keys;
}
if (id < start) {
- id += NB_KEYS;
+ id += nb_keys;
}
return id <= end;
int pow = 1;
VERB0("My finger table:");
VERB0("Start | Succ ");
- for (i = 0; i < NB_BITS; i++) {
- VERB2(" %3d | %3d ", (node->id + pow) % NB_KEYS, node->fingers[i].id);
+ for (i = 0; i < nb_bits; i++) {
+ VERB2(" %3d | %3d ", (node->id + pow) % nb_keys, node->fingers[i].id);
pow = pow << 1;
}
VERB1("Predecessor: %d", node->pred_id);
/**
* \brief Sets a finger of the current node.
* \param node the current node
- * \param finger_index index of the finger to set (0 to NB_BITS - 1)
+ * \param finger_index index of the finger to set (0 to nb_bits - 1)
* \param id the id to set for this finger
*/
static void set_finger(node_t node, int finger_index, int id)
{
- node->fingers[finger_index].id = id;
- xbt_free(node->fingers[finger_index].mailbox);
- node->fingers[finger_index].mailbox = get_mailbox(id);
- DEBUG2("My new finger #%d is %d", finger_index, id);
+ if (id != node->fingers[finger_index].id) {
+ node->fingers[finger_index].id = id;
+ xbt_free(node->fingers[finger_index].mailbox);
+ node->fingers[finger_index].mailbox = get_mailbox(id);
+ node->last_change_date = MSG_get_clock();
+ DEBUG2("My new finger #%d is %d", finger_index, id);
+ }
}
/**
*/
static void set_predecessor(node_t node, int predecessor_id)
{
- node->pred_id = predecessor_id;
- xbt_free(node->pred_mailbox);
+ if (predecessor_id != node->pred_id) {
+ node->pred_id = predecessor_id;
+ xbt_free(node->pred_mailbox);
- if (predecessor_id != -1) {
- node->pred_mailbox = get_mailbox(predecessor_id);
- }
+ if (predecessor_id != -1) {
+ node->pred_mailbox = get_mailbox(predecessor_id);
+ }
+ node->last_change_date = MSG_get_clock();
- DEBUG1("My new predecessor is %d", predecessor_id);
+ DEBUG1("My new predecessor is %d", predecessor_id);
+ }
}
/**
node.mailbox = get_mailbox(node.id);
node.next_finger_to_fix = 0;
node.comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
+ node.fingers = xbt_new0(s_finger_t, nb_bits);
+ node.last_change_date = MSG_get_clock();
- for (i = 0; i < NB_BITS; i++) {
+ for (i = 0; i < nb_bits; i++) {
set_finger(&node, i, node.id);
}
}
else {
int known_id = atoi(argv[2]);
- double sleep_time = atof(argv[3]);
+ //double sleep_time = atof(argv[3]);
deadline = atof(argv[4]);
+ /*
// sleep before starting
DEBUG1("Let's sleep during %f", sleep_time);
MSG_process_sleep(sleep_time);
+ */
DEBUG0("Hey! Let's join the system.");
join_success = join(&node, known_id);
}
if (join_success) {
- while (MSG_get_clock() < init_time + deadline) {
+ while (MSG_get_clock() < init_time + deadline
+// && MSG_get_clock() < node.last_change_date + 1000
+ && MSG_get_clock() < max_simulation_time) {
if (node.comm_receive == NULL) {
task_received = NULL;
// no task was received: make some periodic calls
if (MSG_get_clock() >= next_stabilize_date) {
stabilize(&node);
- next_stabilize_date = MSG_get_clock() + 10;
+ next_stabilize_date = MSG_get_clock() + periodic_stabilize_delay;
}
else if (MSG_get_clock() >= next_fix_fingers_date) {
fix_fingers(&node);
- next_fix_fingers_date = MSG_get_clock() + 10;
+ next_fix_fingers_date = MSG_get_clock() + periodic_fix_fingers_delay;
}
else if (MSG_get_clock() >= next_check_predecessor_date) {
check_predecessor(&node);
- next_check_predecessor_date = MSG_get_clock() + 10;
+ next_check_predecessor_date = MSG_get_clock() + periodic_check_predecessor_delay;
}
else {
// nothing to do: sleep for a while
xbt_dynar_free(&node.comms);
xbt_free(node.mailbox);
xbt_free(node.pred_mailbox);
- for (i = 0; i < NB_BITS - 1; i++) {
+ for (i = 0; i < nb_bits - 1; i++) {
xbt_free(node.fingers[i].mailbox);
}
+ xbt_free(node.fingers);
return 0;
}
// send a "Find Successor" request to ask_to_id
m_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
DEBUG3("Sending a 'Find Successor' request (task %p) to %d for id %d", task_sent, ask_to, id);
- MSG_error_t res = MSG_task_send_with_timeout(task_sent, mailbox, TIMEOUT);
+ MSG_error_t res = MSG_task_send_with_timeout(task_sent, mailbox, timeout);
if (res != MSG_OK) {
DEBUG3("Failed to send the 'Find Successor' request (task %p) to %d for id %d",
node->comm_receive = MSG_task_irecv(&task_received, node->mailbox);
}
- res = MSG_comm_wait(node->comm_receive, TIMEOUT);
+ res = MSG_comm_wait(node->comm_receive, timeout);
if (res != MSG_OK) {
DEBUG2("Failed to receive the answer to my 'Find Successor' request (task %p): %d",
// send a "Get Predecessor" request to ask_to_id
DEBUG1("Sending a 'Get Predecessor' request to %d", ask_to);
m_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
- MSG_error_t res = MSG_task_send_with_timeout(task_sent, mailbox, TIMEOUT);
+ MSG_error_t res = MSG_task_send_with_timeout(task_sent, mailbox, timeout);
if (res != MSG_OK) {
DEBUG2("Failed to send the 'Get Predecessor' request (task %p) to %d",
node->comm_receive = MSG_task_irecv(&task_received, node->mailbox);
}
- res = MSG_comm_wait(node->comm_receive, TIMEOUT);
+ res = MSG_comm_wait(node->comm_receive, timeout);
if (res != MSG_OK) {
DEBUG2("Failed to receive the answer to my 'Get Predecessor' request (task %p): %d",
int closest_preceding_node(node_t node, int id)
{
int i;
- for (i = NB_BITS - 1; i >= 0; i--) {
+ for (i = nb_bits - 1; i >= 0; i--) {
if (is_in_interval(node->fingers[i].id, node->id + 1, id - 1)) {
return node->fingers[i].id;
}
set_finger(node, i, id);
print_finger_table(node);
}
- node->next_finger_to_fix = (i + 1) % NB_BITS;
+ node->next_finger_to_fix = (i + 1) % nb_bits;
}
}
int main(int argc, char *argv[])
{
if (argc < 3) {
- printf("Usage: %s platform_file deployment_file\n", argv[0]);
+ printf("Usage: %s [-nb_bits=n] [-timeout=t] platform_file deployment_file\n", argv[0]);
printf("example: %s ../msg_platform.xml chord.xml\n", argv[0]);
exit(1);
}
- const char* platform_file = argv[1];
- const char* application_file = argv[2];
+ MSG_global_init(&argc, argv);
+
+ char **options = &argv[1];
+ while (!strncmp(options[0], "-", 1)) {
+
+ int length = strlen("-nb_bits=");
+ if (!strncmp(options[0], "-nb_bits=", length) && strlen(options[0]) > length) {
+ nb_bits = atoi(options[0] + length);
+ DEBUG1("Set nb_bits to %d", nb_bits);
+ }
+ else {
+
+ length = strlen("-timeout=");
+ if (!strncmp(options[0], "-timeout=", length) && strlen(options[0]) > length) {
+ timeout = atoi(options[0] + length);
+ DEBUG1("Set timeout to %d", timeout);
+ }
+ else {
+ xbt_assert1(0, "Invalid chord option '%s'", options[0]);
+ }
+ }
+ options++;
+ }
+
+ const char* platform_file = options[0];
+ const char* application_file = options[1];
chord_initialize();
- MSG_global_init(&argc, argv);
MSG_set_channel_number(0);
MSG_create_environment(platform_file);