Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add kademlia lua example
author Samuel Lepetit <samuel.lepetit@inria.fr>
Mon, 2 Jul 2012 16:13:42 +0000 (18:13 +0200)
committer Samuel Lepetit <samuel.lepetit@inria.fr>
Mon, 2 Jul 2012 16:13:47 +0000 (18:13 +0200)
examples/lua/kademlia/kademlia.lua [new file with mode: 0644]
examples/lua/kademlia/kademlia.xml [new file with mode: 0644]
examples/lua/kademlia/routing_table.lua [new file with mode: 0644]
examples/lua/kademlia/tools.lua [new file with mode: 0644]

diff --git a/examples/lua/kademlia/kademlia.lua b/examples/lua/kademlia/kademlia.lua
new file mode 100644 (file)
index 0000000..70d67d6
--- /dev/null
@@ -0,0 +1,258 @@
+-- A SimGrid Lua implementation of the Kademlia protocol.
+require("simgrid")
+
+-- Tuning constants shared by every file of the example.
+common = {
+       -- Identifier space and routing table geometry
+       IDENTIFIER_SIZE = 32,           -- highest bucket index (buckets run from 0 to this value)
+       BUCKET_SIZE = 20,               -- maximum number of nodes returned by find_closest
+       -- Lookups
+       ALPHA = 3,                      -- how many candidates send_find_node_to_best goes through
+       MAX_STEPS = 10,                 -- maximum number of rounds of one find_node
+       FIND_NODE_TIMEOUT = 10,         -- clock units to wait for the answers of one round
+       FIND_NODE_GLOBAL_TIMEOUT = 50,  -- clock units after which a whole find_node gives up
+       RANDOM_LOOKUP_INTERVAL = 100,   -- clock units between two lookups started by main_loop
+       -- Joining
+       MAX_JOIN_TRIALS = 4,            -- bound tested by the wait loop of join_network
+       JOIN_BUCKETS_QUERIES = 5,       -- bucket offsets explored on each side while joining
+       -- Messaging
+       COMM_SIZE = 1,                  -- third argument given to simgrid.task.new
+       PING_TIMEOUT = 35               -- not referenced by the code of this example
+}
+require("tools")
+-- Routing table 
+require("routing_table")
+
+-- State of the node run by the current process.
+data = {
+       -- Identity
+       id = -1,                        -- overwritten by node() with its first argument
+       mailbox = "",                   -- tostring(id); name used to receive tasks
+       -- Lifetime and communication
+       deadline = 0,                   -- clock value at which main_loop stops
+       comm = nil,                     -- handle returned by the pending simgrid.task.irecv
+       -- Statistics logged at the end of node()
+       find_node_succedded = 0,
+       find_node_failed = 0
+}
+
+-- Function run by each simulated process (see the deployment file).
+-- Arguments are either (id, deadline) for a node that starts alone, or
+-- (id, known id, deadline) for a node that first joins through a known node.
+-- Once the main loop is over, the FIND_NODE statistics are logged.
+function node(...)
+       local argv = {...}
+       local argc = #argv
+
+       if not (argc == 2 or argc == 3) then
+               simgrid.info("Wrong argument count: " .. argc)
+               return
+       end
+
+       local my_id = tonumber(argv[1])
+       data.id = my_id
+       routing_table_init(my_id)
+       data.mailbox = tostring(my_id)
+
+       if argc == 2 then
+               -- No known node: start listening and serve requests right away
+               data.deadline = tonumber(argv[2])
+               routing_table_update(data.id)
+               data.comm = simgrid.task.irecv(data.mailbox)
+               main_loop()
+       else
+               data.deadline = tonumber(argv[3])
+               simgrid.info("Hi, I'm going to join the network with the id " .. data.id .. " !")
+               local joined = join_network(tonumber(argv[2]))
+               if joined then
+                       main_loop()
+               else
+                       simgrid.info("Couldn't join the network")
+               end
+       end
+
+       local lookups = data.find_node_succedded + data.find_node_failed
+       simgrid.info(data.find_node_succedded .. "/" .. lookups .. " FIND_NODE have succedded")
+       simgrid.process.sleep(10000)
+end
+-- Serves incoming tasks until data.deadline is reached and starts a lookup
+-- every RANDOM_LOOKUP_INTERVAL clock units.
+-- Expects data.comm to hold a pending reception on data.mailbox.
+function main_loop()
+       local next_lookup_time = simgrid.get_clock() + common.RANDOM_LOOKUP_INTERVAL
+       -- The clock is read again at every iteration so that the deadline test
+       -- never works on a stale value.
+       while simgrid.get_clock() < data.deadline do
+               -- task and err are kept local: they used to leak into the globals
+               local task, err = data.comm:test()
+               if task then
+                       handle_task(task)
+                       data.comm = simgrid.task.irecv(data.mailbox)
+               elseif err then
+                       -- The reception failed: post a new one
+                       data.comm = simgrid.task.irecv(data.mailbox)
+               elseif simgrid.get_clock() >= next_lookup_time then
+                       random_lookup()
+                       next_lookup_time = next_lookup_time + common.RANDOM_LOOKUP_INTERVAL
+               else
+                       -- Nothing to do yet
+                       simgrid.process.sleep(1)
+               end
+       end
+end
+-- Lookup started periodically by main_loop. The identifier looked for is the
+-- fixed value 0 and the outcome is counted in the node statistics.
+function random_lookup()
+       local target_id = 0
+       local count_in_stats = true
+       find_node(target_id, count_in_stats)
+end
+-- Joins the network through the node `id_known`.
+-- First step: a FIND_NODE on our own id is sent to the known node and the
+-- nodes of its answer are added to the routing table. When no answer arrives
+-- within FIND_NODE_TIMEOUT the request is sent again, at most
+-- MAX_JOIN_TRIALS times in total.
+-- Second step: FIND_NODE requests are issued on identifiers of the buckets
+-- surrounding the bucket of the known node.
+-- Returns true if the known node answered, false otherwise.
+function join_network(id_known)
+       local answer_got = false
+       local trials = 0
+
+       simgrid.debug("Joining the network knowing " .. id_known)
+
+       routing_table_update(data.id)
+       routing_table_update(id_known)
+
+       -- Send a FIND_NODE to the node we know
+       send_find_node(id_known,data.id)
+       local trial_deadline = simgrid.get_clock() + common.FIND_NODE_TIMEOUT
+
+       -- Wait for the answer
+       data.comm = simgrid.task.irecv(data.mailbox)
+
+       repeat
+               local task, err = data.comm:test()
+               if task then
+                       if task.type == "FIND_NODE_ANSWER" then
+                               answer_got = true
+                               -- Add the nodes we received to our routing table
+                               for _, v in pairs(task.answer.nodes) do
+                                       routing_table_update(v.id)
+                               end
+                       else
+                               handle_task(task)
+                       end
+                       data.comm = simgrid.task.irecv(data.mailbox)
+               elseif err then
+                       data.comm = simgrid.task.irecv(data.mailbox)
+               elseif simgrid.get_clock() >= trial_deadline then
+                       -- This trial timed out. Counting it is what lets the loop
+                       -- end when the known node never answers.
+                       trials = trials + 1
+                       if trials < common.MAX_JOIN_TRIALS then
+                               send_find_node(id_known,data.id)
+                               trial_deadline = simgrid.get_clock() + common.FIND_NODE_TIMEOUT
+                       end
+               else
+                       simgrid.process.sleep(1)
+               end
+       until answer_got or trials >= common.MAX_JOIN_TRIALS
+
+       -- Second step: Send a FIND_NODE in a node in each bucket
+       local bucket_id = find_bucket(id_known).id
+       local i = 0
+       while ((bucket_id - i) > 0 or (bucket_id + i) <= common.IDENTIFIER_SIZE) and i < common.JOIN_BUCKETS_QUERIES do
+               if bucket_id - i > 0 then
+                       local id_in_bucket = get_id_in_prefix(data.id,bucket_id - i)
+                       find_node(id_in_bucket,false)
+               end
+               if bucket_id + i <= common.IDENTIFIER_SIZE then
+                       local id_in_bucket = get_id_in_prefix(data.id,bucket_id + i)
+                       find_node(id_in_bucket,false)
+               end
+               i = i + 1
+       end
+       return answer_got
+end
+-- Looks for `destination`, starting from the closest nodes of our routing
+-- table.
+-- Each round sends FIND_NODE requests through send_find_node_to_best, then
+-- handles incoming tasks until every request of the round got its answer or
+-- FIND_NODE_TIMEOUT elapsed. Rounds stop when the destination is in the
+-- candidate list, when a round brought answers but no new node, when
+-- FIND_NODE_GLOBAL_TIMEOUT elapsed, or after MAX_STEPS rounds.
+-- destination: identifier to look for
+-- counts: when true, the outcome is added to data.find_node_succedded or
+--         data.find_node_failed
+-- Returns true if a node at distance 0 from `destination` was found.
+function find_node(destination, counts)
+       local answers, nodes_added
+       local destination_found = false
+       local steps = 0
+       local global_timeout = simgrid.get_clock() + common.FIND_NODE_GLOBAL_TIMEOUT
+       -- Build a list of the closest nodes we already know.
+       local node_list = find_closest(destination)
+
+       simgrid.debug("Doing a FIND_NODE on " .. destination)
+       repeat
+               answers = 0
+               nodes_added = 0
+               local queries = send_find_node_to_best(node_list)
+               local timeout = simgrid.get_clock() + common.FIND_NODE_TIMEOUT
+               steps = steps + 1
+               repeat
+                       local task, err = data.comm:test()
+                       if task then
+                               if task.type == "FIND_NODE_ANSWER" and task.answer.destination == destination then
+                                       routing_table_update(task.sender_id)
+                                       for _, v in pairs(task.answer.nodes) do
+                                               routing_table_update(v.id)
+                                       end
+                                       -- Count the answer, otherwise the round can only
+                                       -- end on its timeout
+                                       answers = answers + 1
+                                       -- Accumulate: a later answer without new nodes must
+                                       -- not hide the nodes brought by an earlier one
+                                       nodes_added = nodes_added + merge_answer(node_list,task.answer)
+                               else
+                                       handle_task(task)
+                               end
+                               data.comm = simgrid.task.irecv(data.mailbox)
+                       elseif err then
+                               data.comm = simgrid.task.irecv(data.mailbox)
+                       else
+                               simgrid.process.sleep(1)
+                       end
+               until answers >= queries or simgrid.get_clock() >= timeout
+
+               -- The whole list is scanned so that the check does not depend on
+               -- the order of the candidates.
+               destination_found = false
+               for _, candidate in pairs(node_list.nodes) do
+                       if candidate.distance == 0 then
+                               destination_found = true
+                               break
+                       end
+               end
+       -- Keep going while new nodes show up or while nobody answered; give up
+       -- once answers came back without anything new.
+       until destination_found or (nodes_added == 0 and answers > 0) or simgrid.get_clock() >= global_timeout or steps >= common.MAX_STEPS
+
+       if destination_found then
+               simgrid.debug("Find node on " .. destination .. " succedded")
+               if counts then
+                       data.find_node_succedded = data.find_node_succedded + 1
+               end
+       else
+               simgrid.debug("Find node on " .. destination .. " failed")
+               if counts then
+                       data.find_node_failed = data.find_node_failed + 1
+               end
+       end
+
+       return destination_found
+end
+-- Builds a "FIND_NODE" task asking for `destination` and sends it with dsend
+-- to the mailbox named after `id`.
+-- id: identifier of the node to query
+-- destination: identifier being looked for
+function send_find_node(id, destination)
+       simgrid.debug("Sending a FIND_NODE to " .. id .. " to find " .. destination)
+
+       local request = simgrid.task.new("",0, common.COMM_SIZE)
+       request.sender_id = data.id
+       request.destination = destination
+       request.type = "FIND_NODE"
+
+       local recipient_mailbox = tostring(id)
+       request:dsend(recipient_mailbox)
+end
+-- Sends a "FIND_NODE" request to the first nodes of a node list, skipping
+-- ourselves, until ALPHA requests are sent or the list is exhausted.
+-- node_list: table with the `destination` looked for and a `nodes` array
+-- Returns the number of requests actually sent.
+function send_find_node_to_best(node_list)
+       -- local: the destination used to leak into the globals
+       local destination = node_list.destination
+       local nodes = node_list.nodes
+       local sent = 0
+       local i = 1
+       -- "<=" so that the last (or only) node of the list is queried too
+       while sent < common.ALPHA and i <= #nodes do
+               local candidate_id = nodes[i].id
+               if data.id ~= candidate_id then
+                       send_find_node(candidate_id,destination)
+                       sent = sent + 1
+               end
+               i = i + 1
+       end
+       return sent
+end
+-- Handles an incoming task: the sender is recorded in the routing table,
+-- then the task is passed to the handler of its type. Tasks of any other
+-- type are not processed further.
+function handle_task(task)
+       routing_table_update(task.sender_id)
+
+       local kind = task.type
+       if kind == "PING" then
+               handle_ping(task)
+       elseif kind == "FIND_NODE" then
+               handle_find_node(task)
+       end
+end
+-- Answers a "FIND_NODE" task: the closest nodes we know to the requested
+-- destination are sent back to the mailbox named after the sender's id.
+function handle_find_node(task)
+       simgrid.debug("Received a FIND_NODE from " .. task.sender_id)
+
+       local reply = simgrid.task.new("",0, common.COMM_SIZE)
+       reply.type = "FIND_NODE_ANSWER"
+       reply.sender_id = data.id
+       reply.destination = task.destination
+       reply.answer = find_closest(task.destination)
+
+       local requester_mailbox = tostring(task.sender_id)
+       reply:dsend(requester_mailbox)
+end
+-- Answers a "PING" task with a "PING_ANSWER" sent to the mailbox named after
+-- the sender's id. The destination field of the request is copied as is.
+function handle_ping(task)
+       simgrid.info("Received a PING from " .. task.sender_id)
+
+       local reply = simgrid.task.new("",0, common.COMM_SIZE)
+       reply.sender_id = data.id
+       reply.destination = task.destination
+       reply.type = "PING_ANSWER"
+
+       local requester_mailbox = tostring(task.sender_id)
+       reply:dsend(requester_mailbox)
+end
+-- Merges the nodes of the answer `m2` into the candidate list `m1`.
+-- Nodes whose id is already in `m1` are skipped, then `m1.nodes` is sorted by
+-- increasing distance and cut down to `max_nodes` entries, so that the
+-- closest candidates always come first.
+-- m1, m2: tables holding a `nodes` array of {id = ..., distance = ...} entries
+-- max_nodes: optional size limit (defaults to common.BUCKET_SIZE)
+-- Returns the number of nodes of `m2` that were not yet in `m1`.
+function merge_answer(m1, m2, max_nodes)
+       max_nodes = max_nodes or common.BUCKET_SIZE
+
+       -- Ids already present, to leave duplicates out
+       local known = {}
+       for _, entry in pairs(m1.nodes) do
+               known[entry.id] = true
+       end
+
+       local nb_added = 0
+       for _, entry in pairs(m2.nodes) do
+               if not known[entry.id] then
+                       known[entry.id] = true
+                       table.insert(m1.nodes,entry)
+                       nb_added = nb_added + 1
+               end
+       end
+
+       -- Closest nodes first, then drop the farthest ones
+       table.sort(m1.nodes, function(a,b) return a.distance < b.distance end)
+       while #m1.nodes > max_nodes do
+               table.remove(m1.nodes)
+       end
+
+       return nb_added
+end
+-- Script entry: the platform and deployment files come from the command line
+-- when given, otherwise the defaults below are used.
+local platform_file = arg[1] or  "../../msg/msg_platform.xml"
+local deployment_file = arg[2] or "kademlia.xml"
+simgrid.platform(platform_file)
+simgrid.application(deployment_file)
+simgrid.run()
diff --git a/examples/lua/kademlia/kademlia.xml b/examples/lua/kademlia/kademlia.xml
new file mode 100644 (file)
index 0000000..bc7c27d
--- /dev/null
@@ -0,0 +1,72 @@
+<?xml version='1.0'?>
+<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid.dtd">
+<platform version="3">
+
+  <process host="Jacquelin" function="node">
+    <argument value="0"/>        <!-- my id -->
+    <argument value ="900"/>           <!-- deadline -->
+  </process>
+
+  <process host="Boivin" function="node">
+    <argument value="1"/>        <!-- my id -->
+    <argument value="0"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->
+  </process>
+
+  <process host="Jean_Yves" function="node">
+    <argument value="11"/>        <!-- my id -->
+    <argument value="1"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->
+  </process>
+
+  <process host="TeX" function="node">
+    <argument value="111"/>        <!-- my id -->
+    <argument value="11"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="Geoff" function="node">
+    <argument value="1111"/>        <!-- my id -->
+    <argument value="111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="Disney" function="node">
+    <argument value="11111"/>        <!-- my id -->
+    <argument value="1111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="iRMX" function="node">
+    <argument value="111111"/>        <!-- my id -->
+    <argument value="11111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="McGee" function="node">
+    <argument value="1111111"/>        <!-- my id -->
+    <argument value="111111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="Gatien" function="node">
+    <argument value="11111111"/>        <!-- my id -->
+    <argument value="1111111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="Laroche" function="node">
+    <argument value="111111111"/>        <!-- my id -->
+    <argument value="11111111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+  <process host="Tanguay" function="node">
+    <argument value="1111111111"/>        <!-- my id -->
+    <argument value="111111111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+   <process host="Morin" function="node">
+    <argument value="11111111111"/>        <!-- my id -->
+    <argument value="1111111111"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>    
+   <process host="Ethernet" function="node">
+    <!-- Ids must be unique: a node receives its tasks on the mailbox named after its id. -->
+    <argument value="10"/>        <!-- my id -->
+    <argument value="0"/>         <!-- known id -->
+    <argument value ="900"/>           <!-- deadline -->  
+  </process>
+</platform>
diff --git a/examples/lua/kademlia/routing_table.lua b/examples/lua/kademlia/routing_table.lua
new file mode 100644 (file)
index 0000000..3c2b636
--- /dev/null
@@ -0,0 +1,92 @@
+-- Routing table of the local node. `buckets` is filled by
+-- routing_table_init, which also stores the owner's identifier in `id`.
+routing_table = { buckets = {} }
+-- Sets up the routing table of the node `id`: stores the owner's identifier
+-- and creates one empty bucket per index from 0 to IDENTIFIER_SIZE.
+-- Each bucket has its index (`id`), the `set` of known identifiers and the
+-- ordered `list` of these identifiers.
+function routing_table_init(id)
+       routing_table.id = id
+
+       local buckets = routing_table.buckets
+       local last_index = common.IDENTIFIER_SIZE
+       local index = 0
+       while index <= last_index do
+               buckets[index] = {id = index, set = {}, list = {}}
+               index = index + 1
+       end
+end
+-- Returns an identifier which is in a specific bucket of a routing table.
+-- The result is `id` with bit number (prefix - 1) flipped, i.e. an
+-- identifier at XOR distance 2^(prefix - 1) from `id`.
+-- id: identifier of the owner of the routing table
+-- prefix: index of the wanted bucket; 0 (or less) yields 0
+function get_id_in_prefix(id, prefix)
+       if prefix <= 0 then
+               return 0
+       end
+       -- The base has to be 2: 1 raised to any power is always 1
+       local flipped_bit = 2 ^ (prefix - 1)
+       return bxor(flipped_bit,id)
+end
+-- Returns the bucket index matching an XOR distance: the position of its
+-- highest set bit plus one, so that bucket n holds the distances from
+-- 2^(n - 1) to 2^n - 1. Distance 0 gives 0 and anything from 2^31 up gives 32.
+-- id: XOR distance between two identifiers (non-negative number)
+function get_node_prefix(id)
+       -- Scan from the most significant bit: Kademlia orders nodes by the
+       -- magnitude of the XOR distance, which is set by its highest bit.
+       for bit = 31, 0, -1 do
+               if id >= 2 ^ bit then
+                       return bit + 1
+               end
+       end
+       return 0
+end
+-- Returns the bucket of our routing table in which `id` belongs. The bucket
+-- index is derived from the XOR of `id` with the identifier of the owner.
+function find_bucket(id)
+       local distance = bxor(id,routing_table.id)
+       --simgrid.info("Prefix:" .. get_node_prefix(distance) .. " number:" .. distance)
+       return routing_table.buckets[get_node_prefix(distance)]
+end
+-- Updates the routing table with the identifier `id`.
+-- Our own identifier is ignored. An identifier already in its bucket is
+-- moved to the head of the bucket list; a new one is appended to the list
+-- and recorded in the bucket set.
+function routing_table_update(id)
+       if id == routing_table.id then
+               return
+       end
+       local bucket = find_bucket(id)
+       if bucket.set[id] ~= nil then
+               simgrid.debug("Updating " .. id .. " in my routing table")
+               -- If the element is already in the bucket, we update it.
+               local position = index_of(bucket.list,id)
+               if position >= 1 then
+                       table.remove(bucket.list,position)
+               end
+               -- Lua arrays start at 1: inserting at 0 would leave a hole at
+               -- index 1 (Lua 5.1) or raise an error (Lua 5.3 and later)
+               table.insert(bucket.list,1,id)
+       else
+               simgrid.debug("Insert " .. id .. " in my routing table in bucket " .. bucket.id)
+               table.insert(bucket.list,id)
+               bucket.set[id] = true
+       end
+end
+-- Returns the closest nodes we know to a given id.
+-- The nodes of the bucket of `destination_id` are taken first, then those of
+-- the buckets around it, one step further at a time, until BUCKET_SIZE nodes
+-- are gathered or no bucket is left. The result is sorted by increasing XOR
+-- distance and holds at most BUCKET_SIZE nodes.
+-- Returns {destination = destination_id, nodes = {{id = ..., distance = ...}, ...}}
+function find_closest(destination_id)
+       local answer = {destination = destination_id, nodes = {}}
+       local buckets = routing_table.buckets
+       local last_index = common.IDENTIFIER_SIZE
+
+       -- Appends every node of a bucket to the answer, with its distance
+       local function collect(bucket)
+               for _, node_id in pairs(bucket.list) do
+                       table.insert(answer.nodes,{id = node_id, distance = bxor(node_id,destination_id)})
+               end
+       end
+
+       local home = find_bucket(destination_id)
+       collect(home)
+
+       local offset = 1
+       while #answer.nodes < common.BUCKET_SIZE do
+               local below = home.id - offset
+               local above = home.id + offset
+               if below < 0 and above > last_index then
+                       -- Every bucket has been visited
+                       break
+               end
+               -- Check the previous buckets
+               if below >= 0 then
+                       collect(buckets[below])
+               end
+               -- Check the next buckets
+               if above <= last_index then
+                       collect(buckets[above])
+               end
+               offset = offset + 1
+       end
+
+       -- Sort the list
+       table.sort(answer.nodes, function(a,b) return a.distance < b.distance end)
+       -- Trim the list: drop the farthest nodes beyond BUCKET_SIZE
+       for _ = common.BUCKET_SIZE + 1, #answer.nodes do
+               table.remove(answer.nodes)
+       end
+
+       return answer
+end
\ No newline at end of file
diff --git a/examples/lua/kademlia/tools.lua b/examples/lua/kademlia/tools.lua
new file mode 100644 (file)
index 0000000..664c342
--- /dev/null
@@ -0,0 +1,24 @@
+-- Bitwise exclusive or of the 32 lowest bits of a and b, computed with
+-- arithmetic only.
+function bxor (a,b)
+  local floor = math.floor
+  local result = 0
+  local position = 0
+  while position <= 31 do
+    -- The half sum has a fractional part exactly when one of the two
+    -- current lowest bits is set and the other is not
+    local half_sum = a / 2 + b / 2
+    if floor (half_sum) ~= half_sum then
+      result = result + 2^position
+    end
+    a, b = floor (a / 2), floor (b / 2)
+    position = position + 1
+  end
+  return result
+end
+
+-- Returns a key of `list` whose value equals `o`, or -1 when there is none.
+function index_of(list, o)
+       local found = -1
+       for key, value in pairs(list) do
+               if value == o then
+                       found = key
+                       break
+               end
+       end
+       return found
+end
+-- Tells whether the number x has no fractional part.
+function is_integer(x)
+       local whole_part = math.floor(x)
+       return whole_part == x
+end
\ No newline at end of file