From: Samuel Lepetit Date: Wed, 4 Jul 2012 16:58:43 +0000 (+0200) Subject: Add kademlia C example X-Git-Tag: v3_8~331 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/9cc88bb9ed0a8974e524f67198984a1e03cb00b0 Add kademlia C example --- diff --git a/buildtools/Cmake/AddTests.cmake b/buildtools/Cmake/AddTests.cmake index c50ef8cf36..fd3a4a42bd 100644 --- a/buildtools/Cmake/AddTests.cmake +++ b/buildtools/Cmake/AddTests.cmake @@ -214,6 +214,8 @@ if(NOT enable_memcheck) ADD_TEST(msg-chord-thread-parallel ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:thread --cfg contexts/nthreads:4 --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/chord --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/chord chord_crosstraffic.tesh) ADD_TEST(msg-bittorrent-thread ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:thread --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/bittorrent --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent bittorrent.tesh) ADD_TEST(msg-bittorrent-thread-parallel ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/nthreads:4 --cfg contexts/factory:thread --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/bittorrent --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent bittorrent.tesh) + ADD_TEST(msg-kademlia-thread ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:thread --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/kademlia --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia kademlia.tesh) + ADD_TEST(msg-kademlia-thread-parallel ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/nthreads:4 --cfg contexts/factory:thread --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/kademlia --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia kademlia.tesh) if(CONTEXT_UCONTEXT) ADD_TEST(msg-migration-ucontext ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:ucontext --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg 
${CMAKE_HOME_DIRECTORY}/examples/msg/migration/migration.tesh) @@ -229,6 +231,8 @@ if(NOT enable_memcheck) ADD_TEST(msg-chord-ucontext-parallel ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:ucontext --cfg contexts/nthreads:4 --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/chord --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/chord chord_crosstraffic.tesh) ADD_TEST(msg-bittorrent-ucontext ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/bittorrent --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent bittorrent.tesh) ADD_TEST(msg-bittorrent-ucontext-parallel ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/nthreads:4 --cfg contexts/factory:ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/bittorrent --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent bittorrent.tesh) + ADD_TEST(msg-kademlia-ucontext ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/kademlia --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia kademlia.tesh) + ADD_TEST(msg-kademlia-ucontext-parallel ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/nthreads:4 --cfg contexts/factory:ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/kademlia --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia kademlia.tesh) endif(CONTEXT_UCONTEXT) if(HAVE_RAWCTX) @@ -245,7 +249,8 @@ if(NOT enable_memcheck) ADD_TEST(msg-chord-raw-parallel ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:raw --cfg contexts/nthreads:4 --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/chord --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/chord chord_crosstraffic.tesh) ADD_TEST(msg-bittorrent-raw ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:raw --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/bittorrent --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent bittorrent.tesh) ADD_TEST(msg-bittorrent-raw-parallel 
${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/nthreads:4 --cfg contexts/factory:raw --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/bittorrent --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent bittorrent.tesh) - + ADD_TEST(msg-kademlia-raw ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:raw --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/kademlia --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia kademlia.tesh) + ADD_TEST(msg-kademlia-raw-parallel ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/nthreads:4 --cfg contexts/factory:raw --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/kademlia --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia kademlia.tesh) endif(HAVE_RAWCTX) IF(${ARCH_32_BITS}) diff --git a/buildtools/Cmake/MakeExe.cmake b/buildtools/Cmake/MakeExe.cmake index 246b93e823..57d7da86a0 100644 --- a/buildtools/Cmake/MakeExe.cmake +++ b/buildtools/Cmake/MakeExe.cmake @@ -53,8 +53,9 @@ add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/parallel_task) add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/priority) add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/masterslave) add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/icomms) -add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/chord) add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent) +add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/chord) +add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia) add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/token_ring) add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/pmm) add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/start_kill_time) diff --git a/examples/msg/kademlia/CMakeLists.txt b/examples/msg/kademlia/CMakeLists.txt new file mode 100644 index 0000000000..4a9f9652f1 --- /dev/null +++ b/examples/msg/kademlia/CMakeLists.txt @@ -0,0 +1,8 @@ +cmake_minimum_required(VERSION 2.6) + +set(EXECUTABLE_OUTPUT_PATH "${CMAKE_CURRENT_BINARY_DIR}") + 
+add_executable(kademlia "kademlia.c" "node.c" "routing_table.c"
+"task.c" "answer.c")
+### Add definitions for compile
+target_link_libraries(kademlia simgrid )
diff --git a/examples/msg/kademlia/answer.c b/examples/msg/kademlia/answer.c
new file mode 100644
index 0000000000..ad0ef99c36
--- /dev/null
+++ b/examples/msg/kademlia/answer.c
@@ -0,0 +1,140 @@
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+#include "answer.h"
+
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(msg_kademlia_node);
+
+/**
+ * Allocates an empty answer for a lookup of destination_id (release it with answer_free).
+ */
+answer_t answer_init(unsigned int destination_id) {
+  answer_t answer = xbt_new(s_answer_t,1);
+  answer->nodes = xbt_dynar_new(sizeof(node_contact_t),NULL);
+  answer->size = 0;
+  answer->destination_id = destination_id;
+
+  return answer;
+}
+/**
+ * Destroys a node answer object, together with every contact it holds.
+ */
+void answer_free(answer_t answer) {
+  unsigned int i;
+  for (i = 0; i < answer->size; i++) {
+    node_contact_free(*(void**)xbt_dynar_get_ptr(answer->nodes,i));
+  }
+  xbt_dynar_free(&answer->nodes);
+  xbt_free(answer);
+}
+/**
+ * @brief Prints an answer_t, for debugging purposes
+ */
+void answer_print(answer_t answer) {
+  unsigned int cpt;
+  node_contact_t contact;
+  XBT_INFO("Searching %d, size %d",answer->destination_id,answer->size);
+  xbt_dynar_foreach(answer->nodes,cpt,contact) {
+    XBT_INFO("Node %d: %d is at distance %d",cpt,contact->id,contact->distance);
+  }
+}
+/**
+ * @brief Merge two answer_t together, only keeping the best nodes
+ * @param destination the answer receiving the nodes; it is sorted and trimmed before returning
+ * @param source the source of the nodes to add; it is left untouched, contacts are copied
+ */
+unsigned int answer_merge(answer_t destination, answer_t source) {
+  node_contact_t contact, contact_copy;
+  unsigned int cpt;
+  unsigned int nb_added = 0;
+  /* TODO: Check if same destination */
+  xbt_dynar_foreach(source->nodes,cpt,contact) {
+    if (!answer_contains(destination,contact->id)) { /* skip ids we already hold */
+      contact_copy = node_contact_copy(contact);
+      xbt_dynar_push(destination->nodes,&contact_copy);
+      destination->size++;
+      nb_added++;
+    }
+  }
+  answer_sort(destination);
+  answer_trim(destination);
+  return nb_added; /* number of copies made, counted before the trim */
+}
+/**
+ * Three-way comparator ordering node_contact_t elements by increasing distance
+ */
+static int _answer_sort_function(const void* e1,const void* e2) {
+  node_contact_t c1 = *(void**)e1;
+  node_contact_t c2 = *(void**)e2;
+  return (c1->distance > c2->distance) - (c1->distance < c2->distance); /* <0, 0 or >0; a bare ">=" never reports "less" */
+}
+/**
+ * Sorts a answer_t, by node distance.
+ * @param answer the answer to sort
+ * @note contacts end up ordered from the smallest distance to the largest
+ */
+void answer_sort(answer_t answer) {
+  xbt_dynar_sort(answer->nodes,&_answer_sort_function);
+}
+/**
+ * Trims a answer_t, in order for it to have a size of less or equal
+ * to "bucket_size"
+ * @param answer the answer_t to trim
+ */
+void answer_trim(answer_t answer) {
+  node_contact_t value;
+  while (answer->size > bucket_size) {
+    xbt_dynar_pop(answer->nodes,&value); /* takes from the tail: the farthest contact if the answer is sorted */
+    answer->size--;
+    node_contact_free(value);
+  }
+  xbt_assert(xbt_dynar_length(answer->nodes) == answer->size,"Wrong size for the answer");
+}
+/**
+ * Adds the content of a bucket into an answer object.
+ * @param bucket the bucket whose ids have to be added
+ * @param answer the answer object we're going to put the data in
+ * @note each distance is computed against the destination id stored in the answer.
+ */
+void answer_add_bucket(bucket_t bucket, answer_t answer) {
+  unsigned int cpt;
+  unsigned int id, distance;
+  node_contact_t contact;
+  xbt_assert((bucket != NULL), "Provided a NULL bucket");
+  xbt_assert((bucket->nodes != NULL), "Provided a bucket which nodes are NULL");
+  xbt_dynar_foreach(bucket->nodes,cpt,id) {
+    distance = id ^ answer->destination_id; /* XOR distance between the bucket id and the searched id */
+    contact = node_contact_new(id,distance);
+    xbt_dynar_push(answer->nodes,&contact);
+    answer->size++; /* kept in step with the dynar length by hand */
+  }
+}
+/**
+ * Returns 1 if the id supplied is in the answer, 0 otherwise.
+ * @param id : id we're looking for
+ */
+unsigned int answer_contains(answer_t answer, unsigned int id) {
+  unsigned int i = 0, size = xbt_dynar_length(answer->nodes);
+  node_contact_t contact;
+  for (i = 0; i < size; i++) { /* linear scan over the stored contacts */
+    contact = xbt_dynar_get_as(answer->nodes,i,node_contact_t);
+    if (id == contact->id) {
+      return 1;
+    }
+  }
+  return 0;
+}
+/**
+ * Returns if the destination we are trying to find is found
+ * @param answer the answer
+ * @return 1 if the first contact is at distance 0, 0 otherwise (also 0 for an empty answer).
+ */
+unsigned int answer_destination_found(answer_t answer) {
+  if (xbt_dynar_length(answer->nodes) < 1) {
+    return 0;
+  }
+  node_contact_t contact_tail = xbt_dynar_get_as(answer->nodes,0,node_contact_t); /* index 0 is the closest contact only if the answer is sorted */
+  return contact_tail->distance == 0;
+}
diff --git a/examples/msg/kademlia/answer.h b/examples/msg/kademlia/answer.h
new file mode 100644
index 0000000000..2e9f585bdd
--- /dev/null
+++ b/examples/msg/kademlia/answer.h
@@ -0,0 +1,32 @@
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+#ifndef _KADEMLIA_EXAMPLES_ANSWER_H_
+#define _KADEMLIA_EXAMPLES_ANSWER_H_
+#include /* NOTE(review): the header name (an angle-bracket include) was lost when this page was extracted - restore it from upstream */
+#include "routing_table.h"
+/*
+ * Node query answer: contains the elements closest
+ * to the id given.
+ */
+typedef struct s_node_answer {
+  unsigned int destination_id; /* id being searched; distances are computed against it */
+  xbt_dynar_t nodes; //Dynar of node_contact_t
+  unsigned int size; /* element count, maintained by hand next to the dynar length */
+} s_answer_t, *answer_t;
+
+#include "node.h" /* NOTE(review): included after the typedef, presumably because node.h needs answer_t - confirm */
+
+answer_t answer_init(unsigned int destination_id);
+void answer_free(answer_t answer);
+void answer_print(answer_t answer);
+unsigned int answer_merge(answer_t destination, answer_t source);
+void answer_sort(answer_t answer);
+void answer_trim(answer_t answer);
+void answer_add_bucket(bucket_t bucket, answer_t answer);
+unsigned int answer_contains(answer_t answer, unsigned int id);
+unsigned int answer_destination_found(answer_t answer);
+
+#endif /* _KADEMLIA_EXAMPLES_ANSWER_H_ */
diff --git a/examples/msg/kademlia/common.h b/examples/msg/kademlia/common.h
new file mode 100644
index 0000000000..3245f2ab7b
--- /dev/null
+++ b/examples/msg/kademlia/common.h
@@ -0,0 +1,37 @@
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package.
*/
+#ifndef _KADEMLIA_EXAMPLES_COMMON
+#define _KADEMLIA_EXAMPLES_COMMON
+#define max_join_trials 4 /* failed receptions tolerated while joining */
+
+#define RECEIVE_TIMEOUT 1
+
+#define ping_timeout 55 /* timeouts are added to MSG_get_clock() values */
+#define find_node_timeout 10 /* no trailing ';': the macro is used inside expressions */
+#define find_node_global_timeout 50
+
+#define kademlia_alpha 3 /* FIND_NODE queries sent per lookup round */
+#define bucket_size 20 /* contacts kept per bucket, and per answer after trimming */
+
+#define identifier_size 32 /* identifier width, in bits */
+#define max_answers_to_ask 20
+
+#define random_lookup_interval 100
+
+#define MAILBOX_NAME_SIZE (identifier_size / 4 + 1) /* NOTE(review): sized for hex digits + NUL, but ids are printed with "%d" - confirm it is large enough */
+#define IDENTIFIER_HEX_SIZE (identifier_size / 4 + 1) /* parenthesised so the macro is safe inside larger expressions */
+
+#define COMM_SIZE 1
+#define COMP_SIZE 0
+
+#define MAX_STEPS 10 /* upper bound on lookup rounds in find_node */
+
+#define JOIN_BUCKETS_QUERIES 5
+
+#define RANDOM_LOOKUP_NODE 0 /* id searched by random_lookup */
+
+
+#endif /* _KADEMLIA_EXAMPLES_COMMON */
diff --git a/examples/msg/kademlia/generate.py b/examples/msg/kademlia/generate.py
new file mode 100755
index 0000000000..19b483e7b6
--- /dev/null
+++ b/examples/msg/kademlia/generate.py
@@ -0,0 +1,33 @@
+#!/usr/bin/python
+
+import sys, random
+
+if len(sys.argv) != 4:
+    print("Usage: python generate.py nb_nodes nb_bits end_date > deployment_file.xml")
+    sys.exit(1)
+
+nb_nodes = int(sys.argv[1])
+nb_bits = int(sys.argv[2])
+end_date = int(sys.argv[3])
+
+max_id = 2 ** nb_bits - 1
+all_ids = [0]
+
+sys.stdout.write("\n"
+"\n"
+"\n"
+" \n" % end_date)
+
+for i in range(1, nb_nodes):
+    ok = False
+    while not ok:
+        my_id = random.randint(0, max_id)
+        ok = not my_id in all_ids
+    known_id = all_ids[random.randint(0, len(all_ids) - 1)]
+    start_date = i * 10
+    line = " \n" % (i, my_id, known_id,end_date)
+    sys.stdout.write(line)
+    all_ids.append(my_id)
+
+sys.stdout.write("")
+
diff --git a/examples/msg/kademlia/kademlia.c b/examples/msg/kademlia/kademlia.c
new file mode 100644
index 0000000000..9b10609bd2
--- /dev/null
+++ b/examples/msg/kademlia/kademlia.c
@@ -0,0 +1,465 @@
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package.
*/
+#include "kademlia.h"
+#include "node.h"
+#include "task.h"
+
+#include "msg/msg.h"
+#include "xbt/log.h"
+#include "xbt/asserts.h"
+/** @addtogroup MSG_examples
+ * kademlia/kademlia.c: Kademlia protocol
+ * Implements the Kademlia protocol, using 32 bits identifiers.
+ */
+ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia,
+ "Messages specific for this msg example");
+
+extern long unsigned int smx_total_comms;
+
+/**
+ * \brief Node function
+ * Arguments (argv[1] onwards; the second one is omitted by the node creating the network) :
+ * - my node ID
+ * - the ID of the person I know in the system (or not)
+ * - Time before I leave the system because I'm bored
+ */
+int node(int argc, char *argv[]) {
+  unsigned int join_success = 1, deadline;
+  xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
+  /* Node initialization */
+  unsigned int id = atoi(argv[1]);
+  node_t node = node_init(id);
+
+  if (argc == 4) {
+    XBT_INFO("Hi, I'm going to join the network !");
+    unsigned int id_known = (unsigned int)atoi(argv[2]);
+    join_success = join(node,id_known);
+    deadline = atoi(argv[3]);
+  }
+  else {
+    deadline = atoi(argv[2]);
+    XBT_INFO("Hi, I'm going to create the network with id %s !",node->mailbox);
+    node_routing_table_update(node,node->id);
+  }
+  if (join_success) {
+    XBT_VERB("Ok, I'm joining the network with id %s",node->mailbox);
+    //We start the main loop
+    main_loop(node,deadline);
+  }
+  else {
+    XBT_INFO("I couldn't join the network :(");
+  }
+  XBT_DEBUG("I'm leaving the network");
+  XBT_INFO("%d/%d FIND_NODE have succeeded",node->find_node_success,node->find_node_success + node->find_node_failed);
+  node_free(node);
+
+  return 0;
+}
+/**
+ * Main loop for the process: serves incoming tasks and starts a lookup every random_lookup_interval, until deadline
+ */
+void main_loop(node_t node, unsigned int deadline) {
+  double next_lookup_time = MSG_get_clock() + random_lookup_interval;
+  XBT_VERB("Main loop start");
+  while (MSG_get_clock() < deadline) {
+
+    if (node->receive_comm == NULL) { /* (re)post the asynchronous receive on our mailbox */
+      node->task_received = NULL;
+      node->receive_comm = MSG_task_irecv(&node->task_received,node->mailbox);
+    }
+    if (node->receive_comm) {
+      if (!MSG_comm_test(node->receive_comm)) {
+        /* We search for a pseudo random node */
+        if (MSG_get_clock() >= next_lookup_time) {
+          random_lookup(node);
+          next_lookup_time += random_lookup_interval;
+        }
+        else {
+          //Didn't get a task: sleep for a while...
+          MSG_process_sleep(1);
+        }
+      }
+      else {
+        //There has been a transfer, we need to handle it !
+        msg_error_t status = MSG_comm_get_status(node->receive_comm);
+        MSG_comm_destroy(node->receive_comm);
+        node->receive_comm = NULL;
+
+        if (status == MSG_OK) {
+          xbt_assert( (node->task_received != NULL), "We received an incorrect task");
+          handle_task(node,node->task_received);
+        }
+        else {
+          xbt_assert((node->task_received == NULL),"Comm failed but received a task."); /* the comm is gone: check the slot it was writing to */
+          XBT_DEBUG("Nevermind, the communication has failed.");
+        }
+      }
+    }
+    else {
+      //Didn't get a comm: sleep.
+      MSG_process_sleep(1);
+    }
+  }
+  //Cleanup the receiving communication.
+  if (node->receive_comm != NULL ) {
+    if (MSG_comm_test(node->receive_comm) && MSG_comm_get_status(node->receive_comm) == MSG_OK) {
+      task_free(MSG_comm_get_task(node->receive_comm));
+    }
+    MSG_comm_destroy(node->receive_comm);
+  }
+}
+/**
+ * Tries to join the network
+ * @param node node data
+ * @param id_known id of the node I know in the network.
+ */
+unsigned int join(node_t node, unsigned int id_known) {
+  answer_t node_list;
+  msg_error_t status;
+  unsigned int trial = 0;
+  unsigned int i, answer_got = 0;
+
+  /* Add the guy we know to our routing table and ourselves. */
+  node_routing_table_update(node,node->id);
+  node_routing_table_update(node,id_known);
+
+  /* First step: Send a "FIND_NODE" request to the node we know */
+  send_find_node(node,id_known,node->id);
+  do {
+    if (node->receive_comm == NULL) {
+      node->task_received = NULL;
+      node->receive_comm = MSG_task_irecv(&node->task_received,node->mailbox);
+    }
+    if (node->receive_comm) {
+      if (MSG_comm_test(node->receive_comm)) {
+        status = MSG_comm_get_status(node->receive_comm);
+        MSG_comm_destroy(node->receive_comm);
+        node->receive_comm = NULL;
+        if (status == MSG_OK) {
+          XBT_DEBUG("Received an answer from the node I know.");
+          answer_got = 1; /* NOTE(review): also set when the task is not a FIND_NODE answer - confirm this is intended */
+          //retrieve the node list and ping them.
+          task_data_t data = MSG_task_get_data(node->task_received);
+          xbt_assert( (data != NULL), "Null data received");
+          if (data->type == TASK_FIND_NODE_ANSWER) {
+            node_contact_t contact;
+            node_list = data->answer;
+            xbt_dynar_foreach(node_list->nodes,i, contact) {
+              node_routing_table_update(node,contact->id);
+              //ping(node,contact->id);
+            }
+            task_free(node->task_received);
+          }
+          else {
+            handle_task(node,node->task_received);
+          }
+        }
+        else {
+          trial++;
+        }
+      }
+      else {
+        MSG_process_sleep(1);
+      }
+    }
+    else {
+      MSG_process_sleep(1);
+    }
+  } while (answer_got == 0 && trial < max_join_trials);
+  /* Second step: Send a FIND_NODE to a random node in the buckets around the one of id_known */
+  unsigned int bucket_id = routing_table_find_bucket(node->table,id_known)->id;
+  for (i = 0; (bucket_id > i || (bucket_id + i) <= identifier_size) && i < JOIN_BUCKETS_QUERIES; i++) {
+    if (bucket_id > i) { /* compared, not subtracted: "bucket_id - i" is unsigned and wraps around when i > bucket_id */
+      unsigned int id_in_bucket = get_id_in_prefix(node->id,bucket_id - i);
+      find_node(node,id_in_bucket,0);
+    }
+    if (bucket_id + i <= identifier_size) {
+      unsigned int id_in_bucket = get_id_in_prefix(node->id,bucket_id + i);
+      find_node(node,id_in_bucket,0);
+    }
+  }
+  return answer_got;
+}
+/**
+ * Send a request to find a node in the node routing table.
+ * @param node our node data
+ * @param id_to_find the id of the node we are trying to find (count_in_stats: non-zero to update the success/failure counters)
+ */
+unsigned int find_node(node_t node, unsigned int id_to_find, unsigned int count_in_stats) {
+
+  unsigned int i = 0;
+  unsigned int queries, answers;
+  unsigned int destination_found = 0;
+  unsigned int nodes_added = 0;
+  double time_beginreceive;
+  double timeout, global_timeout = MSG_get_clock() + find_node_global_timeout;
+  unsigned int steps = 0;
+
+  /* id_to_find is unsigned: there is no negative id to reject, so no range check is needed here */
+
+  /* First we build a list of who we already know */
+  answer_t node_list = node_find_closest(node,id_to_find);
+  xbt_assert((node_list != NULL),"node_list incorrect");
+
+  XBT_DEBUG("Doing a FIND_NODE on %d", id_to_find);
+
+  msg_error_t status;
+
+  /* Ask the nodes on our list if they have information about
+   * the node we are trying to find */
+
+  do {
+    answers = 0;
+    queries = send_find_node_to_best(node,node_list);
+    nodes_added = 0;
+    timeout = MSG_get_clock() + find_node_timeout;
+    steps++;
+    time_beginreceive = MSG_get_clock();
+    do {
+      if (node->receive_comm == NULL) {
+        node->task_received = NULL;
+        node->receive_comm = MSG_task_irecv(&node->task_received,node->mailbox);
+      }
+      if (node->receive_comm) {
+        if (MSG_comm_test(node->receive_comm)) {
+          status = MSG_comm_get_status(node->receive_comm);
+          MSG_comm_destroy(node->receive_comm);
+          node->receive_comm = NULL;
+          if (status == MSG_OK) {
+            xbt_assert( (node->task_received != NULL), "Invalid task received");
+            //Figure out if we received an answer or something else
+            task_data_t data = MSG_task_get_data(node->task_received);
+            xbt_assert( (data != NULL), "No data in the task");
+
+            //Check if what we have received is what we are looking for.
+            if (data->type == TASK_FIND_NODE_ANSWER && data->answer->destination_id == id_to_find) {
+              //Handle the answer
+              node_routing_table_update(node,data->sender_id);
+              node_contact_t contact;
+              xbt_dynar_foreach(node_list->nodes,i, contact) {
+                node_routing_table_update(node,contact->id);
+              }
+              answers++;
+
+              nodes_added = answer_merge(node_list,data->answer); /* NOTE(review): replaces the count of an earlier answer of the same round - confirm "=" vs "+=" */
+              XBT_DEBUG("Received an answer from %s (%s) with %lu nodes on it",data->answer_to,data->issuer_host_name,(unsigned long)xbt_dynar_length(data->answer->nodes));
+
+              task_free(node->task_received);
+            }
+            else {
+              handle_task(node,node->task_received);
+              //Update the timeout if we didn't have our answer
+              timeout += MSG_get_clock() - time_beginreceive;
+              time_beginreceive = MSG_get_clock();
+            }
+          }
+        }
+        else {
+          MSG_process_sleep(1);
+        }
+      }
+      else {
+        MSG_process_sleep(1);
+      }
+    } while (MSG_get_clock() < timeout && answers < queries) ;
+    destination_found = answer_destination_found(node_list);
+  } while (!destination_found && (nodes_added > 0 || answers == 0) && MSG_get_clock() < global_timeout && steps < MAX_STEPS);
+  if (destination_found) {
+    if (count_in_stats)
+      node->find_node_success++;
+    if (queries > 4)
+      XBT_VERB("FIND_NODE on %d success in %d steps",id_to_find,steps);
+    node_routing_table_update(node,id_to_find);
+  }
+  else {
+    if (count_in_stats) {
+      node->find_node_failed++;
+      XBT_VERB("%d not found in %d steps",id_to_find,steps);
+    }
+  }
+  answer_free(node_list);
+  return destination_found;
+}
+/**
+ * Pings a node in the system to see if it is online.
+ * @param node Our node data
+ * @param id_to_ping the id of a node we want to see if it is online.
+ * @return 1 if the ping succeeded, 0 otherwise.
+ */
+unsigned int ping(node_t node, unsigned int id_to_ping) {
+  char mailbox[MAILBOX_NAME_SIZE];
+  sprintf(mailbox,"%d",id_to_ping); /* NOTE(review): unbounded write into a MAILBOX_NAME_SIZE buffer - confirm the id range fits */
+
+  unsigned int destination_found = 0;
+  double timeout = MSG_get_clock() + ping_timeout; /* kept as a double, like the clock it is compared with */
+
+  XBT_VERB("PING %d",id_to_ping);
+
+  //Check that we aren't trying to ping ourselves (done before building the task, so nothing is leaked)
+  if (id_to_ping == node->id) {
+    return 1;
+  }
+
+  msg_task_t ping_task = task_new_ping(node->id,node->mailbox,MSG_host_get_name(MSG_host_self()));
+  msg_task_t task_received = NULL;
+
+  /* Sending the ping task */
+  MSG_task_dsend(ping_task,mailbox,task_free_v);
+  do
+  {
+    task_received = NULL;
+    msg_error_t status = MSG_task_receive_with_timeout(&task_received,node->mailbox,ping_timeout);
+    if (status == MSG_OK) {
+      xbt_assert( (task_received != NULL), "Invalid task received");
+      //Checking if it's what we are waiting for or not.
+      task_data_t data = MSG_task_get_data(task_received);
+      xbt_assert( (data != NULL) ,"didn't receive any data...");
+      if (data->type == TASK_PING_ANSWER && id_to_ping == data->sender_id) {
+        XBT_VERB("Ping to %s succeeded",mailbox);
+        node_routing_table_update(node,data->sender_id);
+        destination_found = 1;
+        task_free(task_received);
+      }
+      else {
+        //If it's not our answer, we answer the query anyway.
+        handle_task(node,task_received);
+      }
+    }
+  } while (destination_found == 0 && MSG_get_clock() < timeout);
+
+  /* The loop ends either on the expected answer or once the deadline has
+   * passed, so destination_found alone tells the outcome. An answer that
+   * arrives right at the deadline is a success: the routing table has
+   * already been updated for it above. */
+  if (!destination_found) {
+    XBT_DEBUG("Ping to %s has timeout.",mailbox);
+    return 0;
+  }
+  return 1;
+}
+/**
+ * Does a lookup for the fixed id RANDOM_LOOKUP_NODE and counts it in the statistics
+ * @param node caller node data
+ */
+void random_lookup(node_t node) {
+  unsigned int id_to_look = RANDOM_LOOKUP_NODE; //Totally random.
+  /* TODO: Use some pseudorandom generator like RngStream. */
+  XBT_DEBUG("I'm doing a random lookup");
+  find_node(node,id_to_look,1);
+}
+/**
+ * @brief Send a "FIND_NODE" to a node
+ * @param node sender node data
+ * @param id node we are querying
+ * @param destination node we are trying to find.
+ */
+void send_find_node(node_t node, unsigned int id, unsigned int destination) {
+  char mailbox[MAILBOX_NAME_SIZE];
+  /* Gets the mailbox to send to */
+  get_node_mailbox(id,mailbox);
+  /* Build the task */
+  msg_task_t task = task_new_find_node(node->id,destination,node->mailbox,MSG_host_get_name(MSG_host_self()));
+  /* Send the task */
+  xbt_assert( (task != NULL), "Trying to send a NULL task.");
+  MSG_task_dsend(task,mailbox,task_free_v); /* detached send: we do not wait for it to complete */
+  XBT_VERB("Asking %s for its closest nodes",mailbox);
+}
+/**
+ * Sends to the best "kademlia_alpha" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best nodes
+ */
+unsigned int send_find_node_to_best(node_t node, answer_t node_list) {
+  unsigned int i = 0, j = 0;
+  unsigned int destination = node_list->destination_id;
+  node_contact_t node_to_query;
+  while (j < kademlia_alpha && i < node_list->size) { /* We need to have at most "kademlia_alpha" requests each time, according to the protocol */
+    /* Gets the node we want to send the query to */
+    node_to_query = xbt_dynar_get_as(node_list->nodes,i,node_contact_t);
+    if (node_to_query->id != node->id) { /* No need to query ourselves */
+      send_find_node(node,node_to_query->id,destination);
+      j++;
+    }
+    i++;
+  }
+  return i; /* NOTE(review): entries scanned (i), not queries actually sent (j) - confirm which one callers expect */
+}
+/**
+ * \brief Handles an incoming received task, then frees it
+ */
+void handle_task(node_t node, msg_task_t task) {
+  task_data_t data = MSG_task_get_data(task);
+  xbt_assert( (data != NULL), "Received NULL data");
+  //Adding/updating the guy to our routing table
+  node_routing_table_update(node,data->sender_id);
+  switch (data->type) {
+    case TASK_FIND_NODE:
+      handle_find_node(node,data);
+      break;
+    case TASK_FIND_NODE_ANSWER:
+      XBT_DEBUG("Received a wrong answer for a FIND_NODE");
+      break;
+    case TASK_PING:
+      handle_ping(node,data);
+      break;
+    default:
+
+      break;
+  }
+  task_free(task);
+}
+/**
+ * \brief Answers an incoming "find_node" task with the closest nodes we know
+ */
+void handle_find_node(node_t node, task_data_t data) {
+  XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %d",data->answer_to,data->issuer_host_name,data->destination_id);
+  //Building the answer to the request
+  answer_t answer = node_find_closest(node,data->destination_id);
+  //Building the task to send
+  msg_task_t task = task_new_find_node_answer(node->id,data->destination_id,answer,node->mailbox,MSG_host_get_name(MSG_host_self()));
+  //Sending the task
+  MSG_task_dsend(task,data->answer_to,task_free_v);
+}
+/**
+ * \brief Answers an incoming ping request
+ */
+void handle_ping(node_t node, task_data_t data) {
+  XBT_VERB("Received a PING request from %s (%s)",data->answer_to,data->issuer_host_name);
+  //Building the answer to the request
+  msg_task_t task = task_new_ping_answer(node->id,data->answer_to,MSG_host_get_name(MSG_host_self()));
+
+  MSG_task_dsend(task,data->answer_to,task_free_v);
+}
+/**
+ * \brief Main function: argv[1] is the platform file, argv[2] the deployment file
+ */
+int main(int argc, char *argv[]) {
+
+  MSG_init(&argc, argv);
+
+  /* Check the arguments */
+  if (argc < 3) {
+    printf("Usage: %s platform_file deployment_file \n",argv[0]);
+    return -1;
+  }
+
+  const char *platform_file = argv[1];
+  const char *deployment_file = argv[2];
+
+  MSG_create_environment(platform_file);
+
+  MSG_function_register("node",node);
+  MSG_launch_application(deployment_file);
+
+  msg_error_t res = MSG_main();
+
+  XBT_CRITICAL("Messages created: %lu", smx_total_comms); /* the counter is an unsigned long */
+  XBT_INFO("Simulated time: %g", MSG_get_clock());
+  MSG_clean();
+
+  if ( res == MSG_OK)
+    return 0;
+  else
+    return 1;
+}
diff --git a/examples/msg/kademlia/kademlia.h b/examples/msg/kademlia/kademlia.h
new file mode 100644
index 0000000000..d9f193d9e7
--- /dev/null
+++ b/examples/msg/kademlia/kademlia.h
@@ -0,0 +1,28 @@
+
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+#ifndef _MSG_EXAMPLES_KADEMLIA_H
+#define _MSG_EXAMPLES_KADEMLIA_H
+#include "node.h"
+#include "task.h"
+//process functions (NOTE(review): declared static in a header while kademlia.c defines them without the keyword - confirm the intended linkage)
+static int node(int argc, char *argv[]);
+static void main_loop(node_t node, unsigned int deadline);
+//core kademlia functions; each of them returns non-zero on success
+unsigned int join(node_t node, unsigned int id_known);
+unsigned int find_node(node_t node, unsigned int id_to_find, unsigned int count_in_stats);
+unsigned int ping(node_t node, unsigned int id_to_ping);
+void random_lookup(node_t node);
+
+void send_find_node(node_t node, unsigned int id, unsigned int destination);
+unsigned int send_find_node_to_best(node_t node, answer_t node_list);
+
+void handle_task(node_t node, msg_task_t task);
+void handle_find_node(node_t node, task_data_t data);
+void handle_ping(node_t node, task_data_t data);
+
+
+#endif /* _MSG_EXAMPLES_KADEMLIA_H */
diff --git a/examples/msg/kademlia/kademlia.tesh b/examples/msg/kademlia/kademlia.tesh
new file mode 100644
index 0000000000..8d1ed7be6d
--- /dev/null
+++ b/examples/msg/kademlia/kademlia.tesh
@@ -0,0 +1,35 @@
+#! ./tesh
+
+p Testing the Kademlia implementation with MSG
+
+! output sort
+$ $SG_TEST_EXENV ${bindir:=.}/kademlia ${srcdir:=.}/../msg_platform.xml ${srcdir:=.}/kademlia.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
+> [ 0.000000] (10:node@Laroche) Hi, I'm going to join the network !
+> [ 0.000000] (11:node@Tanguay) Hi, I'm going to join the network !
+> [ 0.000000] (12:node@Morin) Hi, I'm going to join the network !
+> [ 0.000000] (13:node@Ethernet) Hi, I'm going to join the network !
+> [ 0.000000] (1:node@Jacquelin) Hi, I'm going to create the network with id 0 !
+> [ 0.000000] (2:node@Boivin) Hi, I'm going to join the network !
+> [ 0.000000] (3:node@Jean_Yves) Hi, I'm going to join the network !
+> [ 0.000000] (4:node@TeX) Hi, I'm going to join the network ! +> [ 0.000000] (5:node@Geoff) Hi, I'm going to join the network ! +> [ 0.000000] (6:node@Disney) Hi, I'm going to join the network ! +> [ 0.000000] (7:node@iRMX) Hi, I'm going to join the network ! +> [ 0.000000] (8:node@McGee) Hi, I'm going to join the network ! +> [ 0.000000] (9:node@Gatien) Hi, I'm going to join the network ! +> [900.000000] (0:@) Messages created: 1319 +> [900.000000] (0:@) Simulated time: 900 +> [900.000000] (10:node@Laroche) 7/7 FIND_NODE have succeeded +> [900.000000] (11:node@Tanguay) 8/8 FIND_NODE have succeeded +> [900.000000] (12:node@Morin) 7/7 FIND_NODE have succeeded +> [900.000000] (13:node@Ethernet) 7/7 FIND_NODE have succeeded +> [900.000000] (1:node@Jacquelin) 8/8 FIND_NODE have succeeded +> [900.000000] (2:node@Boivin) 4/4 FIND_NODE have succeeded +> [900.000000] (3:node@Jean_Yves) 7/7 FIND_NODE have succeeded +> [900.000000] (4:node@TeX) 6/6 FIND_NODE have succeeded +> [900.000000] (5:node@Geoff) 6/6 FIND_NODE have succeeded +> [900.000000] (6:node@Disney) 6/6 FIND_NODE have succeeded +> [900.000000] (7:node@iRMX) 6/6 FIND_NODE have succeeded +> [900.000000] (8:node@McGee) 7/7 FIND_NODE have succeeded +> [900.000000] (9:node@Gatien) 7/7 FIND_NODE have succeeded + diff --git a/examples/msg/kademlia/kademlia.xml b/examples/msg/kademlia/kademlia.xml new file mode 100644 index 0000000000..bc7c27d352 --- /dev/null +++ b/examples/msg/kademlia/kademlia.xml @@ -0,0 +1,72 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/msg/kademlia/node.c b/examples/msg/kademlia/node.c new file mode 100644 index 0000000000..ec36ade47f --- /dev/null +++ b/examples/msg/kademlia/node.c @@ -0,0 +1,177 @@ + +/* Copyright (c) 2010. The SimGrid Team. + * All rights reserved. 
*/ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include "node.h" +#include "routing_table.h" + +#include "msg/msg.h" +#include "xbt/log.h" +#include "xbt/asserts.h" + XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia_node, + "Messages specific for this msg example"); +/** + * \brief Initialization of a node + * \param node_id the id of the node + * \return the node created + */ +node_t node_init(unsigned int node_id) { + node_t node = xbt_new(s_node_t,1); + + node->id = node_id; + node->table = routing_table_init(node_id); + sprintf(node->mailbox,"%d",node_id); + node->find_node_failed = 0; + node->find_node_success = 0; + + node->task_received = NULL; + node->receive_comm = NULL; + + return node; +} +/* + * \brief Node destructor + */ +void node_free(node_t node) +{ + routing_table_free(node->table); + xbt_free(node); +} + +/** + * @brief Updates/Puts the node id unsigned into our routing table + * @param node Our node data + * @param id The id of the node we need to add unsigned into our routing table + */ +void node_routing_table_update(node_t node, unsigned int id) { + routing_table_t table = node->table; + //retrieval of the bucket in which the should be + bucket_t bucket = routing_table_find_bucket(table,id); + + //check if the id is already in the bucket. + unsigned int id_pos = bucket_find_id(bucket,id); + + if (id_pos == -1) { + /* We check if the bucket is full or not. If it is, we evict + * old offline elements */ + if (xbt_dynar_length(bucket->nodes) < bucket_size) { + //TODO: this is not really very efficient. Maybe we should use something else than dynars ? + xbt_dynar_unshift(bucket->nodes,&id); + XBT_VERB("I'm adding to my routing table %d",id); + } + else { + /* TODO: we need to evict the old elements: that's why this function is in "node" instead of "routing table". This is not implemented yet. 
*/ + } + } + else { + //We push to the front of the dynar the element. + unsigned int element = xbt_dynar_get_as(bucket->nodes,id_pos,unsigned int); + xbt_dynar_remove_at(bucket->nodes,id_pos,NULL); + xbt_dynar_unshift(bucket->nodes,&element); + XBT_VERB("I'm updating %d",element); + } +} +/** + * Finds the closest nodes to the node given. + * @param node : our node + * @param destination_id : the id of the guy we are trying to find + */ +answer_t node_find_closest(node_t node, unsigned int destination_id) { + int i; + answer_t answer = answer_init(destination_id); + /* We find the corresponding bucket for the id */ + bucket_t bucket = routing_table_find_bucket(node->table,destination_id); + int bucket_id = bucket->id; + xbt_assert((bucket_id <= identifier_size), "Bucket found has a wrong identifier"); + /* So, we copy the contents of the bucket unsigned into our result dynar */ + answer_add_bucket(bucket,answer); + + /* However, if we don't have enough elements in our bucket, we NEED to + include at least + * "bucket_size" elements (if, of course, we know at least "bucket_size" elements. So we're going to look unsigned into the other buckets. 
+ */ + for (i = 1; answer->size < bucket_size && ( (bucket_id - i > 0) || (bucket_id + i < identifier_size) ); i++) { + /* We check the previous buckets */ + if (bucket_id - i >= 0) { + bucket_t bucket_p = &node->table->buckets[bucket_id - i]; + answer_add_bucket(bucket_p,answer); + } + /* We check the next buckets */ + if (bucket_id + i <= identifier_size) { + bucket_t bucket_n = &node->table->buckets[bucket_id + i]; + answer_add_bucket(bucket_n,answer); + } + } + /* We sort the array by XOR distance */ + answer_sort(answer); + /* We trim the array to have only bucket_size or less elements */ + answer_trim(answer); + + return answer; +} +/** + * Returns an identifier which is in a specific bucket of a routing table + * @brief id id of the routing table owner + * @brief prefix id of the bucket where we want that identifier to be + */ +unsigned int get_id_in_prefix(unsigned int id, unsigned int prefix) { + if (prefix == 0) { + return 0; + } + unsigned int n = 1 << (prefix - 1); + return n ^ id; +} +/** + * \brief Returns the prefix of an identifier. + * The prefix is the id of the bucket in which the remote identifier xor our identifier should be stored. + * @param id : bigunsigned int id to test + * @param nb_bits : key size + */ +unsigned int get_node_prefix(unsigned int id, unsigned int nb_bits) { + unsigned int j, size = sizeof(unsigned int) * 8; + for (j = 0; j < size; j++) { + if ( ( (id >> (size - 1 - j)) & 0x1) != 0) { + return nb_bits - (j); + } + } + return 0; +} +/** + * \brief Gets the mailbox name of a host given its identifier + */ +void get_node_mailbox(unsigned int id, char *mailbox) { + sprintf(mailbox,"%d",id); +} + +/** + * Constructor, build a new contact information. 
+ */ +node_contact_t node_contact_new(unsigned int id, unsigned int distance) { + node_contact_t contact = xbt_new(s_node_contact_t,1); + + contact->id = id; + contact->distance = distance; + + return contact; +} +/** + * Builds a contact information from a contact information + */ +node_contact_t node_contact_copy(node_contact_t node_contact) { + node_contact_t contact = xbt_new(s_node_contact_t,1); + + contact->id = node_contact->id; + contact->distance = node_contact->distance; + + return contact; +} +/** + * Destructor + * @param contact the node_contact to kill. + */ +void node_contact_free(node_contact_t contact) { + xbt_free(contact); +} diff --git a/examples/msg/kademlia/node.h b/examples/msg/kademlia/node.h new file mode 100644 index 0000000000..f2a3794991 --- /dev/null +++ b/examples/msg/kademlia/node.h @@ -0,0 +1,57 @@ + +/* Copyright (c) 2012. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef _MSG_EXAMPLES_ROUTING_H +#define _MSG_EXAMPLES_ROUTING_H +#include "xbt/dynar.h" +#include "msg/msg.h" + +#include "common.h" + +#include "answer.h" +#include "routing_table.h" +/** + * Information about a foreign node + */ +typedef struct s_node_contact { + unsigned int id; //The node identifier + unsigned int distance; //The distance from the node +} s_node_contact_t, *node_contact_t; + +/* + * Node data + */ +typedef struct s_node { + unsigned int id; //node id - 160 bits + routing_table_t table; //node routing table + msg_comm_t receive_comm; //current receiving communication. + msg_task_t task_received; //current task being received + + char mailbox[MAILBOX_NAME_SIZE]; //node mailbox + unsigned int find_node_success; //Number of find_node which have succeeded. + unsigned int find_node_failed; //Number of find_node which have failed. 
+ +} s_node_t, *node_t; + +// node functions +node_t node_init(unsigned int id); +void node_free(node_t node); + +void node_routing_table_update(node_t node, unsigned int id); +answer_t node_find_closest(node_t node, unsigned int destination_id); + + +// identifier functions +unsigned int get_id_in_prefix(unsigned int id, unsigned int prefix); +unsigned int get_node_prefix(unsigned int id, unsigned int nb_bits); +void get_node_mailbox(unsigned int id, char *mailbox); + +// node contact functions +node_contact_t node_contact_new(unsigned int id, unsigned int distance); +node_contact_t node_contact_copy(node_contact_t node_contact); +void node_contact_free(node_contact_t contact); +#endif /* _MSG_EXAMPLES_ROUTING_H */ diff --git a/examples/msg/kademlia/routing_table.c b/examples/msg/kademlia/routing_table.c new file mode 100644 index 0000000000..6e2646176b --- /dev/null +++ b/examples/msg/kademlia/routing_table.c @@ -0,0 +1,102 @@ +/* Copyright (c) 2012. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ +#include "routing_table.h" +#include "node.h" +#include "msg/msg.h" +#include "xbt/log.h" +#include "xbt/asserts.h" + XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia_routing_table, + "Messages specific for this msg example"); +/** + * \brief Initialization of a node routing table. + */ +routing_table_t routing_table_init(unsigned int node_id) { + unsigned int i; + routing_table_t table = xbt_new(s_routing_table_t,1); + table->buckets = xbt_new(s_bucket_t,identifier_size + 1); + for (i = 0; i < identifier_size + 1; i++) { + table->buckets[i].nodes = xbt_dynar_new(sizeof(unsigned int),NULL); + table->buckets[i].id = i; + } + table->id = node_id; + return table; +} +/** + * \brief Frees the routing table + */ +void routing_table_free(routing_table_t table) { + unsigned int i; + //Free the buckets. 
+ for (i = 0; i <= identifier_size; i++) { + xbt_dynar_free(&table->buckets[i].nodes); + } + xbt_free(table->buckets); + xbt_free(table); +} +/** + * Returns if the routing table contains the id. + */ +unsigned int routing_table_contains(routing_table_t table, unsigned int node_id) { + bucket_t bucket = routing_table_find_bucket(table,node_id); + return bucket_contains(bucket,node_id); +} +/** + * @brief prints the routing table, to debug stuff. + */ +void routing_table_print(routing_table_t table) { + unsigned int i, j, value; + XBT_INFO("Routing table of %d:",table->id); + + for (i = 0; i <= identifier_size; i++) { + if (xbt_dynar_length(table->buckets[i].nodes) > 0) { + XBT_INFO("Bucket number %d: ",i); + xbt_dynar_foreach (table->buckets[i].nodes,j,value) { + XBT_INFO("Element %d: %d",j,value); + } + } + } +} +/** + * Finds an identifier in a bucket and returns its position + * or returns -1 if it doesn't exists + * @param bucket the bucket in which we try to find our identifier + * @param id the id + */ +unsigned int bucket_find_id(bucket_t bucket, unsigned int id) { + unsigned int i, length = xbt_dynar_length(bucket->nodes); + for (i = 0; i < length; i++) { //TODO: Use foreach maybe ? + if (id == xbt_dynar_get_as(bucket->nodes,i,unsigned int)) { + return i; + } + } + return -1; +} +/** + * Returns if the bucket contains an identifier. + */ +unsigned int bucket_contains(bucket_t bucket, unsigned int id) { + unsigned int length = xbt_dynar_length(bucket->nodes), i = 0; + for (i = 0; i < length; i++) { + if (id == xbt_dynar_get_as(bucket->nodes,i,unsigned int)) { + return 1; + } + } + return 0; +} +/** + * Fins the corresponding bucket in a routing table for a given identifier + * @param table the routing table + * @param id the identifier + * @return the bucket in which the the identifier would be. 
+ */ +bucket_t routing_table_find_bucket(routing_table_t table, unsigned int id) { + unsigned int xor_number = table->id ^ id; + unsigned int prefix = get_node_prefix(xor_number,identifier_size); + xbt_assert(prefix >= 0 && prefix <= identifier_size, "Tried to return a bucket that doesn't exist."); + bucket_t bucket = &table->buckets[prefix]; + return bucket; +} + diff --git a/examples/msg/kademlia/routing_table.h b/examples/msg/kademlia/routing_table.h new file mode 100644 index 0000000000..d13b40e6c0 --- /dev/null +++ b/examples/msg/kademlia/routing_table.h @@ -0,0 +1,38 @@ + +/* Copyright (c) 2012. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef _MSG_KADEMLIA_EXAMPLES_ROUTING_TABLE +#define _MSG_KADEMLIA_EXAMPLES_ROUTING_TABLE +#include "common.h" +#include +/* + * Routing table bucket + */ +typedef struct s_bucket { + xbt_dynar_t nodes; //Nodes in the bucket. + unsigned int id; //bucket id +} s_bucket_t, *bucket_t; + +/* + * Node routing table + */ +typedef struct s_routing_table { + unsigned int id; //node id of the client's routing table + s_bucket_t *buckets; //Node bucket list - 160 sized. 
+} s_routing_table_t, *routing_table_t; +// bucket functions +unsigned int bucket_find_id(bucket_t bucket, unsigned int id); +unsigned int bucket_contains(bucket_t bucket, unsigned int id); +// routing table functions +routing_table_t routing_table_init(unsigned int node_id); +void routing_table_free(routing_table_t table); +unsigned int routing_table_contains(routing_table_t table, unsigned int node_id); +void routing_table_print(routing_table_t table); +bucket_t routing_table_find_bucket(routing_table_t table, unsigned int id); + + +#endif /* _MSG_KADEMLIA_EXAMPLES_ROUTING_TABLE */ diff --git a/examples/msg/kademlia/task.c b/examples/msg/kademlia/task.c new file mode 100644 index 0000000000..1230818d05 --- /dev/null +++ b/examples/msg/kademlia/task.c @@ -0,0 +1,120 @@ +/* Copyright (c) 2012. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. 
*/ +#include "task.h" +#include "answer.h" + XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia_task, + "Messages specific for this msg example"); +/** + * Creates a new "find node" task + * @param sender_id the id of the node who sends the task + * @param destination_id the id the sender is trying to find + * @param hostname the hostname of the node, for logging purposes + */ +msg_task_t task_new_find_node(unsigned int sender_id, unsigned int destination_id, char *mailbox, const char *hostname) { + + task_data_t data = xbt_new(s_task_data_t,1); + + data->type = TASK_FIND_NODE; + data->sender_id = sender_id; + data->destination_id = destination_id; + data->answer = NULL; + data->answer_to = mailbox; + data->issuer_host_name = hostname; + + + msg_task_t task = MSG_task_create(NULL,COMP_SIZE,COMM_SIZE,data); + + return task; +} +/** + * Creates a new "answer to find node" task + * @param sender_id the node who sent the task + * @param destination_id the node that should be found + * @param answer the answer to send + * @param mailbox The mailbox of the sender + * @param hostname sender hostname + */ +msg_task_t task_new_find_node_answer(unsigned int sender_id, unsigned int destination_id, answer_t answer, char *mailbox, const char *hostname) { + + task_data_t data = xbt_new(s_task_data_t,1); + + data->type = TASK_FIND_NODE_ANSWER; + data->sender_id = sender_id; + data->destination_id = destination_id; + data->answer = answer; + data->answer_to = mailbox; + data->issuer_host_name = hostname; + + msg_task_t task = MSG_task_create(NULL,COMP_SIZE,COMM_SIZE,data); + + return task; +} +/** + * Creates a new "ping" task + * @param sender_id : sender node identifier + * @param mailbox : mailbox where we should respond + * @param hostname : hostname of the sender, for debugging purposes + */ +msg_task_t task_new_ping(unsigned int sender_id, char *mailbox, const char *hostname) { + + task_data_t data = xbt_new(s_task_data_t,1); + + data->type = TASK_PING; + data->sender_id = sender_id; 
+ data->destination_id = 0; + data->answer = NULL; + data->answer_to = mailbox; + data->issuer_host_name = hostname; + + msg_task_t task = MSG_task_create(NULL,COMP_SIZE,COMM_SIZE,data); + + return task; +} +/** + * Creates a new "ping answer" task + * @param sender_id : sender node identifier + * @param mailbox : mailbox of the sender + * @param hostname : hostname of the sender, for debugging purposes + */ +msg_task_t task_new_ping_answer(unsigned int sender_id, char *mailbox, const char *hostname) { + + task_data_t data = xbt_new(s_task_data_t,1); + + data->type = TASK_PING_ANSWER; + data->sender_id = sender_id; + data->destination_id = 0; + data->answer = NULL; + data->answer_to = mailbox; + data->issuer_host_name = hostname; + + msg_task_t task = MSG_task_create(NULL,COMP_SIZE,COMM_SIZE,data); + + return task; +} +/** + * Destroys a task and its data + * @param task the task that'll be destroyed + */ +void task_free(msg_task_t task) { + xbt_assert( (task != NULL), "Tried to free a NULL task"); + + task_data_t data = MSG_task_get_data(task); + + if (data->answer) { + answer_free(data->answer); + } + xbt_free(data); + + MSG_task_destroy(task); + +} +/** + * Destroys a task and its data (taking a void* pointer + * @param task The task that'll be destroyed + */ +void task_free_v(void *task) { + task_free(task); +} diff --git a/examples/msg/kademlia/task.h b/examples/msg/kademlia/task.h new file mode 100644 index 0000000000..6b0a44eaec --- /dev/null +++ b/examples/msg/kademlia/task.h @@ -0,0 +1,44 @@ +/* Copyright (c) 2012. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. 
*/ +#ifndef _MSG_KADEMLIA_EXAMPLES_TASK +#define _MSG_KADEMLIA_EXAMPLES_TASK +#include "common.h" +#include "node.h" +#include "msg/msg.h" +/** + * Types of tasks exchanged + */ +typedef enum { + TASK_FIND_NODE, + TASK_FIND_NODE_ANSWER, + TASK_FIND_VALUE, + TASK_FIND_VALUE_ANSWER, + TASK_PING, + TASK_PING_ANSWER, + TASK_LEAVING +} e_task_type_t; +/** + * Data attached with the tasks + */ +typedef struct s_task_data { + e_task_type_t type; + unsigned int sender_id; //Id of the guy who sent the task + unsigned int destination_id; //Id we are trying to find, if needed. + answer_t answer; //Answer to the request made, if needed. + char *answer_to; // mailbox to send the answer to (if not an answer). + const char *issuer_host_name; // used for logging +} s_task_data_t, *task_data_t; + +//Task handling functions +msg_task_t task_new_find_node(unsigned int sender_id, unsigned int destination_id, char *mailbox, const char *hostname); + +msg_task_t task_new_find_node_answer(unsigned int sender_id, unsigned int destination_id, answer_t answer, char *mailbox, const char *hostname); + +msg_task_t task_new_ping(unsigned int sender_id, char *mailbox, const char *hostname); +msg_task_t task_new_ping_answer(unsigned int sender_id, char *mailbox, const char *hostname); +void task_free(msg_task_t task); +void task_free_v(void *task); + #endif /* _MSG_KADEMLIA_EXAMPLES_TASK */