--- /dev/null
+-- A SimGrid Lua implementation of the Kademlia protocol.
+require("simgrid")
+
+-- Common constants
+common = {
+ COMM_SIZE = 1,
+ RANDOM_LOOKUP_INTERVAL = 100,
+ ALPHA = 3,
+ IDENTIFIER_SIZE = 32,
+ BUCKET_SIZE = 20,
+ MAX_JOIN_TRIALS = 4,
+ FIND_NODE_TIMEOUT = 10,
+ FIND_NODE_GLOBAL_TIMEOUT = 50,
+ PING_TIMEOUT = 35,
+ MAX_STEPS = 10,
+ JOIN_BUCKETS_QUERIES = 5
+}
+require("tools")
+-- Routing table
+require("routing_table")
+
+data = {
+ id = -1,
+ mailbox = "",
+ deadline = 0,
+ comm = nil,
+ find_node_succedded = 0,
+ find_node_failed = 0
+
+}
+
+function node(...)
+ local args = {...}
+
+ if #args ~= 2 and #args ~= 3 then
+ simgrid.info("Wrong argument count: " .. #args)
+ return
+ end
+ data.id = tonumber(args[1])
+ routing_table_init(data.id)
+ data.mailbox = tostring(data.id)
+ if #args == 3 then
+ data.deadline = tonumber(args[3])
+ simgrid.info("Hi, I'm going to join the network with the id " .. data.id .. " !")
+ if join_network(tonumber(args[2])) then
+ main_loop()
+ else
+ simgrid.info("Couldn't join the network")
+ end
+ else
+ data.deadline = tonumber(args[2])
+ routing_table_update(data.id)
+ data.comm = simgrid.task.irecv(data.mailbox)
+ main_loop()
+ end
+ simgrid.info(data.find_node_succedded .. "/" .. (data.find_node_succedded + data.find_node_failed) .. " FIND_NODE have succedded");
+ simgrid.process.sleep(10000)
+end
+function main_loop()
+ local next_lookup_time = simgrid.get_clock() + common.RANDOM_LOOKUP_INTERVAL
+ local now = simgrid.get_clock()
+ while now < data.deadline do
+ task,err = data.comm:test()
+ if task then
+ handle_task(task)
+ data.comm = simgrid.task.irecv(data.mailbox)
+ elseif err then
+ data.comm = simgrid.task.irecv(data.mailbox)
+ else
+ if now >= next_lookup_time then
+ random_lookup()
+ next_lookup_time = next_lookup_time + common.RANDOM_LOOKUP_INTERVAL
+ now = simgrid.get_clock()
+ else
+ simgrid.process.sleep(1)
+ now = simgrid.get_clock()
+ end
+ end
+ end
+end
+function random_lookup()
+ find_node(0,true)
+end
+function join_network(id_known)
+ local answer_got = false
+ local time_begin = simgrid.get_clock()
+
+ simgrid.debug("Joining the network knowing " .. id_known)
+
+ routing_table_update(data.id)
+ routing_table_update(id_known)
+
+ -- Send a FIND_NODE to the node we know
+ send_find_node(id_known,data.id)
+ -- Wait for the answer
+ local trials = 0
+
+ data.comm = simgrid.task.irecv(data.mailbox)
+
+ repeat
+ task,err = data.comm:test()
+ if task then
+ if task.type == "FIND_NODE_ANSWER" then
+ answer_got = true
+ local answer = task.answer
+ -- Add the nodes we received to our routing table
+ for i,v in pairs(answer.nodes) do
+ routing_table_update(v.id)
+ end
+ else
+ handle_task(task)
+ end
+ data.comm = simgrid.task.irecv(data.mailbox)
+ elseif err then
+ data.comm = simgrid.task.irecv(data.mailbox)
+ else
+ simgrid.process.sleep(1)
+ end
+ until answer_got or trials >= common.MAX_JOIN_TRIALS
+ -- Second step: Send a FIND_NODE in a node in each bucket
+ local bucket_id = find_bucket(id_known).id
+ local i = 0
+ while ((bucket_id - i) > 0 or (bucket_id + i) <= common.IDENTIFIER_SIZE) and i < common.JOIN_BUCKETS_QUERIES do
+ if bucket_id - i > 0 then
+ local id_in_bucket = get_id_in_prefix(data.id,bucket_id - i)
+ find_node(id_in_bucket,false)
+ end
+ if bucket_id + i <= common.IDENTIFIER_SIZE then
+ local id_in_bucket = get_id_in_prefix(data.id,bucket_id + i)
+ find_node(id_in_bucket,false)
+ end
+ i = i + 1
+ end
+ return answer_got
+end
+-- Send a request to find a node in the node's routing table.
+function find_node(destination, counts)
+ local queries, answers
+ local total_answers = 0
+ total_queries = 0
+ local nodes_added = 0
+ local destination_found = false
+ local steps = 0
+ local timeout
+ local global_timeout = simgrid.get_clock() + common.FIND_NODE_GLOBAL_TIMEOUT
+ -- Build a list of the closest nodes we already know.
+ local node_list = find_closest(destination)
+
+ simgrid.debug("Doing a FIND_NODE on " .. destination)
+ repeat
+ answers = 0
+ queries = send_find_node_to_best(node_list)
+ nodes_added = 0
+ timeout = simgrid.get_clock() + common.FIND_NODE_TIMEOUT
+ steps = steps + 1
+ repeat
+ task, err = data.comm:test()
+ if task then
+ if task.type == "FIND_NODE_ANSWER" and task.answer.destination == destination then
+ routing_table_update(task.sender_id)
+ for i,v in pairs(task.answer.nodes) do
+ routing_table_update(v.id)
+ end
+ nodes_added = merge_answer(node_list,task.answer)
+ else
+ handle_task(task)
+ end
+ data.comm = simgrid.task.irecv(data.mailbox)
+ elseif err then
+ data.comm = simgrid.task.irecv(data.mailbox)
+ else
+ simgrid.process.sleep(1)
+ end
+
+ until answers >= queries or simgrid.get_clock() >= timeout
+ if (#node_list.nodes > 0) then
+ destination_found = (node_list.nodes[1].distance == 0)
+ else
+ destination_found = false
+ end
+ until destination_found or (nodes_added > 0 and answers == 0) or simgrid.get_clock() >= global_timeout or steps >= common.MAX_STEPS
+ if destination_found then
+ simgrid.debug("Find node on " .. destination .. " succedded")
+ if counts then
+ data.find_node_succedded = data.find_node_succedded + 1
+ end
+ else
+ simgrid.debug("Find node on " .. destination .. " failed")
+ if counts then
+ data.find_node_failed = data.find_node_failed + 1
+ end
+ end
+
+ return destination_found
+end
+-- Sends a "FIND_NODE" request (task) to a node we know.
+function send_find_node(id, destination)
+ simgrid.debug("Sending a FIND_NODE to " .. id .. " to find " .. destination);
+
+ local task = simgrid.task.new("",0, common.COMM_SIZE)
+ task.type = "FIND_NODE"
+ task.sender_id = data.id
+ task.destination = destination
+
+ task:dsend(tostring(id))
+
+end
+-- Sends a "FIND_NODE" request to the best "alpha" nodes in a node
+-- list
+function send_find_node_to_best(node_list)
+ destination = node_list.destination
+ local i = 1
+ while i <= common.ALPHA and i < #node_list.nodes do
+ if data.id ~= node_list.nodes[i].id then
+ send_find_node(node_list.nodes[i].id,destination)
+ end
+ i = i + 1
+ end
+ return i - 1
+end
+-- Handles an incomming task
+function handle_task(task)
+ routing_table_update(task.sender_id)
+ if task.type == "FIND_NODE" then
+ handle_find_node(task)
+ elseif task.type == "PING" then
+ handle_ping(task)
+ end
+end
+function handle_find_node(task)
+ simgrid.debug("Received a FIND_NODE from " .. task.sender_id)
+ local answer = find_closest(task.destination)
+ local task_answer = simgrid.task.new("",0, common.COMM_SIZE)
+ task_answer.type = "FIND_NODE_ANSWER"
+ task_answer.sender_id = data.id
+ task_answer.destination = task.destination
+ task_answer.answer = answer
+ task_answer:dsend(tostring(task.sender_id))
+end
+function handle_ping(task)
+ simgrid.info("Received a PING from " .. task.sender_id)
+ local task_answer = simgrid.task.new("",0, common.COMM_SIZE)
+ task_answer.type = "PING_ANSWER"
+ task_answer.sender_id = data.id
+ task_answer.destination = task.destination
+ task_answer:dsend(tostring(task.sender_id))
+end
+function merge_answer(m1, m2)
+ local nb_added = 0
+ for i,v in pairs(m2.nodes) do
+ table.insert(m1.nodes,v)
+ nb_added = nb_added + 1
+ end
+ return nb_added
+end
+simgrid.platform(arg[1] or "../../msg/msg_platform.xml")
+simgrid.application(arg[2] or "kademlia.xml")
+simgrid.run()
--- /dev/null
+<?xml version='1.0'?>
+<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid.dtd">
+<platform version="3">
+
+ <process host="Jacquelin" function="node">
+ <argument value="0"/> <!-- my id -->
+ <argument value ="900"/> <!-- deadline -->
+ </process>
+
+ <process host="Boivin" function="node">
+ <argument value="1"/> <!-- my id -->
+ <argument value="0"/> <!-- known id -->
+ <argument value ="900"/> <!-- deadline -->
+ </process>
+
+ <process host="Jean_Yves" function="node">
+ <argument value="11"/> <!-- my id -->
+ <argument value="1"/> <!-- known id -->
+ <argument value ="900"/> <!-- deadline -->
+ </process>
+
+ <process host="TeX" function="node">
+ <argument value="111"/> <!-- my id -->
+ <argument value="11"/> <!-- known id -->
+ <argument value ="900"/> <!-- deadline -->
+ </process>
+ <process host="Geoff" function="node">
+ <argument value="1111"/> <!-- my id -->
+ <argument value="111"/> <!-- known id -->
+ <argument value ="900"/> <!-- deadline -->
+ </process>
+ <process host="Disney" function="node">
+ <argument value="11111"/> <!-- my id -->
+ <argument value="1111"/> <!-- known id -->
+ <argument value ="900"/> <!-- deadline -->
+ </process>
+ <process host="iRMX" function="node">
+ <argument value="111111"/> <!-- my id -->
+ <argument value="11111"/> <!-- known id -->
+ <argument value ="900"/> <!-- deadline -->
+ </process>
+ <process host="McGee" function="node">
+ <argument value="1111111"/> <!-- my id -->
+ <argument value="111111"/> <!-- known id -->
+ <argument value ="900"/> <!-- deadline -->
+ </process>
+ <process host="Gatien" function="node">
+ <argument value="11111111"/> <!-- my id -->
+ <argument value="1111111"/> <!-- known id -->
+ <argument value ="900"/> <!-- deadline -->
+ </process>
+ <process host="Laroche" function="node">
+ <argument value="111111111"/> <!-- my id -->
+ <argument value="11111111"/> <!-- known id -->
+ <argument value ="900"/> <!-- deadline -->
+ </process>
+ <process host="Tanguay" function="node">
+ <argument value="1111111111"/> <!-- my id -->
+ <argument value="111111111"/> <!-- known id -->
+ <argument value ="900"/> <!-- deadline -->
+ </process>
+ <process host="Morin" function="node">
+ <argument value="11111111111"/> <!-- my id -->
+ <argument value="1111111111"/> <!-- known id -->
+ <argument value ="900"/> <!-- deadline -->
+ </process>
+ <process host="Ethernet" function="node">
+ <argument value="11111111111"/> <!-- my id -->
+ <argument value="0"/> <!-- known id -->
+ <argument value ="900"/> <!-- deadline -->
+ </process>
+</platform>
--- /dev/null
+-- Routing table data
+routing_table = {
+buckets = {},
+}
+-- Initialize the routing table
+function routing_table_init(id)
+ routing_table.id = id
+ -- Bucket initialization
+ for i = 0,common.IDENTIFIER_SIZE do
+ routing_table.buckets[i] = {id = i, set = {}, list = {}}
+ end
+end
+-- Returns an identifier which is in a specific bucket of a routing table
+function get_id_in_prefix(id, prefix)
+ if id == 0 then
+ return 0
+ end
+ local identifier = 1
+ identifier = math.pow(identifier,prefix - 1)
+ identifier = bxor(identifier,id)
+ return identifier
+end
+-- Returns the corresponding node prefix for a given id
+function get_node_prefix(id)
+ for i = 0,32 do
+ if is_integer(id / math.pow(2,i)) and (id / math.pow(2,i)) % 2 == 1 then
+ return 31 - i
+ end
+ end
+ return 0
+end
+-- Finds the corresponding bucket in a routing table for a given identifier
+function find_bucket(id)
+ local xor_number = bxor(id,routing_table.id)
+ local prefix = get_node_prefix(xor_number)
+ --simgrid.info("Prefix:" .. prefix .. " number:" .. xor_number)
+ return routing_table.buckets[prefix]
+end
+-- Updates the routing table with a new value.
+function routing_table_update(id)
+ if id == routing_table.id then
+ return
+ end
+ local bucket = find_bucket(id)
+ if bucket.set[id] ~= nil then
+ simgrid.debug("Updating " .. id .. " in my routing table")
+ -- If the element is already in the bucket, we update it.
+ table.remove(bucket.list,index_of(bucket.list,id))
+ table.insert(bucket.list,0,id)
+ else
+ simgrid.debug("Insert " .. id .. " in my routing table in bucket " .. bucket.id)
+ table.insert(bucket.list,id)
+ bucket.set[id] = true
+ end
+end
+-- Returns the closest notes we know to a given id
+function find_closest(destination_id)
+ local answer = {destination = destination_id, nodes = {}}
+
+ local bucket = find_bucket(destination_id)
+ for i,v in pairs(bucket.list) do
+ table.insert(answer.nodes,{id = v, distance = bxor(v,destination_id)})
+ end
+
+ local i = 1
+
+ while #answer.nodes < common.BUCKET_SIZE and ((bucket.id - i) >= 0 or (bucket.id + i) <= common.IDENTIFIER_SIZE) do
+ -- Check the previous buckets
+ if bucket.id - i >= 0 then
+ local bucket_p = routing_table.buckets[bucket.id - i]
+ for i,v in pairs(bucket_p.list) do
+ table.insert(answer.nodes,{id = v, distance = bxor(v,destination_id)})
+ end
+ end
+ -- Check the next buckets
+ if bucket.id + i <= common.IDENTIFIER_SIZE then
+ local bucket_n = routing_table.buckets[bucket.id + i]
+ for i,v in pairs(bucket_n.list) do
+ table.insert(answer.nodes,{id = v, distance = bxor(v,destination_id)})
+ end
+ end
+ i = i + 1
+ end
+ -- Sort the list
+ table.sort(answer.nodes, function(a,b) return a.distance < b.distance end)
+ -- Trim the list
+ while #answer.nodes > common.BUCKET_SIZE do
+ table.remove(answer.nodes)
+ end
+
+ return answer
+end
\ No newline at end of file