Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add kademlia C example
authorSamuel Lepetit <samuel.lepetit@inria.fr>
Wed, 4 Jul 2012 16:58:43 +0000 (18:58 +0200)
committerSamuel Lepetit <samuel.lepetit@inria.fr>
Wed, 4 Jul 2012 16:58:43 +0000 (18:58 +0200)
17 files changed:
buildtools/Cmake/AddTests.cmake
buildtools/Cmake/MakeExe.cmake
examples/msg/kademlia/CMakeLists.txt [new file with mode: 0644]
examples/msg/kademlia/answer.c [new file with mode: 0644]
examples/msg/kademlia/answer.h [new file with mode: 0644]
examples/msg/kademlia/common.h [new file with mode: 0644]
examples/msg/kademlia/generate.py [new file with mode: 0755]
examples/msg/kademlia/kademlia.c [new file with mode: 0644]
examples/msg/kademlia/kademlia.h [new file with mode: 0644]
examples/msg/kademlia/kademlia.tesh [new file with mode: 0644]
examples/msg/kademlia/kademlia.xml [new file with mode: 0644]
examples/msg/kademlia/node.c [new file with mode: 0644]
examples/msg/kademlia/node.h [new file with mode: 0644]
examples/msg/kademlia/routing_table.c [new file with mode: 0644]
examples/msg/kademlia/routing_table.h [new file with mode: 0644]
examples/msg/kademlia/task.c [new file with mode: 0644]
examples/msg/kademlia/task.h [new file with mode: 0644]

index c50ef8c..fd3a4a4 100644 (file)
@@ -214,6 +214,8 @@ if(NOT enable_memcheck)
   ADD_TEST(msg-chord-thread-parallel            ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:thread --cfg contexts/nthreads:4 --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/chord --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/chord chord_crosstraffic.tesh)
   ADD_TEST(msg-bittorrent-thread                ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:thread --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/bittorrent --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent bittorrent.tesh)
   ADD_TEST(msg-bittorrent-thread-parallel       ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/nthreads:4 --cfg contexts/factory:thread --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/bittorrent --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent bittorrent.tesh)
+  ADD_TEST(msg-kademlia-thread                  ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:thread --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/kademlia --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia kademlia.tesh)
+  ADD_TEST(msg-kademlia-thread-parallel         ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/nthreads:4 --cfg contexts/factory:thread --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/kademlia --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia kademlia.tesh)
 
   if(CONTEXT_UCONTEXT)
     ADD_TEST(msg-migration-ucontext             ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:ucontext --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/msg --cd ${CMAKE_BINARY_DIR}/examples/msg ${CMAKE_HOME_DIRECTORY}/examples/msg/migration/migration.tesh)
@@ -229,6 +231,8 @@ if(NOT enable_memcheck)
     ADD_TEST(msg-chord-ucontext-parallel        ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:ucontext --cfg contexts/nthreads:4 --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/chord --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/chord chord_crosstraffic.tesh)
     ADD_TEST(msg-bittorrent-ucontext            ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/bittorrent --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent bittorrent.tesh)
     ADD_TEST(msg-bittorrent-ucontext-parallel   ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/nthreads:4 --cfg contexts/factory:ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/bittorrent --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent bittorrent.tesh)
+    ADD_TEST(msg-kademlia-ucontext              ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/kademlia --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia kademlia.tesh)
+    ADD_TEST(msg-kademlia-ucontext-parallel     ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/nthreads:4 --cfg contexts/factory:ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/kademlia --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia kademlia.tesh)
 
   endif(CONTEXT_UCONTEXT)
   if(HAVE_RAWCTX)
@@ -245,7 +249,8 @@ if(NOT enable_memcheck)
     ADD_TEST(msg-chord-raw-parallel             ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:raw --cfg contexts/nthreads:4 --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/chord --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/chord chord_crosstraffic.tesh)
     ADD_TEST(msg-bittorrent-raw                 ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:raw --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/bittorrent --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent bittorrent.tesh)
     ADD_TEST(msg-bittorrent-raw-parallel        ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/nthreads:4 --cfg contexts/factory:raw --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/bittorrent --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent bittorrent.tesh)
-
+    ADD_TEST(msg-kademlia-raw                   ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/factory:raw --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/kademlia --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia kademlia.tesh)
+    ADD_TEST(msg-kademlia-raw-parallel          ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg contexts/nthreads:4 --cfg contexts/factory:raw --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/kademlia --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia kademlia.tesh)
   endif(HAVE_RAWCTX)
 
   IF(${ARCH_32_BITS})
index 246b93e..57d7da8 100644 (file)
@@ -53,8 +53,9 @@ add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/parallel_task)
 add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/priority)
 add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/masterslave)
 add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/icomms)
-add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/chord)
 add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/bittorrent)
+add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/chord)
+add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/kademlia)
 add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/token_ring)
 add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/pmm)
 add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/msg/start_kill_time)
