X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/c20b20602a3b551b39aa7910874cca47c45430ab..248b2a21c9e6be98e6b569b9af0d58dde6c52c34:/examples/msg/dht-pastry/dht-pastry.c diff --git a/examples/msg/dht-pastry/dht-pastry.c b/examples/msg/dht-pastry/dht-pastry.c index df375656c2..c19cd35798 100644 --- a/examples/msg/dht-pastry/dht-pastry.c +++ b/examples/msg/dht-pastry/dht-pastry.c @@ -1,18 +1,20 @@ -/* Copyright (c) 2013-2015. The SimGrid Team. +/* Copyright (c) 2013-2017. The SimGrid Team. * All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ -#include #include "simgrid/msg.h" +#include "xbt/dynar.h" +#include - XBT_LOG_NEW_DEFAULT_CATEGORY(msg_pastry, "Messages specific for this msg example"); + +XBT_LOG_NEW_DEFAULT_CATEGORY(msg_pastry, "Messages specific for this msg example"); /* TODO: * * - handle node departure * * - handle objects on the network * - * - handle neighborood in the update */ + * - handle neighborhood in the update */ #define COMM_SIZE 10 #define COMP_SIZE 0 @@ -29,8 +31,6 @@ static int nb_bits = 16; static int timeout = 50; static int max_simulation_time = 1000; -extern long int smx_total_comms; - typedef struct s_node { int id; //128bits generated random(2^128 -1) int known_id; @@ -40,15 +40,17 @@ typedef struct s_node { int routing_table[LEVELS_COUNT][LEVEL_SIZE]; int ready; msg_comm_t comm_receive; // current communication to receive - xbt_fifo_t pending_tasks; -} s_node_t, *node_t; + xbt_dynar_t pending_tasks; +} s_node_t; +typedef s_node_t* node_t; typedef struct s_state { int id; int namespace_set[NAMESPACE_SIZE]; int neighborhood_set[NEIGHBORHOOD_SIZE]; int routing_table[LEVELS_COUNT][LEVEL_SIZE]; -} s_state_t, *state_t; +} s_state_t; +typedef s_state_t* state_t; /** Types of tasks exchanged between nodes. */ typedef enum { @@ -60,17 +62,17 @@ typedef enum { typedef struct s_task_data { e_task_type_t type; // type of task - int sender_id; // id paramater (used by some types of tasks) + int sender_id; // id parameter (used by some types of tasks) //int request_finger; // finger parameter (used by some types of tasks) int answer_id; // answer (used by some types of tasks) char answer_to[MAILBOX_NAME_SIZE]; // mailbox to send an answer to (if any) //const char* issuer_host_name; // used for logging int steps; state_t state; -} s_task_data_t, *task_data_t; +} s_task_data_t; +typedef s_task_data_t* task_data_t; -static void get_mailbox(int node_id, char* mailbox); -static int domain(int a, int level); +static int domain(unsigned int a, unsigned int level); static int shl(int a, int b); static int closest_in_namespace_set(node_t node, int dest); static int routing_next(node_t node, int dest); @@ -87,11 +89,12 @@ static void get_mailbox(int node_id, char* mailbox) } /** Get the specific level of a node id */ -int domain_mask = 0; -static int domain(int a, int level) { +unsigned int domain_mask = 0; +static int domain(unsigned int a, unsigned int level) +{ if (domain_mask == 0) domain_mask = pow(2, DOMAIN_SIZE) - 1; - int shift = (LEVELS_COUNT-level-1)*DOMAIN_SIZE; + unsigned int shift = (LEVELS_COUNT-level-1)*DOMAIN_SIZE; return (a >> shift) & domain_mask; } @@ -103,20 +106,29 @@ static int shl(int a, int b) { return l; } +/* Frees the memory used by a task and destroy it */ +static void task_free(void* task) +{ + if(task != NULL){ + s_task_data_t* data = (s_task_data_t*)MSG_task_get_data(task); + xbt_free(data->state); + xbt_free(data); + MSG_task_destroy(task); + } +} + /* Get the closest id to the dest in the node namespace_set */ static int closest_in_namespace_set(node_t node, int dest) { - int best_dist; int res = -1; - if ((node->namespace_set[NAMESPACE_SIZE-1] <= dest) & (dest <= node->namespace_set[0])) { - best_dist = abs(node->id - dest); + if ((node->namespace_set[NAMESPACE_SIZE-1] <= dest) && (dest <= node->namespace_set[0])) { + int best_dist = abs(node->id - dest); res = node->id; - int i, dist; - for (i=0; inamespace_set[i]!=-1) { - dist = abs(node->namespace_set[i] - dest); + int dist = abs(node->namespace_set[i] - dest); if (distnamespace_set[i]; + res = node->namespace_set[i]; } } } @@ -127,33 +139,31 @@ static int closest_in_namespace_set(node_t node, int dest) { /* Find the next node to forward a message to */ static int routing_next(node_t node, int dest) { int closest = closest_in_namespace_set(node, dest); - int res = -1; if (closest!=-1) return closest; int l = shl(node->id, dest); - res = node->routing_table[l][domain(dest, l)]; - if (res!=-1) + int res = node->routing_table[l][domain(dest, l)]; + if (res != -1) return res; //rare case int dist = abs(node->id - dest); - int i,j; - for (i=l; irouting_table[i][j]; if (res!=-1 && abs(res - dest)neighborhood_set[i]; if (res!=-1 && shl(res, dest)>=l && abs(res - dest)namespace_set[i]; if (res!=-1 && shl(res, dest)>=l && abs(res - dest)id = node->id; - for (i=0; ineighborhood_set[i] = node->neighborhood_set[i]; - for (i=0; irouting_table[i][j] = node->routing_table[i][j]; - for (i=0; inamespace_set[i] = node->namespace_set[i]; return state; } -/* Print the node id */ static void print_node_id(node_t node) { - int i; - printf(" id: %i '%08x' ", node->id, node->id); - for (i=0;iid, i)); - printf("\n"); + XBT_INFO(" Id: %i '%08x' ", node->id, (unsigned)node->id); } -/* * Print the node neighborhood set */ static void print_node_neighborood_set(node_t node) { - int i; - printf(" Neighborhood:\n"); - for (i=0; ineighborhood_set[i]); + XBT_INFO(" Neighborhood:"); + for (int i=0; ineighborhood_set[i]); } -/* Print the routing table */ static void print_node_routing_table(node_t node) { - printf(" routing table:\n"); + XBT_INFO(" Routing table:"); for (int i=0; irouting_table[i][j]); - printf("\n"); + XBT_INFO(" %08x ", (unsigned)node->routing_table[i][j]); } } - /* Print the node namespace set */ static void print_node_namespace_set(node_t node) { - printf(" namespace:\n"); + XBT_INFO(" Namespace:"); for (int i=0; inamespace_set[i]); - printf("\n"); + XBT_INFO(" %08x", (unsigned)node->namespace_set[i]); } /* Print the node information */ static void print_node(node_t node) { - printf("Node:\n"); + XBT_INFO("Node:"); print_node_id(node); print_node_neighborood_set(node); print_node_routing_table(node); @@ -229,51 +226,60 @@ static void print_node(node_t node) { static void handle_task(node_t node, msg_task_t task) { XBT_DEBUG("Handling task %p", task); char mailbox[MAILBOX_NAME_SIZE]; - int i, j, min, max, d; + int i; + int j; + int min; + int max; + int next; msg_task_t task_sent; task_data_t req_data; task_data_t task_data = (task_data_t) MSG_task_get_data(task); e_task_type_t type = task_data->type; // If the node is not ready keep the task for later if (node->ready != 0 && !(type==TASK_JOIN_LAST_REPLY || type==TASK_JOIN_REPLY)) { - XBT_DEBUG("Task pending %i", type); - xbt_fifo_push(node->pending_tasks, task); + XBT_DEBUG("Task pending %u", type); + xbt_dynar_push(node->pending_tasks, &task); return; } switch (type) { /* Try to join the ring */ - case TASK_JOIN: { - int next = routing_next(node, task_data->answer_id); - XBT_DEBUG("Join request from %08x forwarding to %08x", task_data->answer_id, next); + case TASK_JOIN: + next = routing_next(node, task_data->answer_id); + XBT_DEBUG("Join request from %08x forwarding to %08x", (unsigned)task_data->answer_id, (unsigned)next); type = TASK_JOIN_LAST_REPLY; req_data = xbt_new0(s_task_data_t,1); req_data->answer_id = task_data->sender_id; req_data->steps = task_data->steps + 1; - + // if next different from current node forward the join if (next!=node->id) { get_mailbox(next, mailbox); task_data->sender_id = node->id; task_data->steps++; task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, task_data); - MSG_task_send_with_timeout(task_sent, mailbox, timeout); + if (MSG_task_send_with_timeout(task_sent, mailbox, timeout)== MSG_TIMEOUT) { + XBT_DEBUG("Timeout expired when forwarding join to next %d", next); + task_free(task_sent); + } type = TASK_JOIN_REPLY; - } - + } + // send back the current node state to the joining node req_data->type = type; req_data->sender_id = node->id; get_mailbox(node->id, req_data->answer_to); req_data->state = node_get_state(node); task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data); - MSG_task_send_with_timeout(task_sent, task_data->answer_to, timeout); + if (MSG_task_send_with_timeout(task_sent, task_data->answer_to, timeout)== MSG_TIMEOUT) { + XBT_DEBUG("Timeout expired when sending back the current node state to the joining node to %d", node->id); + task_free(task_sent); + } break; - } /* Join reply from all the node touched by the join */ case TASK_JOIN_LAST_REPLY: // if last node touched reply, copy its namespace set - // TODO: it's work only if the two nodes are side to side (is it really the case ?) + // TODO: it works only if the two nodes are side to side (is it really the case ?) j = (task_data->sender_id < node->id) ? -1 : 0; for (i=0; inamespace_set[i] = task_data->state->namespace_set[i-j]; @@ -281,6 +287,7 @@ static void handle_task(node_t node, msg_task_t task) { } node->namespace_set[NAMESPACE_SIZE/2+j] = task_data->sender_id; node->ready += task_data->steps + 1; + /* no break */ case TASK_JOIN_REPLY: XBT_DEBUG("Joining Reply"); @@ -295,19 +302,21 @@ static void handle_task(node_t node, msg_task_t task) { min = (node->id==task_data->answer_id) ? 0 : shl(node->id, task_data->answer_id); max = shl(node->id, task_data->sender_id)+1; for (i=min;iid, i); + int d = domain(node->id, i); for (j=0; jrouting_table[i][j] = task_data->state->routing_table[i][j]; - } + } node->ready--; // if the node is ready, do all the pending tasks and send update to known nodes if (node->ready==0) { XBT_DEBUG("Node %i is ready!!!", node->id); - - while(xbt_fifo_size(node->pending_tasks)) - handle_task(node, xbt_fifo_pop(node->pending_tasks)); + while(xbt_dynar_length(node->pending_tasks)){ + msg_task_t task; + xbt_dynar_shift(node->pending_tasks, &task); + handle_task(node, task); + } for (i=0; inamespace_set[i]; @@ -323,7 +332,10 @@ static void handle_task(node_t node, msg_task_t task) { get_mailbox(node->id, req_data->answer_to); req_data->state = node_get_state(node); task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data); - MSG_task_send_with_timeout(task_sent, mailbox, timeout); + if (MSG_task_send_with_timeout(task_sent, mailbox, timeout)== MSG_TIMEOUT) { + XBT_DEBUG("Timeout expired when sending update to %d", j); + task_free(task_sent); + } } } } @@ -333,14 +345,14 @@ static void handle_task(node_t node, msg_task_t task) { XBT_DEBUG("Task update %i !!!", node->id); /* Update namespace ses */ - printf("Task update from %i !!!\n", task_data->sender_id); + XBT_INFO("Task update from %i !!!", task_data->sender_id); + XBT_INFO("Node:"); print_node_id(node); print_node_namespace_set(node); int curr_namespace_set[NAMESPACE_SIZE]; int task_namespace_set[NAMESPACE_SIZE+1]; - - // Copy the current namedspace - // and the task state namespace with state->id in the middle + + // Copy the current namespace and the task state namespace with state->id in the middle i=0; for (; inamespace_set[i]; @@ -348,7 +360,7 @@ static void handle_task(node_t node, msg_task_t task) { } task_namespace_set[i] = task_data->state->id; for (; inamespace_set[i]; + curr_namespace_set[i] = node->namespace_set[i]; task_namespace_set[i+1] = task_data->state->namespace_set[i]; } @@ -357,27 +369,22 @@ static void handle_task(node_t node, msg_task_t task) { max = -1; for (i=0; i<=NAMESPACE_SIZE; i++) { j = task_namespace_set[i]; - if (iid) min = i; - if (j != -1 && max == -1 && j > node->id) max = i; + if (j != -1 && j < node->id) + min = i; + if (j != -1 && max == -1 && j > node->id) + max = i; } - printf("\n"); // add lower elements j = NAMESPACE_SIZE/2-1; for (i=NAMESPACE_SIZE/2-1; i>=0; i--) { - printf("i:%i, j:%i, min:%i, currj:%08x, taskmin:%08x\n", i, j, min, curr_namespace_set[j], - task_namespace_set[min]); - if (min<0) { + if (min < 0 || curr_namespace_set[j] > task_namespace_set[min]) { node->namespace_set[i] = curr_namespace_set[j]; j--; } else if (curr_namespace_set[j] == task_namespace_set[min]) { - node->namespace_set[i] = curr_namespace_set[j]; - j--; min--; - } else if (curr_namespace_set[j] > task_namespace_set[min]) { node->namespace_set[i] = curr_namespace_set[j]; j--; + min--; } else { node->namespace_set[i] = task_namespace_set[min]; min--; @@ -387,26 +394,23 @@ static void handle_task(node_t node, msg_task_t task) { // add greater elements j = NAMESPACE_SIZE/2; for (i=NAMESPACE_SIZE/2; i=NAMESPACE_SIZE) { node->namespace_set[i] = curr_namespace_set[j]; j++; - } else if (curr_namespace_set[j] == -1) { - node->namespace_set[i] = task_namespace_set[max]; - max++; - } else if (curr_namespace_set[j] == task_namespace_set[max]) { - node->namespace_set[i] = curr_namespace_set[j]; - j++; max++; - } else if (curr_namespace_set[j] < task_namespace_set[max]) { - node->namespace_set[i] = curr_namespace_set[j]; - j++; - } else { - node->namespace_set[i] = task_namespace_set[max]; - max++; + } else if (max >= 0){ + if (curr_namespace_set[j] == -1 || curr_namespace_set[j] > task_namespace_set[max]) { + node->namespace_set[i] = task_namespace_set[max]; + max++; + } else if (curr_namespace_set[j] == task_namespace_set[max]) { + node->namespace_set[i] = curr_namespace_set[j]; + j++; + max++; + } else { + node->namespace_set[i] = curr_namespace_set[j]; + j++; + } } } - print_node_namespace_set(node); /* Update routing table */ for (i=shl(node->id, task_data->state->id); irouting_table[i][j] = task_data->state->routing_table[i][j]; } } + break; + default: + THROW_IMPOSSIBLE; } -} - -/** \brief Initializes the current node as the first one of the system. - * \param node the current node - */ -static void create(node_t node){ - node->ready = 0; - XBT_DEBUG("Create a new Pastry ring..."); + task_free(task); } /* Join the ring */ @@ -440,12 +440,14 @@ static int join(node_t node){ msg_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data); XBT_DEBUG("Trying to join Pastry ring... (with node %s)", mailbox); - MSG_task_send_with_timeout(task_sent, mailbox, timeout); + if (MSG_task_send_with_timeout(task_sent, mailbox, timeout)== MSG_TIMEOUT) { + XBT_DEBUG("Timeout expired when joining ring with node %d", node->known_id); + task_free(task_sent); + } return 1; } - /** * \brief Node Function * Arguments: @@ -457,38 +459,37 @@ static int join(node_t node){ static int node(int argc, char *argv[]) { double init_time = MSG_get_clock(); - msg_task_t task_received = NULL; - int join_success = 0; + msg_task_t task_received = NULL; + int join_success = 0; double deadline; xbt_assert(argc == 3 || argc == 5, "Wrong number of arguments for this node"); s_node_t node = {0}; node.id = xbt_str_parse_int(argv[1], "Invalid ID: %s"); node.known_id = -1; node.ready = -1; - node.pending_tasks = xbt_fifo_new(); + node.pending_tasks = xbt_dynar_new(sizeof(msg_task_t), NULL); get_mailbox(node.id, node.mailbox); - XBT_DEBUG("New node with id %s (%08x)", node.mailbox, node.id); - - int i,j,d; - for (i=0; i 2, + xbt_assert(argc > 2, "Usage: %s [-nb_bits=n] [-timeout=t] platform_file deployment_file\n" - "\tExample: %s ../msg_platform.xml pastry10.xml\n", + "\tExample: %s ../msg_platform.xml pastry10.xml\n", argv[0], argv[0]); char **options = &argv[1]; @@ -559,7 +567,7 @@ int main(int argc, char *argv[]) timeout = xbt_str_parse_int(options[0] + length, "Invalid timeout parameter: %s"); XBT_DEBUG("Set timeout to %d", timeout); } else { - xbt_die("Invalid chord option '%s'", options[0]); + xbt_die("Invalid pastry option '%s'", options[0]); } } options++; @@ -571,7 +579,6 @@ int main(int argc, char *argv[]) MSG_launch_application(options[1]); msg_error_t res = MSG_main(); - XBT_CRITICAL("Messages created: %ld", smx_total_comms); XBT_INFO("Simulated time: %g", MSG_get_clock()); return res != MSG_OK;