#include <math.h>
#include "simgrid/msg.h"
- XBT_LOG_NEW_DEFAULT_CATEGORY(msg_pastry, "Messages specific for this msg example");
+ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_pastry, "Messages specific for this msg example");
-/***************************************
- * PASTRY *
- * *
- * TODO: *
+/* TODO: *
* - handle node departure *
* - handle objects on the network *
- * - handle neighborood in the update *
- * *
- ***************************************/
+ * - handle neighborood in the update */
#define COMM_SIZE 10
#define COMP_SIZE 0
state_t state;
} s_task_data_t, *task_data_t;
-
-static void print_node(node_t node);
-static void print_node_id(node_t node);
-static void print_node_neighborood_set(node_t node);
-static void print_node_routing_table(node_t node);
-static void print_node_namespace_set(node_t node);
-static state_t node_get_state(node_t node);
static void get_mailbox(int node_id, char* mailbox);
static int domain(int a, int level);
static int shl(int a, int b);
static int closest_in_namespace_set(node_t node, int dest);
static int routing_next(node_t node, int dest);
-static void create(node_t node);
-static int join(node_t node);
/**
* \brief Gets the mailbox name of a host given its chord id.
snprintf(mailbox, MAILBOX_NAME_SIZE - 1, "%d", node_id);
}
-/**
- * Get the specific level of a node id
- */
+/** Get the specific level of a node id */
int domain_mask = 0;
static int domain(int a, int level) {
if (domain_mask == 0)
return (a >> shift) & domain_mask;
}
-/**
- * Get the shared domains between the two givens ids
- */
+/* Get the shared domains between the two givens ids */
static int shl(int a, int b) {
int l = 0;
while(l<LEVELS_COUNT && domain(a,l) == domain(b,l))
return l;
}
-/*
- * Get the closest id to the dest in the node namespace_set
- */
+/* Get the closest id to the dest in the node namespace_set */
static int closest_in_namespace_set(node_t node, int dest) {
int best_dist;
int res = -1;
int i, dist;
for (i=0; i<NAMESPACE_SIZE; i++) {
if (node->namespace_set[i]!=-1) {
- dist = abs(node->namespace_set[i] - dest);
- if (dist<best_dist) {
- best_dist = dist;
+ dist = abs(node->namespace_set[i] - dest);
+ if (dist<best_dist) {
+ best_dist = dist;
res = node->namespace_set[i];
- }
+ }
}
}
}
return res;
}
-/*
- * Find the next node to forward a meassage to
- */
+/* Find the next node to forward a message to */
static int routing_next(node_t node, int dest) {
int closest = closest_in_namespace_set(node, dest);
int res = -1;
return node->id;
}
-/*
- * Handle a given task
- */
+/* Get the corresponding state of a node */
+static state_t node_get_state(node_t node) {
+ int i,j;
+ state_t state = xbt_new0(s_state_t,1);
+ state->id = node->id;
+ for (i=0; i<NEIGHBORHOOD_SIZE; i++)
+ state->neighborhood_set[i] = node->neighborhood_set[i];
+
+ for (i=0; i<LEVELS_COUNT; i++)
+ for (j=0; j<LEVEL_SIZE; j++)
+ state->routing_table[i][j] = node->routing_table[i][j];
+
+ for (i=0; i<NAMESPACE_SIZE; i++)
+ state->namespace_set[i] = node->namespace_set[i];
+
+ return state;
+}
+
+/* Print the node id */
+static void print_node_id(node_t node) {
+ int i;
+ printf(" id: %i '%08x' ", node->id, node->id);
+ for (i=0;i<LEVELS_COUNT;i++)
+ printf(" %x", domain(node->id, i));
+ printf("\n");
+}
+
+/* * Print the node neighborhood set */
+static void print_node_neighborood_set(node_t node) {
+ int i;
+ printf(" Neighborhood:\n");
+ for (i=0; i<NEIGHBORHOOD_SIZE; i++)
+ printf(" %08x\n", node->neighborhood_set[i]);
+}
+
+/* Print the routing table */
+static void print_node_routing_table(node_t node) {
+ printf(" routing table:\n");
+ for (int i=0; i<LEVELS_COUNT; i++){
+ printf(" ");
+ for (int j=0; j<LEVEL_SIZE; j++)
+ printf("%08x ", node->routing_table[i][j]);
+ printf("\n");
+ }
+}
+
+/* Print the node namespace set */
+static void print_node_namespace_set(node_t node) {
+ printf(" namespace:\n");
+ for (int i=0; i<NAMESPACE_SIZE; i++)
+ printf(" %08x\n", node->namespace_set[i]);
+ printf("\n");
+}
+
+/* Print the node information */
+static void print_node(node_t node) {
+ printf("Node:\n");
+ print_node_id(node);
+ print_node_neighborood_set(node);
+ print_node_routing_table(node);
+ print_node_namespace_set(node);
+}
+
+/** Handle a given task */
static void handle_task(node_t node, msg_task_t task) {
XBT_DEBUG("Handling task %p", task);
char mailbox[MAILBOX_NAME_SIZE];
return;
}
switch (type) {
- /*
- * Try to join the ring
- */
+ /* Try to join the ring */
case TASK_JOIN: {
int next = routing_next(node, task_data->answer_id);
XBT_DEBUG("Join request from %08x forwarding to %08x", task_data->answer_id, next);
if (next!=node->id) {
get_mailbox(next, mailbox);
task_data->sender_id = node->id;
- task_data->steps++;
+ task_data->steps++;
task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, task_data);
MSG_task_send_with_timeout(task_sent, mailbox, timeout);
type = TASK_JOIN_REPLY;
MSG_task_send_with_timeout(task_sent, task_data->answer_to, timeout);
break;
}
- /*
- * Join reply from all the node touched by the join
- */
+ /* Join reply from all the node touched by the join */
case TASK_JOIN_LAST_REPLY:
// if last node touched reply, copy its namespace set
// TODO: it's work only if the two nodes are side to side (is it really the case ?)
case TASK_JOIN_REPLY:
XBT_DEBUG("Joining Reply");
- // if first node touched reply, copy its neighborood set
+ // if first node touched reply, copy its neighborhood set
if (task_data->sender_id == node->known_id) {
- node->neighborhood_set[0] = task_data->sender_id;
+ node->neighborhood_set[0] = task_data->sender_id;
for (i=1; i<NEIGHBORHOOD_SIZE; i++)
- node->neighborhood_set[i] = task_data->state->neighborhood_set[i-1];
+ node->neighborhood_set[i] = task_data->state->neighborhood_set[i-1];
}
-
+
// copy the corresponding routing table levels
- min = (node->id==task_data->answer_id) ? 0 : shl(node->id, task_data->answer_id);
+ min = (node->id==task_data->answer_id) ? 0 : shl(node->id, task_data->answer_id);
max = shl(node->id, task_data->sender_id)+1;
for (i=min;i<max;i++) {
d = domain(node->id, i);
for (j=0; j<LEVEL_SIZE; j++)
- if (d!=j)
+ if (d!=j)
node->routing_table[i][j] = task_data->state->routing_table[i][j];
- }
+ }
node->ready--;
// if the node is ready, do all the pending tasks and send update to known nodes
XBT_DEBUG("Node %i is ready!!!", node->id);
while(xbt_fifo_size(node->pending_tasks))
- handle_task(node, xbt_fifo_pop(node->pending_tasks));
+ handle_task(node, xbt_fifo_pop(node->pending_tasks));
- for (i=0; i<NAMESPACE_SIZE; i++) {
+ for (i=0; i<NAMESPACE_SIZE; i++) {
j = node->namespace_set[i];
if (j!=-1) {
XBT_DEBUG("Send update to %i", j);
get_mailbox(j, mailbox);
-
- req_data = xbt_new0(s_task_data_t,1);
+
+ req_data = xbt_new0(s_task_data_t,1);
req_data->answer_id = node->id;
req_data->steps = 0;
req_data->type = TASK_UPDATE;
MSG_task_send_with_timeout(task_sent, mailbox, timeout);
}
}
- }
+ }
break;
-
- /*
- * Recieved an update of state
- */
+ /* Received an update of state */
case TASK_UPDATE:
XBT_DEBUG("Task update %i !!!", node->id);
i=0;
for (; i<NAMESPACE_SIZE/2; i++){
curr_namespace_set[i] = node->namespace_set[i];
- task_namespace_set[i] = task_data->state->namespace_set[i];
+ task_namespace_set[i] = task_data->state->namespace_set[i];
}
task_namespace_set[i] = task_data->state->id;
for (; i<NAMESPACE_SIZE; i++){
curr_namespace_set[i] = node->namespace_set[i];
- task_namespace_set[i+1] = task_data->state->namespace_set[i];
+ task_namespace_set[i+1] = task_data->state->namespace_set[i];
}
// get the index of values before and after node->id in task_namespace
min = -1;
max = -1;
for (i=0; i<=NAMESPACE_SIZE; i++) {
- j = task_namespace_set[i];
+ j = task_namespace_set[i];
if (i<NAMESPACE_SIZE)
- printf("%08x %08x | ", j, curr_namespace_set[i]);
- if (j != -1 && j < node->id) min = i;
- if (j != -1 && max == -1 && j > node->id) max = i;
+ printf("%08x %08x | ", j, curr_namespace_set[i]);
+ if (j != -1 && j < node->id) min = i;
+ if (j != -1 && max == -1 && j > node->id) max = i;
}
printf("\n");
// add lower elements
j = NAMESPACE_SIZE/2-1;
for (i=NAMESPACE_SIZE/2-1; i>=0; i--) {
- printf("i:%i, j:%i, min:%i, currj:%08x, taskmin:%08x\n", i, j, min, curr_namespace_set[j], task_namespace_set[min]);
+ printf("i:%i, j:%i, min:%i, currj:%08x, taskmin:%08x\n", i, j, min, curr_namespace_set[j],
+ task_namespace_set[min]);
if (min<0) {
- node->namespace_set[i] = curr_namespace_set[j];
- j--;
- } else if (curr_namespace_set[j] == task_namespace_set[min]) {
- node->namespace_set[i] = curr_namespace_set[j];
- j--; min--;
- } else if (curr_namespace_set[j] > task_namespace_set[min]) {
node->namespace_set[i] = curr_namespace_set[j];
- j--;
- } else {
+ j--;
+ } else if (curr_namespace_set[j] == task_namespace_set[min]) {
+ node->namespace_set[i] = curr_namespace_set[j];
+ j--; min--;
+ } else if (curr_namespace_set[j] > task_namespace_set[min]) {
+ node->namespace_set[i] = curr_namespace_set[j];
+ j--;
+ } else {
node->namespace_set[i] = task_namespace_set[min];
- min--;
- }
+ min--;
+ }
}
// add greater elements
j = NAMESPACE_SIZE/2;
for (i=NAMESPACE_SIZE/2; i<NAMESPACE_SIZE; i++) {
- printf("i:%i, j:%i, max:%i, currj:%08x, taskmax:%08x\n", i, j, max, curr_namespace_set[j], task_namespace_set[max]);
+ printf("i:%i, j:%i, max:%i, currj:%08x, taskmax:%08x\n", i, j, max, curr_namespace_set[j],
+ task_namespace_set[max]);
if (min<0 || max>=NAMESPACE_SIZE) {
- node->namespace_set[i] = curr_namespace_set[j];
- j++;
- } else if (curr_namespace_set[j] == -1) {
- node->namespace_set[i] = task_namespace_set[max];
- max++;
- } else if (curr_namespace_set[j] == task_namespace_set[max]) {
- node->namespace_set[i] = curr_namespace_set[j];
- j++; max++;
- } else if (curr_namespace_set[j] < task_namespace_set[max]) {
+ node->namespace_set[i] = curr_namespace_set[j];
+ j++;
+ } else if (curr_namespace_set[j] == -1) {
+ node->namespace_set[i] = task_namespace_set[max];
+ max++;
+ } else if (curr_namespace_set[j] == task_namespace_set[max]) {
+ node->namespace_set[i] = curr_namespace_set[j];
+ j++; max++;
+ } else if (curr_namespace_set[j] < task_namespace_set[max]) {
node->namespace_set[i] = curr_namespace_set[j];
- j++;
- } else {
+ j++;
+ } else {
node->namespace_set[i] = task_namespace_set[max];
- max++;
- }
+ max++;
+ }
}
print_node_namespace_set(node);
node->routing_table[i][j] = task_data->state->routing_table[i][j];
}
}
- }
+ }
}
-/**
- * \brief Initializes the current node as the first one of the system.
- * \param node the current node
+/** \brief Initializes the current node as the first one of the system.
+ * \param node the current node
*/
static void create(node_t node){
node->ready = 0;
XBT_DEBUG("Create a new Pastry ring...");
}
-/*
- * Join the ring
- */
+/* Join the ring */
static int join(node_t node){
task_data_t req_data = xbt_new0(s_task_data_t,1);
req_data->type = TASK_JOIN;
return 1;
}
-/*
- * Print the node infomation
- */
-static void print_node(node_t node) {
- printf("Node:\n");
- print_node_id(node);
- print_node_neighborood_set(node);
- print_node_routing_table(node);
- print_node_namespace_set(node);
-}
-
-/*
- * Print the node id
- */
-static void print_node_id(node_t node) {
- int i;
- printf(" id: %i '%08x' ", node->id, node->id);
- for (i=0;i<LEVELS_COUNT;i++)
- printf(" %x", domain(node->id, i));
- printf("\n");
-}
-
-/*
- * Print the node neighborood set
- */
-static void print_node_neighborood_set(node_t node) {
- int i;
- printf(" neighborood:\n");
- for (i=0; i<NEIGHBORHOOD_SIZE; i++)
- printf(" %08x\n", node->neighborhood_set[i]);
-}
-
-/*
- * Print the routing table
- */
-static void print_node_routing_table(node_t node) {
- int i,j;
- printf(" routing table:\n");
- for (i=0; i<LEVELS_COUNT; i++){
- printf(" ");
- for (j=0; j<LEVEL_SIZE; j++)
- printf("%08x ", node->routing_table[i][j]);
- printf("\n");
- }
-}
-
-/*
- * Print the node namespace set
- */
-static void print_node_namespace_set(node_t node) {
- int i;
- printf(" namespace:\n");
- for (i=0; i<NAMESPACE_SIZE; i++)
- printf(" %08x\n", node->namespace_set[i]);
- printf("\n");
-
-}
-
-/*
- * Get the corresponding state of a node
- */
-static state_t node_get_state(node_t node) {
- int i,j;
- state_t state = xbt_new0(s_state_t,1);
- state->id = node->id;
- for (i=0; i<NEIGHBORHOOD_SIZE; i++)
- state->neighborhood_set[i] = node->neighborhood_set[i];
-
- for (i=0; i<LEVELS_COUNT; i++)
- for (j=0; j<LEVEL_SIZE; j++)
- state->routing_table[i][j] = node->routing_table[i][j];
-
- for (i=0; i<NAMESPACE_SIZE; i++)
- state->namespace_set[i] = node->namespace_set[i];
-
- return state;
-}
-
/**
* \brief Node Function
return 1;
}
-/*
- * Node data.
- */
-/*typedef struct s_node {
- int id; // my id
- char mailbox[MAILBOX_NAME_SIZE]; // my mailbox name (string representation of the id)
- s_finger_t *fingers; // finger table, of size nb_bits (fingers[0] is my successor)
- int pred_id; // predecessor id
- char pred_mailbox[MAILBOX_NAME_SIZE]; // predecessor's mailbox name
- int next_finger_to_fix; // index of the next finger to fix in fix_fingers()
- msg_comm_t comm_receive; // current communication to receive
- double last_change_date; // last time I changed a finger or my predecessor
-} s_node_t, *node_t;*/
-
-/**
- * \brief Main function.
- */
+/** \brief Main function. */
int main(int argc, char *argv[])
{
MSG_init(&argc, argv);
"Usage: %s [-nb_bits=n] [-timeout=t] platform_file deployment_file\n"
"\tExample: %s ../msg_platform.xml pastry10.xml\n",
argv[0], argv[0]);
-
+
char **options = &argv[1];
while (!strncmp(options[0], "-", 1)) {
-
int length = strlen("-nb_bits=");
if (!strncmp(options[0], "-nb_bits=", length) && strlen(options[0]) > length) {
nb_bits = xbt_str_parse_int(options[0] + length, "Invalid nb_bits parameter: %s");
XBT_DEBUG("Set nb_bits to %d", nb_bits);
- }
- else {
-
+ } else {
length = strlen("-timeout=");
if (!strncmp(options[0], "-timeout=", length) && strlen(options[0]) > length) {
timeout = xbt_str_parse_int(options[0] + length, "Invalid timeout parameter: %s");
XBT_DEBUG("Set timeout to %d", timeout);
- }
- else {
+ } else {
xbt_die("Invalid chord option '%s'", options[0]);
}
}
options++;
}
- const char* platform_file = options[0];
- const char* application_file = options[1];
+ MSG_create_environment(options[0]);
- MSG_create_environment(platform_file);
-
MSG_function_register("node", node);
- MSG_launch_application(application_file);
+ MSG_launch_application(options[1]);
msg_error_t res = MSG_main();
XBT_CRITICAL("Messages created: %ld", smx_total_comms);