* under the terms of the license (GNU LGPL) which comes with this package. */
#include "simgrid/msg.h"
-#include "xbt/fifo.h"
+#include "xbt/dynar.h"
#include <math.h>
+
XBT_LOG_NEW_DEFAULT_CATEGORY(msg_pastry, "Messages specific for this msg example");
/* TODO: *
int routing_table[LEVELS_COUNT][LEVEL_SIZE];
int ready;
msg_comm_t comm_receive; // current communication to receive
- xbt_fifo_t pending_tasks;
+ xbt_dynar_t pending_tasks;
} s_node_t, *node_t;
typedef struct s_state {
typedef struct s_task_data {
e_task_type_t type; // type of task
- int sender_id; // id paramater (used by some types of tasks)
+ int sender_id; // id parameter (used by some types of tasks)
//int request_finger; // finger parameter (used by some types of tasks)
int answer_id; // answer (used by some types of tasks)
char answer_to[MAILBOX_NAME_SIZE]; // mailbox to send an answer to (if any)
/* Frees the memory used by a task and destroys it */
static void task_free(void* task)
{
- // TODO add a parameter data_free_function to MSG_task_create?
if(task != NULL){
s_task_data_t* data = (s_task_data_t*)MSG_task_get_data(task);
xbt_free(data->state);
//rare case
int dist = abs(node->id - dest);
- int i;
- for (i=l; i<LEVELS_COUNT; i++) {
+ for (int i=l; i<LEVELS_COUNT; i++) {
for (int j=0; j<LEVEL_SIZE; j++) {
res = node->routing_table[i][j];
if (res!=-1 && abs(res - dest)<dist)
}
}
- for (i=0; i<NEIGHBORHOOD_SIZE; i++) {
+ for (int i=0; i<NEIGHBORHOOD_SIZE; i++) {
res = node->neighborhood_set[i];
if (res!=-1 && shl(res, dest)>=l && abs(res - dest)<dist)
return res;
}
- for (i=0; i<NAMESPACE_SIZE; i++) {
+ for (int i=0; i<NAMESPACE_SIZE; i++) {
res = node->namespace_set[i];
if (res!=-1 && shl(res, dest)>=l && abs(res - dest)<dist)
return res;
/* Get the corresponding state of a node.
 *
 * Allocates a zero-initialized s_state_t with xbt_new0() and copies into it the
 * node's id, neighborhood set, routing table and namespace set.
 *
 * Returns the newly allocated state. Nothing in this function releases it, so
 * the receiver of the pointer is responsible for freeing it.
 */
static state_t node_get_state(node_t node) {
- int i;
state_t state = xbt_new0(s_state_t,1);
state->id = node->id;
- for (i=0; i<NEIGHBORHOOD_SIZE; i++)
+ for (int i=0; i<NEIGHBORHOOD_SIZE; i++)
state->neighborhood_set[i] = node->neighborhood_set[i];
- for (i=0; i<LEVELS_COUNT; i++)
+ for (int i=0; i<LEVELS_COUNT; i++)
for (int j=0; j<LEVEL_SIZE; j++)
state->routing_table[i][j] = node->routing_table[i][j];
- for (i=0; i<NAMESPACE_SIZE; i++)
+ for (int i=0; i<NAMESPACE_SIZE; i++)
state->namespace_set[i] = node->namespace_set[i];
return state;
}
-/* Print the node id */
/* Log the node id at INFO level, once in decimal (%i) and once as 8 zero-padded hex digits (%08x). */
static void print_node_id(node_t node) {
XBT_INFO(" Id: %i '%08x' ", node->id, node->id);
}
-/* * Print the node neighborhood set */
/* Log a "Neighborhood:" header at INFO level, followed by one log message per entry of
 * node->neighborhood_set (NEIGHBORHOOD_SIZE entries), each formatted as 8 zero-padded hex digits. */
static void print_node_neighborood_set(node_t node) {
XBT_INFO(" Neighborhood:");
for (int i=0; i<NEIGHBORHOOD_SIZE; i++)
XBT_INFO(" %08x", node->neighborhood_set[i]);
}
-/* Print the routing table */
/* Log a "Routing table:" header at INFO level, then the entries of node->routing_table
 * as 8 zero-padded hex digits, iterating over the LEVELS_COUNT levels. */
static void print_node_routing_table(node_t node) {
XBT_INFO(" Routing table:");
for (int i=0; i<LEVELS_COUNT; i++){
/* NOTE(review): `j` is not declared in the lines visible here; presumably an inner
 * loop over the LEVEL_SIZE columns was elided from this excerpt -- confirm against
 * the full file. */
XBT_INFO(" %08x ", node->routing_table[i][j]);
}
}
-
/* Print the node namespace set */
static void print_node_namespace_set(node_t node) {
XBT_INFO(" Namespace:");
// If the node is not ready keep the task for later
if (node->ready != 0 && !(type==TASK_JOIN_LAST_REPLY || type==TASK_JOIN_REPLY)) {
XBT_DEBUG("Task pending %i", type);
- xbt_fifo_push(node->pending_tasks, task);
+ xbt_dynar_push(node->pending_tasks, &task);
return;
}
switch (type) {
// if the node is ready, do all the pending tasks and send update to known nodes
if (node->ready==0) {
XBT_DEBUG("Node %i is ready!!!", node->id);
-
- while(xbt_fifo_size(node->pending_tasks))
- handle_task(node, xbt_fifo_pop(node->pending_tasks));
+ while(xbt_dynar_length(node->pending_tasks)){
+ msg_task_t task;
+ xbt_dynar_shift(node->pending_tasks, &task);
+ handle_task(node, task);
+ }
for (i=0; i<NAMESPACE_SIZE; i++) {
j = node->namespace_set[i];
task_free(task);
}
-/** \brief Initializes the current node as the first one of the system.
- * \param node the current node
- */
-static void create(node_t node){
- node->ready = 0;
- XBT_DEBUG("Create a new Pastry ring...");
-}
-
/* Join the ring */
static int join(node_t node){
task_data_t req_data = xbt_new0(s_task_data_t,1);
node.id = xbt_str_parse_int(argv[1], "Invalid ID: %s");
node.known_id = -1;
node.ready = -1;
- node.pending_tasks = xbt_fifo_new();
+ node.pending_tasks = xbt_dynar_new(sizeof(msg_task_t), NULL);
get_mailbox(node.id, node.mailbox);
XBT_DEBUG("New node with id %s (%08x)", node.mailbox, node.id);
-
- int i;
- for (i=0; i<LEVELS_COUNT; i++){
+
+ for (int i=0; i<LEVELS_COUNT; i++){
int d = domain(node.id, i);
for (int j=0; j<LEVEL_SIZE; j++)
node.routing_table[i][j] = (d==j) ? node.id : -1;
}
- for (i=0; i<NEIGHBORHOOD_SIZE; i++)
+ for (int i=0; i<NEIGHBORHOOD_SIZE; i++)
node.neighborhood_set[i] = -1;
- for (i=0; i<NAMESPACE_SIZE; i++)
+ for (int i=0; i<NAMESPACE_SIZE; i++)
node.namespace_set[i] = -1;
if (argc == 3) { // first ring
XBT_DEBUG("Hey! Let's create the system.");
deadline = xbt_str_parse_double(argv[2], "Invalid deadline: %s");
- create(&node);
+ node.ready = 0;
+ XBT_DEBUG("Create a new Pastry ring...");
join_success = 1;
- }
- else {
+ } else {
node.known_id = xbt_str_parse_int(argv[2], "Invalid known ID: %s");
double sleep_time = xbt_str_parse_double(argv[3], "Invalid sleep time: %s");
deadline = xbt_str_parse_double(argv[4], "Invalid deadline: %s");
XBT_DEBUG("Failed to receive a task. Nevermind.");
MSG_comm_destroy(node.comm_receive);
node.comm_receive = NULL;
- }
- else {
+ } else {
// the task was successfully received
MSG_comm_destroy(node.comm_receive);
node.comm_receive = NULL;
}
}
- xbt_free(node.pending_tasks);
+ xbt_dynar_free(&node.pending_tasks);
return 1;
}