+++ /dev/null
--- A SimGrid Lua implementation of the Kademlia protocol.
-
--- Copyright (c) 2012, 2014. The SimGrid Team.
--- All rights reserved.
-
--- This program is free software; you can redistribute it and/or modify it
--- under the terms of the license (GNU LGPL) which comes with this package.
-
-require("simgrid")
-
--- Common constants
-common = {
- COMM_SIZE = 1,
- RANDOM_LOOKUP_INTERVAL = 100,
- ALPHA = 3,
- IDENTIFIER_SIZE = 32,
- BUCKET_SIZE = 20,
- MAX_JOIN_TRIALS = 4,
- FIND_NODE_TIMEOUT = 10,
- FIND_NODE_GLOBAL_TIMEOUT = 50,
- PING_TIMEOUT = 35,
- MAX_STEPS = 10,
- JOIN_BUCKETS_QUERIES = 5
-}
-require("tools")
--- Routing table
-require("routing_table")
-
-data = {
- id = -1,
- mailbox = "",
- deadline = 0,
- comm = nil,
- find_node_succedded = 0,
- find_node_failed = 0
-
-}
-
-function node(...)
- local args = {...}
-
- if #args ~= 2 and #args ~= 3 then
- simgrid.info("Wrong argument count: " .. #args)
- return
- end
- data.id = tonumber(args[1])
- routing_table_init(data.id)
- data.mailbox = tostring(data.id)
- if #args == 3 then
- data.deadline = tonumber(args[3])
- simgrid.info("Hi, I'm going to join the network with the id " .. data.id .. " !")
- if join_network(tonumber(args[2])) then
- main_loop()
- else
- simgrid.info("Couldn't join the network")
- end
- else
- data.deadline = tonumber(args[2])
- routing_table_update(data.id)
- data.comm = simgrid.task.irecv(data.mailbox)
- main_loop()
- end
- simgrid.info(data.find_node_succedded .. "/" .. (data.find_node_succedded + data.find_node_failed) .. " FIND_NODE have succedded");
- simgrid.process.sleep(10000)
-end
-function main_loop()
- local next_lookup_time = simgrid.get_clock() + common.RANDOM_LOOKUP_INTERVAL
- local now = simgrid.get_clock()
- while now < data.deadline do
- task,err = data.comm:test()
- if task then
- handle_task(task)
- data.comm = simgrid.task.irecv(data.mailbox)
- elseif err then
- data.comm = simgrid.task.irecv(data.mailbox)
- else
- if now >= next_lookup_time then
- random_lookup()
- next_lookup_time = next_lookup_time + common.RANDOM_LOOKUP_INTERVAL
- now = simgrid.get_clock()
- else
- simgrid.process.sleep(1)
- now = simgrid.get_clock()
- end
- end
- end
-end
-function random_lookup()
- find_node(0,true)
-end
-function join_network(id_known)
- local answer_got = false
- local time_begin = simgrid.get_clock()
-
- simgrid.debug("Joining the network knowing " .. id_known)
-
- routing_table_update(data.id)
- routing_table_update(id_known)
-
- -- Send a FIND_NODE to the node we know
- send_find_node(id_known,data.id)
- -- Wait for the answer
- local trials = 0
-
- data.comm = simgrid.task.irecv(data.mailbox)
-
- repeat
- task,err = data.comm:test()
- if task then
- if task.type == "FIND_NODE_ANSWER" then
- answer_got = true
- local answer = task.answer
- -- Add the nodes we received to our routing table
- for i,v in pairs(answer.nodes) do
- routing_table_update(v.id)
- end
- else
- handle_task(task)
- end
- data.comm = simgrid.task.irecv(data.mailbox)
- elseif err then
- data.comm = simgrid.task.irecv(data.mailbox)
- else
- simgrid.process.sleep(1)
- end
- until answer_got or trials >= common.MAX_JOIN_TRIALS
- -- Second step: Send a FIND_NODE in a node in each bucket
- local bucket_id = find_bucket(id_known).id
- local i = 0
- while ((bucket_id - i) > 0 or (bucket_id + i) <= common.IDENTIFIER_SIZE) and i < common.JOIN_BUCKETS_QUERIES do
- if bucket_id - i > 0 then
- local id_in_bucket = get_id_in_prefix(data.id,bucket_id - i)
- find_node(id_in_bucket,false)
- end
- if bucket_id + i <= common.IDENTIFIER_SIZE then
- local id_in_bucket = get_id_in_prefix(data.id,bucket_id + i)
- find_node(id_in_bucket,false)
- end
- i = i + 1
- end
- return answer_got
-end
--- Send a request to find a node in the node's routing table.
-function find_node(destination, counts)
- local queries, answers
- local total_answers = 0
- total_queries = 0
- local nodes_added = 0
- local destination_found = false
- local steps = 0
- local timeout
- local global_timeout = simgrid.get_clock() + common.FIND_NODE_GLOBAL_TIMEOUT
- -- Build a list of the closest nodes we already know.
- local node_list = find_closest(destination)
-
- simgrid.debug("Doing a FIND_NODE on " .. destination)
- repeat
- answers = 0
- queries = send_find_node_to_best(node_list)
- nodes_added = 0
- timeout = simgrid.get_clock() + common.FIND_NODE_TIMEOUT
- steps = steps + 1
- repeat
- task, err = data.comm:test()
- if task then
- if task.type == "FIND_NODE_ANSWER" and task.answer.destination == destination then
- routing_table_update(task.sender_id)
- for i,v in pairs(task.answer.nodes) do
- routing_table_update(v.id)
- end
- nodes_added = merge_answer(node_list,task.answer)
- else
- handle_task(task)
- end
- data.comm = simgrid.task.irecv(data.mailbox)
- elseif err then
- data.comm = simgrid.task.irecv(data.mailbox)
- else
- simgrid.process.sleep(1)
- end
-
- until answers >= queries or simgrid.get_clock() >= timeout
- if (#node_list.nodes > 0) then
- destination_found = (node_list.nodes[1].distance == 0)
- else
- destination_found = false
- end
- until destination_found or (nodes_added > 0 and answers == 0) or simgrid.get_clock() >= global_timeout or steps >= common.MAX_STEPS
- if destination_found then
- simgrid.debug("Find node on " .. destination .. " succedded")
- if counts then
- data.find_node_succedded = data.find_node_succedded + 1
- end
- else
- simgrid.debug("Find node on " .. destination .. " failed")
- if counts then
- data.find_node_failed = data.find_node_failed + 1
- end
- end
-
- return destination_found
-end
--- Sends a "FIND_NODE" request (task) to a node we know.
-function send_find_node(id, destination)
- simgrid.debug("Sending a FIND_NODE to " .. id .. " to find " .. destination);
-
- local task = simgrid.task.new("",0, common.COMM_SIZE)
- task.type = "FIND_NODE"
- task.sender_id = data.id
- task.destination = destination
-
- task:dsend(tostring(id))
-
-end
--- Sends a "FIND_NODE" request to the best "alpha" nodes in a node
--- list
-function send_find_node_to_best(node_list)
- destination = node_list.destination
- local i = 1
- while i <= common.ALPHA and i < #node_list.nodes do
- if data.id ~= node_list.nodes[i].id then
- send_find_node(node_list.nodes[i].id,destination)
- end
- i = i + 1
- end
- return i - 1
-end
--- Handles an incomming task
-function handle_task(task)
- routing_table_update(task.sender_id)
- if task.type == "FIND_NODE" then
- handle_find_node(task)
- elseif task.type == "PING" then
- handle_ping(task)
- end
-end
-function handle_find_node(task)
- simgrid.debug("Received a FIND_NODE from " .. task.sender_id)
- local answer = find_closest(task.destination)
- local task_answer = simgrid.task.new("",0, common.COMM_SIZE)
- task_answer.type = "FIND_NODE_ANSWER"
- task_answer.sender_id = data.id
- task_answer.destination = task.destination
- task_answer.answer = answer
- task_answer:dsend(tostring(task.sender_id))
-end
-function handle_ping(task)
- simgrid.info("Received a PING from " .. task.sender_id)
- local task_answer = simgrid.task.new("",0, common.COMM_SIZE)
- task_answer.type = "PING_ANSWER"
- task_answer.sender_id = data.id
- task_answer.destination = task.destination
- task_answer:dsend(tostring(task.sender_id))
-end
-function merge_answer(m1, m2)
- local nb_added = 0
- for i,v in pairs(m2.nodes) do
- table.insert(m1.nodes,v)
- nb_added = nb_added + 1
- end
- return nb_added
-end
-simgrid.platform(arg[1] or "../../platforms/platform.xml")
-simgrid.application(arg[2] or "kademlia.xml")
-simgrid.run()