diff --git a/examples/msg/kademlia/CMakeLists.txt b/examples/msg/kademlia/CMakeLists.txt
new file mode 100644 (file)
index 0000000..4a9f965
--- /dev/null
@@ -0,0 +1,8 @@
+cmake_minimum_required(VERSION 2.6)
+
+set(EXECUTABLE_OUTPUT_PATH "${CMAKE_CURRENT_BINARY_DIR}")
+
+add_executable(kademlia "kademlia.c" "node.c" "routing_table.c" 
+"task.c" "answer.c")
+### Add definitions for compile
+target_link_libraries(kademlia simgrid )
diff --git a/examples/msg/kademlia/answer.c b/examples/msg/kademlia/answer.c
new file mode 100644 (file)
index 0000000..ad0ef99
--- /dev/null
@@ -0,0 +1,140 @@
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+#include "answer.h"
+
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(msg_kademlia_node);
+
+/**
+  * Initialize a node answer object.
+  */
+answer_t answer_init(unsigned int destination_id) {
+  answer_t answer = xbt_new(s_answer_t,1);
+  answer->nodes = xbt_dynar_new(sizeof(node_contact_t),NULL);
+  answer->size = 0;
+  answer->destination_id = destination_id;
+
+  return answer;
+}
+/**
+  * Destroys a node answer object.
+  */
+void answer_free(answer_t answer) {
+  unsigned int i;
+  for (i = 0; i < answer->size; i++) {
+    node_contact_free(*(void**)xbt_dynar_get_ptr(answer->nodes,i));
+  }
+  xbt_dynar_free(&answer->nodes);
+  xbt_free(answer);
+}
+/**
+  * @brief Prints a answer_t, for debugging purposes
+  */
+void answer_print(answer_t answer) {
+  unsigned int cpt;
+  node_contact_t contact;
+  XBT_INFO("Searching %d, size %d",answer->destination_id,answer->size);
+  xbt_dynar_foreach(answer->nodes,cpt,contact) {
+    XBT_INFO("Node %d: %d is at distance %d",cpt,contact->id,contact->distance);
+  }
+}
+/**
+  * @brief Merge two answer_t together, only keeping the best nodes
+  * @param destination the destination in which the nodes will be put
+  * @param source the source of the nodes to add
+  */
+unsigned int answer_merge(answer_t destination, answer_t source) {
+  node_contact_t contact, contact_copy;
+  unsigned int cpt;
+  unsigned int nb_added = 0;
+  /* TODO: Check if same destination */
+  xbt_dynar_foreach(source->nodes,cpt,contact) {
+    if (!answer_contains(destination,contact->id)) {
+      contact_copy = node_contact_copy(contact);
+      xbt_dynar_push(destination->nodes,&contact_copy);
+      destination->size++;
+      nb_added++;
+    }
+  }
+  answer_sort(destination);
+  answer_trim(destination);
+  return nb_added;
+}
+/**
+  * Helper to sort answer_t objects
+  */
+static int _answer_sort_function(const void* e1,const void* e2) {
+  node_contact_t c1 = *(void**)e1;
+  node_contact_t c2 = *(void**)e2;
+  return c1->distance >= c2->distance;
+}
+/**
+  * Sorts a answer_t, by node distance.
+  * @param answer the answer to sort
+  * @param destination_id the id of the guy we are trying to find
+  */
+void answer_sort(answer_t answer) {
+  xbt_dynar_sort(answer->nodes,&_answer_sort_function);
+}
+/**
+  * Trims a answer_t, in order for it to have a size of less or equal
+  * to "bucket_size"
+  * @param answer the answer_t to trim
+  */
+void answer_trim(answer_t answer) {
+  node_contact_t value;
+  while (answer->size > bucket_size)  {
+    xbt_dynar_pop(answer->nodes,&value);
+    answer->size--;
+    node_contact_free(value);
+  }
+  xbt_assert(xbt_dynar_length(answer->nodes) == answer->size,"Wrong size for the answer");
+}
+/**
+  * Adds the content of a bucket unsigned into a answer object.
+  * @param bucket the bucket we have to had unsigned into
+  * @param answer the answer object we're going  to put the data in
+  * @param destination_id the id of the guy we are trying to find.
+  */
+void answer_add_bucket(bucket_t bucket, answer_t answer) {
+  unsigned int cpt;
+  unsigned int id, distance;
+  node_contact_t contact;
+  xbt_assert((bucket != NULL), "Provided a NULL bucket");
+  xbt_assert((bucket->nodes != NULL), "Provided a bucket which nodes are NULL");
+  xbt_dynar_foreach(bucket->nodes,cpt,id) {
+    distance = id ^ answer->destination_id;
+    contact = node_contact_new(id,distance);
+    xbt_dynar_push(answer->nodes,&contact);
+    answer->size++;
+  }
+}
+/**
+  * Returns if the id supplied is in the answer.
+  * @param id : id we're looking for
+  */
+unsigned int answer_contains(answer_t answer, unsigned int id) {
+  unsigned int i = 0, size = xbt_dynar_length(answer->nodes);
+  node_contact_t contact;
+  for (i = 0; i < size; i++) {
+    contact = xbt_dynar_get_as(answer->nodes,i,node_contact_t);
+    if (id == contact->id) {
+      return 1;
+    }
+  }
+  return 0;
+}
+/**
+  * Returns if the destination we are trying to find is found
+  * @param answer the answer
+  * @return if the destination is found.
+  */
+unsigned int answer_destination_found(answer_t answer) {
+  if (xbt_dynar_length(answer->nodes) < 1) {
+    return 0;
+  }
+  node_contact_t contact_tail = xbt_dynar_get_as(answer->nodes,0,node_contact_t);
+  return contact_tail->distance == 0;
+}
diff --git a/examples/msg/kademlia/answer.h b/examples/msg/kademlia/answer.h
new file mode 100644 (file)
index 0000000..2e9f585
--- /dev/null
@@ -0,0 +1,32 @@
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+#ifndef _KADEMLIA_EXAMPLES_ANSWER_H_
+#define _KADEMLIA_EXAMPLES_ANSWER_H_
+#include <xbt/dynar.h>
+#include "routing_table.h"
+/*
+ * Node query anwser. contains the elements closest
+ * to the id given.
+ */
+typedef struct s_node_answer {
+    unsigned int destination_id;
+  xbt_dynar_t nodes; //Dynar of node_contact_t
+  unsigned int size;
+} s_answer_t, *answer_t;
+
+#include "node.h"
+
+answer_t answer_init(unsigned int destination_id);
+void answer_free(answer_t answer);
+void answer_print(answer_t answer);
+unsigned int answer_merge(answer_t destination, answer_t source);
+void answer_sort(answer_t answer);
+void answer_trim(answer_t answer);
+void answer_add_bucket(bucket_t bucket, answer_t answer);
+unsigned int answer_contains(answer_t answer, unsigned int id);
+unsigned int answer_destination_found(answer_t answer);
+
+#endif /* _KADEMLIA_EXAMPLES_ANSWER_H_ */
diff --git a/examples/msg/kademlia/common.h b/examples/msg/kademlia/common.h
new file mode 100644 (file)
index 0000000..3245f2a
--- /dev/null
@@ -0,0 +1,37 @@
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+#ifndef _KADEMLIA_EXAMPLES_COMMON
+#define _KADEMLIA_EXAMPLES_COMMON
+#define max_join_trials 4
+
+#define RECEIVE_TIMEOUT 1
+
+#define ping_timeout 55
+#define find_node_timeout 10;
+#define find_node_global_timeout 50
+
+#define kademlia_alpha 3
+#define bucket_size 20
+
+#define identifier_size 32
+#define max_answers_to_ask 20
+
+#define random_lookup_interval 100
+
+#define MAILBOX_NAME_SIZE identifier_size / 4 + 1
+#define IDENTIFIER_HEX_SIZE identifier_size / 4 + 1
+
+#define COMM_SIZE 1
+#define COMP_SIZE 0
+
+#define MAX_STEPS 10
+
+#define JOIN_BUCKETS_QUERIES 5
+
+#define RANDOM_LOOKUP_NODE 0
+
+
+#endif /* _KADEMLIA_EXAMPLES_COMMON */
diff --git a/examples/msg/kademlia/generate.py b/examples/msg/kademlia/generate.py
new file mode 100755 (executable)
index 0000000..19b483e
--- /dev/null
@@ -0,0 +1,33 @@
+#!/usr/bin/python
+
+import sys, random
+
+if len(sys.argv) != 4:
+       print("Usage: python generate.py nb_nodes nb_bits end_date > deployment_file.xml")
+       sys.exit(1)
+
+nb_nodes = int(sys.argv[1])
+nb_bits = int(sys.argv[2])
+end_date = int(sys.argv[3])
+
+max_id = 2 ** nb_bits - 1
+all_ids = [0]
+
+sys.stdout.write("<?xml version='1.0'?>\n"
+"<!DOCTYPE platform SYSTEM \"http://simgrid.gforge.inria.fr/simgrid.dtd\">\n"
+"<platform version=\"3\">\n"
+"  <process host=\"c-0.me\" function=\"node\"><argument value=\"0000000000000000000000000000000000000000\"/><argument value=\"%d\"/></process>\n" % end_date)
+
+for i in range(1, nb_nodes):
+       ok = False
+       while not ok:
+               my_id = random.randint(0, max_id)
+               ok = not my_id in all_ids
+       known_id = all_ids[random.randint(0, len(all_ids) - 1)]
+       start_date = i * 10
+       line = "  <process host=\"c-%d.me\" function=\"node\"><argument value=\"%s\" /><argument value=\"%s\" /><argument value=\"%d\" /></process>\n" % (i, my_id, known_id,end_date)
+       sys.stdout.write(line)
+       all_ids.append(my_id)
+
+sys.stdout.write("</platform>")
+
diff --git a/examples/msg/kademlia/kademlia.c b/examples/msg/kademlia/kademlia.c
new file mode 100644 (file)
index 0000000..9b10609
--- /dev/null
@@ -0,0 +1,465 @@
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+#include "kademlia.h"
+#include "node.h"
+#include "task.h"
+
+#include "msg/msg.h"
+#include "xbt/log.h"
+#include "xbt/asserts.h"
+/** @addtogroup MSG_examples
+  * <b>kademlia/kademlia.c: Kademlia protocol</b>
+  * Implements the Kademlia protocol, using 32 bits identifiers.
+  */
+ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia,
+                             "Messages specific for this msg example");
+
+extern long unsigned int smx_total_comms;
+
+/** 
+  * \brief Node function
+  * Arguments :
+  * - my node ID
+  * - the ID of the person I know in the system (or not)
+  * - Time before I leave the system because I'm bored
+  */
+int node(int argc, char *argv[]) {
+  unsigned int join_sucess = 1, deadline;
+  xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
+  /* Node initialization */
+  unsigned int id = atoi(argv[1]);;
+  node_t node = node_init(id);
+  
+  if (argc == 4) {
+    XBT_INFO("Hi, I'm going to join the network !");        
+    unsigned int id_known = (unsigned int)atoi(argv[2]);
+    join_sucess = join(node,id_known);
+    deadline = atoi(argv[3]);
+  }
+  else {
+    deadline = atoi(argv[2]);
+    XBT_INFO("Hi, I'm going to create the network with id %s !",node->mailbox);
+    node_routing_table_update(node,node->id);
+  }
+  if (join_sucess) {
+    XBT_VERB("Ok, I'm joining the network with id %s",node->mailbox);
+    //We start the main loop
+    main_loop(node,deadline);
+  }
+  else {
+    XBT_INFO("I couldn't join the network :(");
+  } 
+  XBT_DEBUG("I'm leaving the network");
+  XBT_INFO("%d/%d FIND_NODE have succeeded",node->find_node_success,node->find_node_success + node->find_node_failed);
+  node_free(node);
+
+  return 0;
+}
+/**
+  * Main loop for the process
+  */
+void main_loop(node_t node, unsigned int deadline) {
+  double next_lookup_time = MSG_get_clock() + random_lookup_interval;
+  XBT_VERB("Main loop start");
+  while (MSG_get_clock() < deadline) {
+
+    if (node->receive_comm == NULL) {
+      node->task_received = NULL;
+      node->receive_comm = MSG_task_irecv(&node->task_received,node->mailbox);
+  }
+    if (node->receive_comm) {
+      if (!MSG_comm_test(node->receive_comm)) {
+        /* We search for a pseudo random node */
+        if (MSG_get_clock() >= next_lookup_time) {
+          random_lookup(node);
+          next_lookup_time += random_lookup_interval;
+        }
+        else {
+          //Didn't get a task: sleep for a while...
+          MSG_process_sleep(1);
+        }      
+      }
+      else {
+        //There has been a transfer, we need to handle it !
+        msg_error_t status = MSG_comm_get_status(node->receive_comm);
+        MSG_comm_destroy(node->receive_comm);
+        node->receive_comm = NULL;
+
+        if (status == MSG_OK) {
+          xbt_assert( (node->task_received != NULL), "We received an incorrect task");
+          handle_task(node,node->task_received);
+        }
+        else {
+          xbt_assert((MSG_comm_get_task(node->receive_comm) == NULL),"Comm failed but received a task.");
+          XBT_DEBUG("Nevermind, the communication has failed.");
+        }
+      }
+    }
+    else {
+      //Didn't get a comm: sleep.
+      MSG_process_sleep(1);
+    }
+  }
+  //Cleanup the receiving communication.
+  if (node->receive_comm != NULL ) {
+    if (MSG_comm_test(node->receive_comm) && MSG_comm_get_status(node->receive_comm) == MSG_OK) {
+      task_free(MSG_comm_get_task(node->receive_comm));
+    }
+    MSG_comm_destroy(node->receive_comm);
+  }
+}
+/**
+  * Tries to join the network
+  * @param node node data
+  * @param id_known id of the node I know in the network.
+  */
+unsigned int join(node_t node, unsigned int id_known) {
+  answer_t node_list;
+  msg_error_t status;
+  unsigned int trial = 0;
+  unsigned int i, answer_got = 0;
+
+  /* Add the guy we know to our routing table and ourselves. */
+  node_routing_table_update(node,node->id);
+  node_routing_table_update(node,id_known);
+
+  /* First step: Send a "FIND_NODE" request to the node we know */
+  send_find_node(node,id_known,node->id);
+  do {
+    if (node->receive_comm == NULL) {
+      node->task_received = NULL;
+      node->receive_comm = MSG_task_irecv(&node->task_received,node->mailbox);
+    }
+    if (node->receive_comm) {
+      if (MSG_comm_test(node->receive_comm)) {
+        status = MSG_comm_get_status(node->receive_comm);
+        MSG_comm_destroy(node->receive_comm);
+        node->receive_comm = NULL;
+        if (status == MSG_OK) {
+          XBT_DEBUG("Received an answer from the node I know.");
+          answer_got = 1;
+          //retrieve the node list and ping them.
+          task_data_t data = MSG_task_get_data(node->task_received);
+          xbt_assert( (data != NULL), "Null data received");
+          if (data->type == TASK_FIND_NODE_ANSWER) {
+            node_contact_t contact;
+            node_list = data->answer;
+            xbt_dynar_foreach(node_list->nodes,i, contact) {
+              node_routing_table_update(node,contact->id);
+              //ping(node,contact->id);
+            }
+            task_free(node->task_received);
+          }
+          else {
+            handle_task(node,node->task_received);
+          }
+        }
+        else {
+          trial++;
+        }
+      }
+      else {
+        MSG_process_sleep(1);
+      }
+    }
+    else {
+      MSG_process_sleep(1);
+    }
+  } while (answer_got == 0 && trial < max_join_trials);
+  /* Second step: Send a FIND_NODE to a a random node in buckets */
+  unsigned int bucket_id = routing_table_find_bucket(node->table,id_known)->id;
+  for (i = 0; ((bucket_id -i) > 0 || (bucket_id + i) <= identifier_size) && i < JOIN_BUCKETS_QUERIES; i++) {
+    if (bucket_id - i > 0) {
+      unsigned int id_in_bucket = get_id_in_prefix(node->id,bucket_id - i);
+      find_node(node,id_in_bucket,0);
+    }
+    if (bucket_id + i <= identifier_size) {
+      unsigned int id_in_bucket = get_id_in_prefix(node->id,bucket_id + i);
+      find_node(node,id_in_bucket,0);
+    }
+  }
+  return answer_got;
+}
+/**
+  * Send a request to find a node in the node routing table.
+  * @brief node our node data
+  * @brief id_to_find the id of the node we are trying to find
+  */
+unsigned int find_node(node_t node, unsigned int id_to_find, unsigned int count_in_stats) {
+
+  unsigned int i = 0;
+  unsigned int queries, answers;
+  unsigned int destination_found = 0;
+  unsigned int nodes_added = 0;
+  double time_beginreceive;
+  double timeout, global_timeout = MSG_get_clock() + find_node_global_timeout;
+  unsigned int steps = 0;
+
+  xbt_assert( (id_to_find >= 0), "Id supplied incorrect");
+  
+  /* First we build a list of who we already know */
+  answer_t node_list = node_find_closest(node,id_to_find);
+  xbt_assert((node_list != NULL),"node_list incorrect");
+  
+  XBT_DEBUG("Doing a FIND_NODE on %d", id_to_find);
+   
+  msg_error_t status;
+
+  /* Ask the nodes on our list if they   have information about
+   * the node we are trying to find */
+
+  do {
+    answers = 0;
+    queries = send_find_node_to_best(node,node_list);
+    nodes_added = 0;
+    timeout = MSG_get_clock() + find_node_timeout;
+    steps++;
+    time_beginreceive = MSG_get_clock();
+    do {
+      if (node->receive_comm == NULL) {
+        node->task_received = NULL;
+        node->receive_comm = MSG_task_irecv(&node->task_received,node->mailbox);
+      }
+      if (node->receive_comm) {
+        if (MSG_comm_test(node->receive_comm)) {
+          status = MSG_comm_get_status(node->receive_comm);
+          MSG_comm_destroy(node->receive_comm);
+          node->receive_comm = NULL;
+          if (status == MSG_OK) {
+            xbt_assert( (node->task_received != NULL), "Invalid task received");
+            //Figure out if we received an answer or something else
+            task_data_t data = MSG_task_get_data(node->task_received);
+            xbt_assert( (data != NULL), "No data in the task");
+
+            //Check if what we have received is what we are looking for.
+            if (data->type == TASK_FIND_NODE_ANSWER && data->answer->destination_id == id_to_find) {
+              //Handle the answer
+              node_routing_table_update(node,data->sender_id);
+              node_contact_t contact;
+              xbt_dynar_foreach(node_list->nodes,i, contact) {
+                  node_routing_table_update(node,contact->id);
+              }
+              answers++;
+
+              nodes_added = answer_merge(node_list,data->answer);
+              XBT_DEBUG("Received an answer from %s (%s) with %ld nodes on it",data->answer_to,data->issuer_host_name,xbt_dynar_length(data->answer->nodes));
+
+              task_free(node->task_received);
+            }
+            else {
+              handle_task(node,node->task_received);
+              //Update the timeout if we didn't have our answer
+              timeout += MSG_get_clock() - time_beginreceive;
+              time_beginreceive = MSG_get_clock();
+            }
+          }
+        }
+        else {
+            MSG_process_sleep(1);
+        }
+      }
+      else {
+        MSG_process_sleep(1);
+      }
+    } while (MSG_get_clock() < timeout && answers < queries) ;
+    destination_found = answer_destination_found(node_list);
+  } while (!destination_found && (nodes_added > 0 || answers == 0) && MSG_get_clock() < global_timeout && steps < MAX_STEPS);
+  if (destination_found) {
+    if (count_in_stats)
+      node->find_node_success++;
+      if (queries > 4)
+        XBT_VERB("FIND_NODE on %d success in %d steps",id_to_find,steps);
+    node_routing_table_update(node,id_to_find);
+  }
+  else {
+    if (count_in_stats) {
+      node->find_node_failed++;
+      XBT_VERB("%d not found in %d steps",id_to_find,steps);
+    }
+  }
+  answer_free(node_list);
+  return destination_found;
+}
+/**
+  * Pings a node in the system to see if it is online. 
+  * @param node Our node data
+  * @param id_to_ping the id of a node we want to see if it is online.
+  * @return if the ping succeded or not.
+  */
+unsigned int ping(node_t node, unsigned int id_to_ping) {
+  char mailbox[MAILBOX_NAME_SIZE];
+  sprintf(mailbox,"%d",id_to_ping);
+
+  unsigned int destination_found = 0;
+  unsigned int timeout = MSG_get_clock() +  ping_timeout;
+
+  msg_task_t ping_task = task_new_ping(node->id,node->mailbox,MSG_host_get_name(MSG_host_self()));
+  msg_task_t task_received = NULL;
+  
+  XBT_VERB("PING %d",id_to_ping);
+
+  //Check that we aren't trying to ping ourselves
+  if (id_to_ping == node->id) {
+    return 1;
+  }
+
+  /* Sending the ping task */
+  MSG_task_dsend(ping_task,mailbox,task_free_v);
+  do
+  { 
+    task_received = NULL;
+    msg_error_t status = MSG_task_receive_with_timeout(&task_received,node->mailbox,ping_timeout);
+    if (status == MSG_OK) {
+      xbt_assert( (task_received != NULL), "Invalid task received");
+      //Checking if it's what we are waiting for or not.
+      task_data_t data = MSG_task_get_data(task_received);
+      xbt_assert( (data != NULL) ,"didn't receive any data...");
+      if (data->type == TASK_PING_ANSWER && id_to_ping == data->sender_id) {
+        XBT_VERB("Ping to %s succeeded",mailbox);
+        node_routing_table_update(node,data->sender_id);
+        destination_found = 1;
+        task_free(task_received);
+      }
+      else {
+        //If it's not our answer, we answer the query anyway.
+        handle_task(node,task_received);
+      }
+    }
+  } while (destination_found == 0 && MSG_get_clock() < timeout);
+  
+  if (MSG_get_clock() >= timeout) {
+    XBT_DEBUG("Ping to %s has timeout.",mailbox);
+    return 0;
+  }
+  if (destination_found == -1) {
+    XBT_DEBUG("It seems that %s is offline...",mailbox);
+    return 0;
+  }
+  return 1;
+}
+/**
+  * Does a pseudo-random lookup for someone in the system
+  * @param node caller node data
+  */
+void random_lookup(node_t node) {
+  unsigned int id_to_look = RANDOM_LOOKUP_NODE; //Totally random.
+  /* TODO: Use some pseudorandom generator like RngStream. */
+  XBT_DEBUG("I'm doing a random lookup");
+  find_node(node,id_to_look,1);
+}
+/**
+  * @brief Send a "FIND_NODE" to a node 
+  * @param node sender node data
+  * @param id node we are querying
+  * @param destination node we are trying to find.
+  */
+void send_find_node(node_t node, unsigned int id, unsigned int destination) {
+  char mailbox[MAILBOX_NAME_SIZE];
+  /* Gets the mailbox to send to */
+  get_node_mailbox(id,mailbox);
+  /* Build the task */
+  msg_task_t task = task_new_find_node(node->id,destination,node->mailbox,MSG_host_get_name(MSG_host_self()));
+  /* Send the task */
+  xbt_assert( (task != NULL), "Trying to send a NULL task.");
+  MSG_task_dsend(task,mailbox,task_free_v);
+  XBT_VERB("Asking %s for its closest nodes",mailbox);
+} 
+/**
+  * Sends to the best "kademlia_alpha" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best nodes
+  */
+unsigned int send_find_node_to_best(node_t node, answer_t node_list) {
+  unsigned int i = 0, j = 0;
+  unsigned int destination = node_list->destination_id;
+  node_contact_t node_to_query;
+  while (j < kademlia_alpha && i < node_list->size) { /* We need to have at most "kademlia_alpha" requets each time, according to the protocol */
+    /* Gets the node we want to send the query to */
+    node_to_query = xbt_dynar_get_as(node_list->nodes,i,node_contact_t);
+    if (node_to_query->id != node->id) { /* No need to query ourselves */
+      send_find_node(node,node_to_query->id,destination);
+      j++;
+    }
+    i++;
+  }   
+  return i;  
+}
+/**
+  * \brief Handles an incomming received task
+  */
+void handle_task(node_t node, msg_task_t task) {
+  task_data_t data = MSG_task_get_data(task);
+  xbt_assert( (data != NULL), "Received NULL data");
+  //Adding/updating the guy to our routing table
+  node_routing_table_update(node,data->sender_id);
+  switch (data->type) {
+    case TASK_FIND_NODE:
+      handle_find_node(node,data);
+    break;
+    case TASK_FIND_NODE_ANSWER:
+      XBT_DEBUG("Received a wrong answer for a FIND_NODE");
+    break;
+    case TASK_PING:
+      handle_ping(node,data);
+    break;
+    default:
+    
+    break;
+  }
+  task_free(task);
+}
+/**
+  * \brief Handles the answer to an incomming "find_node" task
+  */
+void handle_find_node(node_t node, task_data_t data) {
+  XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %d",data->answer_to,data->issuer_host_name,data->destination_id);
+  //Building the answer to the request
+  answer_t answer = node_find_closest(node,data->destination_id);
+  //Building the task to send
+  msg_task_t task = task_new_find_node_answer(node->id,data->destination_id,answer,node->mailbox,MSG_host_get_name(MSG_host_self()));
+  //Sending the task  
+  MSG_task_dsend(task,data->answer_to,task_free_v);
+}
+/**
+  * \brief handles the answer to a ping
+  */
+void handle_ping(node_t node, task_data_t data) {
+  XBT_VERB("Received a PING request from %s (%s)",data->answer_to,data->issuer_host_name);
+  //Building the answer to the request
+  msg_task_t task = task_new_ping_answer(node->id,data->answer_to,MSG_host_get_name(MSG_host_self()));
+  
+  MSG_task_dsend(task,data->answer_to,task_free_v);
+}
+/**
+  * \brief Main function
+  */
+int main(int argc, char *argv[]) {
+
+  MSG_init(&argc, argv);
+
+  /* Check the arguments */
+  if (argc < 3) {
+    printf("Usage: %s platform_file deployment_file \n",argv[0]);
+    return -1;
+  }  
+  
+  const char *platform_file = argv[1];
+  const char *deployment_file = argv[2];
+
+  MSG_create_environment(platform_file);
+  
+  MSG_function_register("node",node);
+  MSG_launch_application(deployment_file);
+  
+  msg_error_t res = MSG_main();
+
+  XBT_CRITICAL("Messages created: %ld", smx_total_comms);
+  XBT_INFO("Simulated time: %g", MSG_get_clock());
+  MSG_clean();
+  
+  if ( res == MSG_OK)
+    return 0;
+  else
+    return 1;
+}
diff --git a/examples/msg/kademlia/kademlia.h b/examples/msg/kademlia/kademlia.h
new file mode 100644 (file)
index 0000000..d9f193d
--- /dev/null
@@ -0,0 +1,28 @@
+
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+#ifndef _MSG_EXAMPLES_KADEMLIA_H
+#define _MSG_EXAMPLES_KADEMLIA_H
+#include "node.h"
+#include "task.h"
+//process functions
+static int node(int argc, char *argv[]);
+static void main_loop(node_t node, unsigned int deadline);
+//core kademlia functions
+unsigned int join(node_t node, unsigned int id_known);
+unsigned int find_node(node_t node, unsigned int id_to_find, unsigned int count_in_stats);
+unsigned int ping(node_t node, unsigned int id_to_ping);
+void random_lookup(node_t node);
+
+void send_find_node(node_t node, unsigned int id, unsigned int destination);
+unsigned int send_find_node_to_best(node_t node, answer_t node_list);
+
+void handle_task(node_t node, msg_task_t task);
+void handle_find_node(node_t node, task_data_t data);
+void handle_ping(node_t node, task_data_t data); 
+
+
+#endif /* _MSG_EXAMPLES_KADEMLIA_H */
diff --git a/examples/msg/kademlia/kademlia.tesh b/examples/msg/kademlia/kademlia.tesh
new file mode 100644 (file)
index 0000000..8d1ed7b
--- /dev/null
@@ -0,0 +1,35 @@
+#! ./tesh
+
+p Testing the Kademlia implementation with MSG
+
+! output sort
+$ $SG_TEST_EXENV ${bindir:=.}/kademlia ${srcdir:=.}/../msg_platform.xml ${srcdir:=.}/kademlia.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
+> [  0.000000] (10:node@Laroche) Hi, I'm going to join the network !
+> [  0.000000] (11:node@Tanguay) Hi, I'm going to join the network !
+> [  0.000000] (12:node@Morin) Hi, I'm going to join the network !
+> [  0.000000] (13:node@Ethernet) Hi, I'm going to join the network !
+> [  0.000000] (1:node@Jacquelin) Hi, I'm going to create the network with id 0 !
+> [  0.000000] (2:node@Boivin) Hi, I'm going to join the network !
+> [  0.000000] (3:node@Jean_Yves) Hi, I'm going to join the network !
+> [  0.000000] (4:node@TeX) Hi, I'm going to join the network !
+> [  0.000000] (5:node@Geoff) Hi, I'm going to join the network !
+> [  0.000000] (6:node@Disney) Hi, I'm going to join the network !
+> [  0.000000] (7:node@iRMX) Hi, I'm going to join the network !
+> [  0.000000] (8:node@McGee) Hi, I'm going to join the network !
+> [  0.000000] (9:node@Gatien) Hi, I'm going to join the network !
+> [900.000000] (0:@) Messages created: 1319
+> [900.000000] (0:@) Simulated time: 900
+> [900.000000] (10:node@Laroche) 7/7 FIND_NODE have succeeded
+> [900.000000] (11:node@Tanguay) 8/8 FIND_NODE have succeeded
+> [900.000000] (12:node@Morin) 7/7 FIND_NODE have succeeded
+> [900.000000] (13:node@Ethernet) 7/7 FIND_NODE have succeeded
+> [900.000000] (1:node@Jacquelin) 8/8 FIND_NODE have succeeded
+> [900.000000] (2:node@Boivin) 4/4 FIND_NODE have succeeded
+> [900.000000] (3:node@Jean_Yves) 7/7 FIND_NODE have succeeded
+> [900.000000] (4:node@TeX) 6/6 FIND_NODE have succeeded
+> [900.000000] (5:node@Geoff) 6/6 FIND_NODE have succeeded
+> [900.000000] (6:node@Disney) 6/6 FIND_NODE have succeeded
+> [900.000000] (7:node@iRMX) 6/6 FIND_NODE have succeeded
+> [900.000000] (8:node@McGee) 7/7 FIND_NODE have succeeded
+> [900.000000] (9:node@Gatien) 7/7 FIND_NODE have succeeded
+
diff --git a/examples/msg/kademlia/kademlia.xml b/examples/msg/kademlia/kademlia.xml
new file mode 100644 (file)
index 0000000..bc7c27d
--- /dev/null
@@ -0,0 +1,72 @@
+<?xml version='1.0'?>
+<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid.dtd">
+<platform version="3">
+
+  <process host="Jacquelin" function="node">
+    <argument value="0"/>        <!-- my id -->
+    <argument value ="900"/>           <!-- deadline -->
+  </process>
+
+  <process host="Boivin" function="node">
+    <argument value="1"/>        <!-- my id -->
+    <argument value="0"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->
+  </process>
+
+  <process host="Jean_Yves" function="node">
+    <argument value="11"/>        <!-- my id -->
+    <argument value="1"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->
+  </process>
+
+  <process host="TeX" function="node">
+    <argument value="111"/>        <!-- my id -->
+    <argument value="11"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="Geoff" function="node">
+    <argument value="1111"/>        <!-- my id -->
+    <argument value="111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="Disney" function="node">
+    <argument value="11111"/>        <!-- my id -->
+    <argument value="1111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="iRMX" function="node">
+    <argument value="111111"/>        <!-- my id -->
+    <argument value="11111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="McGee" function="node">
+    <argument value="1111111"/>        <!-- my id -->
+    <argument value="111111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="Gatien" function="node">
+    <argument value="11111111"/>        <!-- my id -->
+    <argument value="1111111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="Laroche" function="node">
+    <argument value="111111111"/>        <!-- my id -->
+    <argument value="11111111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="Tanguay" function="node">
+    <argument value="1111111111"/>        <!-- my id -->
+    <argument value="111111111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+   <process host="Morin" function="node">
+    <argument value="11111111111"/>        <!-- my id -->
+    <argument value="1111111111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>    
+   <process host="Ethernet" function="node">
+    <argument value="11111111111"/>        <!-- my id -->
+    <argument value="0"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+</platform>
diff --git a/examples/msg/kademlia/node.c b/examples/msg/kademlia/node.c
new file mode 100644 (file)
index 0000000..ec36ade
--- /dev/null
@@ -0,0 +1,177 @@
+
+/* Copyright (c) 2010. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "node.h"
+#include "routing_table.h"
+
+#include "msg/msg.h"
+#include "xbt/log.h"
+#include "xbt/asserts.h"
+ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia_node,
+                             "Messages specific for this msg example");
+/**
+  * \brief Initialization of a node
+  * \param node_id the id of the node
+  * \return the node created
+  */
+node_t node_init(unsigned int node_id) {
+  node_t node = xbt_new(s_node_t,1);
+  
+  node->id = node_id;
+  node->table = routing_table_init(node_id);
+  sprintf(node->mailbox,"%d",node_id);
+  node->find_node_failed = 0;
+  node->find_node_success = 0;
+  
+  node->task_received = NULL;
+  node->receive_comm = NULL;
+
+  return node;
+}
+/*
+ * \brief Node destructor
+ */
+void node_free(node_t node)
+{
+  routing_table_free(node->table);
+  xbt_free(node);  
+}
+
+/**
+  * @brief Updates/Puts the node id unsigned into our routing table
+  * @param node Our node data
+  * @param id The id of the node we need to add unsigned into our routing table
+  */
+void node_routing_table_update(node_t node, unsigned int id) {
+  routing_table_t table = node->table;
+  //retrieval of the bucket in which the should be
+  bucket_t bucket = routing_table_find_bucket(table,id);
+  
+  //check if the id is already in the bucket. 
+  unsigned int id_pos = bucket_find_id(bucket,id);
+  
+  if (id_pos == -1) {
+    /* We check if the bucket is full or not. If it is, we evict
+    * old offline elements */
+    if (xbt_dynar_length(bucket->nodes) < bucket_size) {
+      //TODO: this is not really very efficient. Maybe we should use something else than dynars ?
+      xbt_dynar_unshift(bucket->nodes,&id);
+      XBT_VERB("I'm adding to my routing table %d",id);
+    }
+    else {
+      /* TODO: we need to evict the old elements: that's why this function is in "node" instead of "routing table". This is not implemented yet. */ 
+    }
+  }
+  else {
+    //We push to the front of the dynar the element.
+    unsigned int element = xbt_dynar_get_as(bucket->nodes,id_pos,unsigned int);
+    xbt_dynar_remove_at(bucket->nodes,id_pos,NULL);
+    xbt_dynar_unshift(bucket->nodes,&element);
+    XBT_VERB("I'm updating %d",element);
+  }
+}
+/**
+  * Finds the closest nodes to the node given.
+  * @param node : our node
+  * @param destination_id : the id of the guy we are trying to find
+  */
+answer_t node_find_closest(node_t node, unsigned int destination_id) {
+  int i;
+  answer_t answer = answer_init(destination_id);
+  /* We find the corresponding bucket for the id */
+  bucket_t bucket = routing_table_find_bucket(node->table,destination_id);
+  int bucket_id = bucket->id;
+  xbt_assert((bucket_id <= identifier_size), "Bucket found has a wrong identifier");
+  /* So, we copy the contents of the bucket unsigned into our result dynar */
+  answer_add_bucket(bucket,answer);
+  
+  /* However, if we don't have enough elements in our bucket, we NEED to 
+  include at least
+  * "bucket_size" elements (if, of course, we know at least "bucket_size" elements. So we're going to look unsigned into the other buckets.
+  */
+  for (i = 1; answer->size < bucket_size && ( (bucket_id - i > 0) || (bucket_id + i < identifier_size) ); i++) {
+    /* We check the previous buckets */
+    if (bucket_id - i >= 0) {
+      bucket_t bucket_p = &node->table->buckets[bucket_id - i];
+      answer_add_bucket(bucket_p,answer);
+    }
+    /* We check the next buckets */
+    if (bucket_id + i <= identifier_size) {
+      bucket_t bucket_n = &node->table->buckets[bucket_id + i];
+      answer_add_bucket(bucket_n,answer);
+    }
+  }
+  /* We sort the array by XOR distance */
+  answer_sort(answer);
+  /* We trim the array to have only bucket_size or less elements */
+  answer_trim(answer);
+  
+  return answer;
+}
+/**
+ * Returns an identifier which is in a specific bucket of a routing table
+ * @brief id id of the routing table owner
+ * @brief prefix id of the bucket where we want that identifier to be
+ */
+unsigned int get_id_in_prefix(unsigned int id, unsigned int prefix) {
+  if (prefix == 0) {
+    return 0;
+  }
+  unsigned int n = 1 << (prefix - 1);
+  return n ^ id;
+}
+/**
+  * \brief Returns the prefix of an identifier.
+  * The prefix is the id of the bucket in which the remote identifier xor our identifier should be stored.
+  * @param id : bigunsigned int id to test
+  * @param nb_bits : key size
+  */
+unsigned int get_node_prefix(unsigned int id, unsigned int nb_bits) {
+  unsigned int j, size = sizeof(unsigned int) * 8;
+    for (j = 0; j < size; j++)  {
+      if ( ( (id >> (size - 1 - j)) & 0x1) != 0) {
+        return nb_bits - (j);
+      }
+    }
+  return 0;
+}
+/**
+  * \brief Gets the mailbox name of a host given its identifier
+  */
+void get_node_mailbox(unsigned int id, char *mailbox) {
+  sprintf(mailbox,"%d",id);
+}
+
+/**
+  * Constructor, build a new contact information.
+  */
+node_contact_t node_contact_new(unsigned int id, unsigned int distance) {
+  node_contact_t contact = xbt_new(s_node_contact_t,1);
+  
+  contact->id = id;
+  contact->distance = distance;
+
+  return contact;
+}
+/**
+  * Builds a contact information from a contact information
+  */
+node_contact_t node_contact_copy(node_contact_t node_contact) {
+  node_contact_t contact = xbt_new(s_node_contact_t,1);
+  
+  contact->id = node_contact->id;
+  contact->distance = node_contact->distance;
+
+  return contact;
+}
+/**
+  * Destructor
+  * @param contact the node_contact to kill.
+  */
+void node_contact_free(node_contact_t contact) {
+  xbt_free(contact);
+}
diff --git a/examples/msg/kademlia/node.h b/examples/msg/kademlia/node.h
new file mode 100644 (file)
index 0000000..f2a3794
--- /dev/null
@@ -0,0 +1,57 @@
+
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef _MSG_EXAMPLES_ROUTING_H
+#define _MSG_EXAMPLES_ROUTING_H
+#include "xbt/dynar.h"
+#include "msg/msg.h"
+
+#include "common.h"
+
+#include "answer.h"
+#include "routing_table.h"
+/**
+  * Information about a foreign node
+  */
+typedef struct s_node_contact {
+  unsigned int id; //The node identifier
+  unsigned int distance; //The distance from the node
+} s_node_contact_t, *node_contact_t;
+
+/* 
+ * Node data
+ */
+typedef struct s_node {
+  unsigned int id; //node id - 160 bits
+  routing_table_t table; //node routing table
+  msg_comm_t receive_comm; //current receiving communication.
+  msg_task_t task_received; //current task being received
+  
+  char mailbox[MAILBOX_NAME_SIZE]; //node mailbox
+  unsigned int find_node_success; //Number of find_node which have succeeded.
+  unsigned int find_node_failed; //Number of find_node which have failed.
+
+} s_node_t, *node_t;
+
+// node functions
+node_t node_init(unsigned int id);
+void node_free(node_t node);
+
+void node_routing_table_update(node_t node, unsigned int id);
+answer_t node_find_closest(node_t node, unsigned int destination_id);
+
+
+// identifier functions
+unsigned int get_id_in_prefix(unsigned int id, unsigned int prefix);
+unsigned int get_node_prefix(unsigned int id, unsigned int nb_bits);
+void get_node_mailbox(unsigned int id, char *mailbox);
+
+// node contact functions
+node_contact_t node_contact_new(unsigned int id, unsigned int distance);
+node_contact_t node_contact_copy(node_contact_t node_contact);
+void node_contact_free(node_contact_t contact);
+#endif /* _MSG_EXAMPLES_ROUTING_H */
diff --git a/examples/msg/kademlia/routing_table.c b/examples/msg/kademlia/routing_table.c
new file mode 100644 (file)
index 0000000..6e26461
--- /dev/null
@@ -0,0 +1,102 @@
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+#include "routing_table.h"
+#include "node.h"
+#include "msg/msg.h"
+#include "xbt/log.h"
+#include "xbt/asserts.h"
+ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia_routing_table,
+                             "Messages specific for this msg example");
+/**
+  * \brief Initialization of a node routing table.
+  */
+routing_table_t routing_table_init(unsigned int node_id) {
+  unsigned int i;
+  routing_table_t table = xbt_new(s_routing_table_t,1);
+  table->buckets = xbt_new(s_bucket_t,identifier_size + 1);
+  for (i = 0; i < identifier_size + 1; i++) {
+    table->buckets[i].nodes = xbt_dynar_new(sizeof(unsigned int),NULL);
+    table->buckets[i].id = i;
+  }
+  table->id = node_id;
+  return table;
+}
+/**
+  * \brief Frees the routing table
+  */
+void routing_table_free(routing_table_t table) {
+  unsigned int i;
+  //Free the buckets.
+  for (i = 0; i <= identifier_size; i++) {
+    xbt_dynar_free(&table->buckets[i].nodes);
+  } 
+  xbt_free(table->buckets);
+  xbt_free(table);
+}
+/**
+  * Returns if the routing table contains the id.
+  */
+unsigned int routing_table_contains(routing_table_t table, unsigned int node_id) {
+  bucket_t bucket = routing_table_find_bucket(table,node_id);
+  return bucket_contains(bucket,node_id);
+}
+/**
+  * @brief prints the routing table, to debug stuff.
+  */
+void routing_table_print(routing_table_t table) {
+  unsigned int i, j, value;
+  XBT_INFO("Routing table of %d:",table->id);
+
+  for (i = 0; i <= identifier_size; i++) {
+    if (xbt_dynar_length(table->buckets[i].nodes) > 0) {
+      XBT_INFO("Bucket number %d: ",i);
+      xbt_dynar_foreach (table->buckets[i].nodes,j,value) {
+        XBT_INFO("Element %d: %d",j,value);
+      } 
+    }
+  }
+}
+/**
+  * Finds an identifier in a bucket and returns its position
+  * or returns -1 if it doesn't exists
+  * @param bucket the bucket in which we try to find our identifier
+  * @param id the id 
+  */
+unsigned int bucket_find_id(bucket_t bucket, unsigned int id) {
+  unsigned int i, length = xbt_dynar_length(bucket->nodes);
+  for (i = 0; i < length; i++) { //TODO: Use foreach maybe ?
+    if (id == xbt_dynar_get_as(bucket->nodes,i,unsigned int)) {
+      return i;
+    }
+  }
+  return -1;
+}
+/**
+  * Returns if the bucket contains an identifier.
+  */
+unsigned int bucket_contains(bucket_t bucket, unsigned int id) {
+  unsigned int length = xbt_dynar_length(bucket->nodes), i = 0;
+  for (i = 0; i < length; i++) {
+    if (id == xbt_dynar_get_as(bucket->nodes,i,unsigned int)) {
+      return 1;
+    }
+  }
+  return 0;
+}
+/**
+  * Fins the corresponding bucket in a routing table for a given identifier
+  * @param table the routing table
+  * @param id the identifier 
+  * @return the bucket in which the the identifier would be.
+  */
+bucket_t routing_table_find_bucket(routing_table_t table, unsigned int id) {
+  unsigned int xor_number = table->id ^ id;
+  unsigned int prefix = get_node_prefix(xor_number,identifier_size);
+  xbt_assert(prefix >= 0 && prefix <= identifier_size, "Tried to return a  bucket that doesn't exist.");
+  bucket_t bucket = &table->buckets[prefix];
+  return bucket;
+}
+
diff --git a/examples/msg/kademlia/routing_table.h b/examples/msg/kademlia/routing_table.h
new file mode 100644 (file)
index 0000000..d13b40e
--- /dev/null
@@ -0,0 +1,38 @@
+
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef _MSG_KADEMLIA_EXAMPLES_ROUTING_TABLE
+#define _MSG_KADEMLIA_EXAMPLES_ROUTING_TABLE
+#include "common.h"
+#include <xbt/dynar.h>
+/*
+ * Routing table bucket
+ */
+typedef struct s_bucket {
+  xbt_dynar_t nodes; //Nodes in the bucket.
+  unsigned int id; //bucket id
+} s_bucket_t, *bucket_t;
+
+/*
+ * Node routing table
+ */
+typedef struct s_routing_table {
+  unsigned int id; //node id of the client's routing table
+  s_bucket_t *buckets; //Node bucket list - 160 sized.
+} s_routing_table_t, *routing_table_t;
+// bucket functions
+unsigned int bucket_find_id(bucket_t bucket, unsigned int id);
+unsigned int bucket_contains(bucket_t bucket, unsigned int id);
+// routing table functions
+routing_table_t routing_table_init(unsigned int node_id);
+void routing_table_free(routing_table_t table);
+unsigned int routing_table_contains(routing_table_t table, unsigned int node_id);
+void routing_table_print(routing_table_t table);
+bucket_t routing_table_find_bucket(routing_table_t table, unsigned int id);
+
+#endif /* _MSG_KADEMLIA_EXAMPLES_ROUTING_TABLE */
diff --git a/examples/msg/kademlia/task.c b/examples/msg/kademlia/task.c
new file mode 100644 (file)
index 0000000..1230818
--- /dev/null
@@ -0,0 +1,120 @@
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+#include "task.h"
+#include "answer.h"
+ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia_task,
+                             "Messages specific for this msg example");
+/**
+  * Creates a new "find node" task
+  * @param sender_id the id of the node who sends the task
+  * @param destination_id the id the sender is trying to find
+  * @param hostname the hostname of the node, for logging purposes
+  */
+msg_task_t task_new_find_node(unsigned int sender_id, unsigned int destination_id, char *mailbox, const char *hostname) {
+
+  task_data_t data = xbt_new(s_task_data_t,1);
+
+  data->type = TASK_FIND_NODE;
+  data->sender_id = sender_id;
+  data->destination_id = destination_id;
+  data->answer = NULL;
+  data->answer_to = mailbox;
+  data->issuer_host_name = hostname;
+
+
+  msg_task_t task = MSG_task_create(NULL,COMP_SIZE,COMM_SIZE,data);  
+  
+  return task;
+}
+/**
+  * Creates a new "answer to find node" task
+  *  @param sender_id the node who sent the task
+  *  @param destination_id the node that should be found
+  *  @param answer the answer to send
+  *  @param mailbox The mailbox of the sender
+  *  @param hostname sender hostname
+  */
+msg_task_t task_new_find_node_answer(unsigned int sender_id, unsigned int destination_id, answer_t answer, char *mailbox, const char *hostname) {
+
+  task_data_t data = xbt_new(s_task_data_t,1);
+  
+  data->type = TASK_FIND_NODE_ANSWER;
+  data->sender_id = sender_id;
+  data->destination_id = destination_id;
+  data->answer = answer;
+  data->answer_to = mailbox;
+  data->issuer_host_name = hostname;      
+
+  msg_task_t task = MSG_task_create(NULL,COMP_SIZE,COMM_SIZE,data);  
+  
+  return task;
+}
+/**
+  * Creates a new "ping" task
+  * @param sender_id : sender node identifier
+  * @param mailbox : mailbox where we should respond
+  * @param hostname : hostname of the sender, for debugging purposes
+  */
+msg_task_t task_new_ping(unsigned int sender_id, char *mailbox, const char *hostname) {
+
+  task_data_t data = xbt_new(s_task_data_t,1);
+
+  data->type = TASK_PING;
+  data->sender_id = sender_id;
+  data->destination_id = 0;
+  data->answer = NULL;
+  data->answer_to = mailbox;
+  data->issuer_host_name = hostname;
+
+  msg_task_t task = MSG_task_create(NULL,COMP_SIZE,COMM_SIZE,data);  
+  
+  return task;
+}
+/**
+  * Creates a new "ping answer" task
+  * @param sender_id : sender node identifier
+  * @param mailbox : mailbox of the sender
+  * @param hostname : hostname of the sender, for debugging purposes
+  */
+msg_task_t task_new_ping_answer(unsigned int sender_id, char *mailbox, const char *hostname) {
+
+  task_data_t data = xbt_new(s_task_data_t,1);
+
+  data->type = TASK_PING_ANSWER;
+  data->sender_id = sender_id;
+  data->destination_id = 0;
+  data->answer = NULL;
+  data->answer_to = mailbox;
+  data->issuer_host_name = hostname;
+
+  msg_task_t task = MSG_task_create(NULL,COMP_SIZE,COMM_SIZE,data);  
+  
+  return task;
+}
+/**
+  * Destroys a task and its data
+  * @param task the task that'll be destroyed
+  */
+void task_free(msg_task_t task) {
+  xbt_assert( (task != NULL), "Tried to free a NULL task");
+
+  task_data_t data = MSG_task_get_data(task);
+    
+  if (data->answer) {
+    answer_free(data->answer);
+  }
+  xbt_free(data);
+  
+  MSG_task_destroy(task);
+
+}
+/**
+  * Destroys a task and its data (taking a void* pointer
+  * @param task The task that'll be destroyed
+  */
+void task_free_v(void *task) {
+  task_free(task);
+}
diff --git a/examples/msg/kademlia/task.h b/examples/msg/kademlia/task.h
new file mode 100644 (file)
index 0000000..6b0a44e
--- /dev/null
@@ -0,0 +1,44 @@
+/* Copyright (c) 2012. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+#ifndef _MSG_KADEMLIA_EXAMPLES_TASK
+#define _MSG_KADEMLIA_EXAMPLES_TASK
+#include "common.h"
+#include "node.h"
+#include "msg/msg.h"
+/**
+  * Types of tasks exchanged
+  */
+typedef enum {
+  TASK_FIND_NODE,
+  TASK_FIND_NODE_ANSWER,
+  TASK_FIND_VALUE,
+  TASK_FIND_VALUE_ANSWER,
+  TASK_PING,
+  TASK_PING_ANSWER,
+  TASK_LEAVING
+} e_task_type_t;
+/**
+  * Data attached with the tasks
+  */
+typedef struct s_task_data {
+  e_task_type_t type;
+  unsigned int sender_id; //Id of the guy who sent the task
+  unsigned int destination_id; //Id we are trying to find, if needed.
+  answer_t answer; //Answer to the request made, if needed.
+  char *answer_to; // mailbox to send the answer to (if not an answer).  
+  const char *issuer_host_name; // used for logging
+} s_task_data_t, *task_data_t;
+
+//Task handling functions
+msg_task_t task_new_find_node(unsigned int sender_id, unsigned int destination_id, char *mailbox, const char *hostname);
+
+msg_task_t task_new_find_node_answer(unsigned int sender_id, unsigned int destination_id, answer_t answer, char *mailbox, const char *hostname);
+
+msg_task_t task_new_ping(unsigned int sender_id, char *mailbox, const char *hostname);
+msg_task_t task_new_ping_answer(unsigned int sender_id, char *mailbox, const char *hostname);
+void task_free(msg_task_t task);
+void task_free_v(void *task);
+ #endif /* _MSG_KADEMLIA_EXAMPLES_TASK */