From: Samuel Lepetit Date: Mon, 2 Jul 2012 16:13:42 +0000 (+0200) Subject: Add kademlia lua example X-Git-Tag: v3_8~384 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/78b439281c3499aaece9e9cbb9e1ea8f2db55fb5?hp=7622b935dd8a79a4d564bd87bdee36195633b758;ds=inline Add kademlia lua example --- diff --git a/examples/lua/kademlia/kademlia.lua b/examples/lua/kademlia/kademlia.lua new file mode 100644 index 0000000000..70d67d6d41 --- /dev/null +++ b/examples/lua/kademlia/kademlia.lua @@ -0,0 +1,258 @@ +-- A SimGrid Lua implementation of the Kademlia protocol. +require("simgrid") + +-- Common constants +common = { + COMM_SIZE = 1, + RANDOM_LOOKUP_INTERVAL = 100, + ALPHA = 3, + IDENTIFIER_SIZE = 32, + BUCKET_SIZE = 20, + MAX_JOIN_TRIALS = 4, + FIND_NODE_TIMEOUT = 10, + FIND_NODE_GLOBAL_TIMEOUT = 50, + PING_TIMEOUT = 35, + MAX_STEPS = 10, + JOIN_BUCKETS_QUERIES = 5 +} +require("tools") +-- Routing table +require("routing_table") + +data = { + id = -1, + mailbox = "", + deadline = 0, + comm = nil, + find_node_succedded = 0, + find_node_failed = 0 + +} + +function node(...) + local args = {...} + + if #args ~= 2 and #args ~= 3 then + simgrid.info("Wrong argument count: " .. #args) + return + end + data.id = tonumber(args[1]) + routing_table_init(data.id) + data.mailbox = tostring(data.id) + if #args == 3 then + data.deadline = tonumber(args[3]) + simgrid.info("Hi, I'm going to join the network with the id " .. data.id .. " !") + if join_network(tonumber(args[2])) then + main_loop() + else + simgrid.info("Couldn't join the network") + end + else + data.deadline = tonumber(args[2]) + routing_table_update(data.id) + data.comm = simgrid.task.irecv(data.mailbox) + main_loop() + end + simgrid.info(data.find_node_succedded .. "/" .. (data.find_node_succedded + data.find_node_failed) .. " FIND_NODE have succedded"); + simgrid.process.sleep(10000) +end +function main_loop() + local next_lookup_time = simgrid.get_clock() + common.RANDOM_LOOKUP_INTERVAL + local now = simgrid.get_clock() + while now < data.deadline do + task,err = data.comm:test() + if task then + handle_task(task) + data.comm = simgrid.task.irecv(data.mailbox) + elseif err then + data.comm = simgrid.task.irecv(data.mailbox) + else + if now >= next_lookup_time then + random_lookup() + next_lookup_time = next_lookup_time + common.RANDOM_LOOKUP_INTERVAL + now = simgrid.get_clock() + else + simgrid.process.sleep(1) + now = simgrid.get_clock() + end + end + end +end +function random_lookup() + find_node(0,true) +end +function join_network(id_known) + local answer_got = false + local time_begin = simgrid.get_clock() + + simgrid.debug("Joining the network knowing " .. id_known) + + routing_table_update(data.id) + routing_table_update(id_known) + + -- Send a FIND_NODE to the node we know + send_find_node(id_known,data.id) + -- Wait for the answer + local trials = 0 + + data.comm = simgrid.task.irecv(data.mailbox) + + repeat + task,err = data.comm:test() + if task then + if task.type == "FIND_NODE_ANSWER" then + answer_got = true + local answer = task.answer + -- Add the nodes we received to our routing table + for i,v in pairs(answer.nodes) do + routing_table_update(v.id) + end + else + handle_task(task) + end + data.comm = simgrid.task.irecv(data.mailbox) + elseif err then + data.comm = simgrid.task.irecv(data.mailbox) + else + simgrid.process.sleep(1) + end + until answer_got or trials >= common.MAX_JOIN_TRIALS + -- Second step: Send a FIND_NODE in a node in each bucket + local bucket_id = find_bucket(id_known).id + local i = 0 + while ((bucket_id - i) > 0 or (bucket_id + i) <= common.IDENTIFIER_SIZE) and i < common.JOIN_BUCKETS_QUERIES do + if bucket_id - i > 0 then + local id_in_bucket = get_id_in_prefix(data.id,bucket_id - i) + find_node(id_in_bucket,false) + end + if bucket_id + i <= common.IDENTIFIER_SIZE then + local id_in_bucket = get_id_in_prefix(data.id,bucket_id + i) + find_node(id_in_bucket,false) + end + i = i + 1 + end + return answer_got +end +-- Send a request to find a node in the node's routing table. +function find_node(destination, counts) + local queries, answers + local total_answers = 0 + total_queries = 0 + local nodes_added = 0 + local destination_found = false + local steps = 0 + local timeout + local global_timeout = simgrid.get_clock() + common.FIND_NODE_GLOBAL_TIMEOUT + -- Build a list of the closest nodes we already know. + local node_list = find_closest(destination) + + simgrid.debug("Doing a FIND_NODE on " .. destination) + repeat + answers = 0 + queries = send_find_node_to_best(node_list) + nodes_added = 0 + timeout = simgrid.get_clock() + common.FIND_NODE_TIMEOUT + steps = steps + 1 + repeat + task, err = data.comm:test() + if task then + if task.type == "FIND_NODE_ANSWER" and task.answer.destination == destination then + routing_table_update(task.sender_id) + for i,v in pairs(task.answer.nodes) do + routing_table_update(v.id) + end + nodes_added = merge_answer(node_list,task.answer) + else + handle_task(task) + end + data.comm = simgrid.task.irecv(data.mailbox) + elseif err then + data.comm = simgrid.task.irecv(data.mailbox) + else + simgrid.process.sleep(1) + end + + until answers >= queries or simgrid.get_clock() >= timeout + if (#node_list.nodes > 0) then + destination_found = (node_list.nodes[1].distance == 0) + else + destination_found = false + end + until destination_found or (nodes_added > 0 and answers == 0) or simgrid.get_clock() >= global_timeout or steps >= common.MAX_STEPS + if destination_found then + simgrid.debug("Find node on " .. destination .. " succedded") + if counts then + data.find_node_succedded = data.find_node_succedded + 1 + end + else + simgrid.debug("Find node on " .. destination .. " failed") + if counts then + data.find_node_failed = data.find_node_failed + 1 + end + end + + return destination_found +end +-- Sends a "FIND_NODE" request (task) to a node we know. +function send_find_node(id, destination) + simgrid.debug("Sending a FIND_NODE to " .. id .. " to find " .. destination); + + local task = simgrid.task.new("",0, common.COMM_SIZE) + task.type = "FIND_NODE" + task.sender_id = data.id + task.destination = destination + + task:dsend(tostring(id)) + +end +-- Sends a "FIND_NODE" request to the best "alpha" nodes in a node +-- list +function send_find_node_to_best(node_list) + destination = node_list.destination + local i = 1 + while i <= common.ALPHA and i < #node_list.nodes do + if data.id ~= node_list.nodes[i].id then + send_find_node(node_list.nodes[i].id,destination) + end + i = i + 1 + end + return i - 1 +end +-- Handles an incomming task +function handle_task(task) + routing_table_update(task.sender_id) + if task.type == "FIND_NODE" then + handle_find_node(task) + elseif task.type == "PING" then + handle_ping(task) + end +end +function handle_find_node(task) + simgrid.debug("Received a FIND_NODE from " .. task.sender_id) + local answer = find_closest(task.destination) + local task_answer = simgrid.task.new("",0, common.COMM_SIZE) + task_answer.type = "FIND_NODE_ANSWER" + task_answer.sender_id = data.id + task_answer.destination = task.destination + task_answer.answer = answer + task_answer:dsend(tostring(task.sender_id)) +end +function handle_ping(task) + simgrid.info("Received a PING from " .. task.sender_id) + local task_answer = simgrid.task.new("",0, common.COMM_SIZE) + task_answer.type = "PING_ANSWER" + task_answer.sender_id = data.id + task_answer.destination = task.destination + task_answer:dsend(tostring(task.sender_id)) +end +function merge_answer(m1, m2) + local nb_added = 0 + for i,v in pairs(m2.nodes) do + table.insert(m1.nodes,v) + nb_added = nb_added + 1 + end + return nb_added +end +simgrid.platform(arg[1] or "../../msg/msg_platform.xml") +simgrid.application(arg[2] or "kademlia.xml") +simgrid.run() diff --git a/examples/lua/kademlia/kademlia.xml b/examples/lua/kademlia/kademlia.xml new file mode 100644 index 0000000000..bc7c27d352 --- /dev/null +++ b/examples/lua/kademlia/kademlia.xml @@ -0,0 +1,72 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/lua/kademlia/routing_table.lua b/examples/lua/kademlia/routing_table.lua new file mode 100644 index 0000000000..3c2b636a03 --- /dev/null +++ b/examples/lua/kademlia/routing_table.lua @@ -0,0 +1,92 @@ +-- Routing table data +routing_table = { +buckets = {}, +} +-- Initialize the routing table +function routing_table_init(id) + routing_table.id = id + -- Bucket initialization + for i = 0,common.IDENTIFIER_SIZE do + routing_table.buckets[i] = {id = i, set = {}, list = {}} + end +end +-- Returns an identifier which is in a specific bucket of a routing table +function get_id_in_prefix(id, prefix) + if id == 0 then + return 0 + end + local identifier = 1 + identifier = math.pow(identifier,prefix - 1) + identifier = bxor(identifier,id) + return identifier +end +-- Returns the corresponding node prefix for a given id +function get_node_prefix(id) + for i = 0,32 do + if is_integer(id / math.pow(2,i)) and (id / math.pow(2,i)) % 2 == 1 then + return 31 - i + end + end + return 0 +end +-- Finds the corresponding bucket in a routing table for a given identifier +function find_bucket(id) + local xor_number = bxor(id,routing_table.id) + local prefix = get_node_prefix(xor_number) + --simgrid.info("Prefix:" .. prefix .. " number:" .. xor_number) + return routing_table.buckets[prefix] +end +-- Updates the routing table with a new value. +function routing_table_update(id) + if id == routing_table.id then + return + end + local bucket = find_bucket(id) + if bucket.set[id] ~= nil then + simgrid.debug("Updating " .. id .. " in my routing table") + -- If the element is already in the bucket, we update it. + table.remove(bucket.list,index_of(bucket.list,id)) + table.insert(bucket.list,0,id) + else + simgrid.debug("Insert " .. id .. " in my routing table in bucket " .. bucket.id) + table.insert(bucket.list,id) + bucket.set[id] = true + end +end +-- Returns the closest notes we know to a given id +function find_closest(destination_id) + local answer = {destination = destination_id, nodes = {}} + + local bucket = find_bucket(destination_id) + for i,v in pairs(bucket.list) do + table.insert(answer.nodes,{id = v, distance = bxor(v,destination_id)}) + end + + local i = 1 + + while #answer.nodes < common.BUCKET_SIZE and ((bucket.id - i) >= 0 or (bucket.id + i) <= common.IDENTIFIER_SIZE) do + -- Check the previous buckets + if bucket.id - i >= 0 then + local bucket_p = routing_table.buckets[bucket.id - i] + for i,v in pairs(bucket_p.list) do + table.insert(answer.nodes,{id = v, distance = bxor(v,destination_id)}) + end + end + -- Check the next buckets + if bucket.id + i <= common.IDENTIFIER_SIZE then + local bucket_n = routing_table.buckets[bucket.id + i] + for i,v in pairs(bucket_n.list) do + table.insert(answer.nodes,{id = v, distance = bxor(v,destination_id)}) + end + end + i = i + 1 + end + -- Sort the list + table.sort(answer.nodes, function(a,b) return a.distance < b.distance end) + -- Trim the list + while #answer.nodes > common.BUCKET_SIZE do + table.remove(answer.nodes) + end + + return answer +end \ No newline at end of file diff --git a/examples/lua/kademlia/tools.lua b/examples/lua/kademlia/tools.lua new file mode 100644 index 0000000000..664c3424a2 --- /dev/null +++ b/examples/lua/kademlia/tools.lua @@ -0,0 +1,24 @@ +function bxor (a,b) + local r = 0 + for i = 0, 31 do + local x = a / 2 + b / 2 + if x ~= math.floor (x) then + r = r + 2^i + end + a = math.floor (a / 2) + b = math.floor (b / 2) + end + return r +end + +function index_of(table, o) + for i,v in pairs(table) do + if v == o then + return i; + end + end + return -1 +end +function is_integer(x) +return math.floor(x)==x +end \ No newline at end of file