1 -- A SimGrid Lua implementation of the Kademlia protocol.
3 -- Copyright (c) 2012, 2014. The SimGrid Team.
4 -- All rights reserved.
6 -- This program is free software; you can redistribute it and/or modify it
7 -- under the terms of the license (GNU LGPL) which comes with this package.
14 RANDOM_LOOKUP_INTERVAL = 100,
19 FIND_NODE_TIMEOUT = 10,
20 FIND_NODE_GLOBAL_TIMEOUT = 50,
23 JOIN_BUCKETS_QUERIES = 5
27 require("routing_table")
34 find_node_succedded = 0,
42 if #args ~= 2 and #args ~= 3 then
43 simgrid.info("Wrong argument count: " .. #args)
46 data.id = tonumber(args[1])
47 routing_table_init(data.id)
48 data.mailbox = tostring(data.id)
50 data.deadline = tonumber(args[3])
51 simgrid.info("Hi, I'm going to join the network with the id " .. data.id .. " !")
52 if join_network(tonumber(args[2])) then
55 simgrid.info("Couldn't join the network")
58 data.deadline = tonumber(args[2])
59 routing_table_update(data.id)
60 data.comm = simgrid.task.irecv(data.mailbox)
63 simgrid.info(data.find_node_succedded .. "/" .. (data.find_node_succedded + data.find_node_failed) .. " FIND_NODE have succedded");
64 simgrid.process.sleep(10000)
67 local next_lookup_time = simgrid.get_clock() + common.RANDOM_LOOKUP_INTERVAL
68 local now = simgrid.get_clock()
69 while now < data.deadline do
70 task,err = data.comm:test()
73 data.comm = simgrid.task.irecv(data.mailbox)
75 data.comm = simgrid.task.irecv(data.mailbox)
77 if now >= next_lookup_time then
79 next_lookup_time = next_lookup_time + common.RANDOM_LOOKUP_INTERVAL
80 now = simgrid.get_clock()
82 simgrid.process.sleep(1)
83 now = simgrid.get_clock()
-- random_lookup takes no arguments.
-- NOTE(review): only the header is visible in this listing; judging by the
-- name it presumably starts a FIND_NODE on a randomly chosen id -- confirm
-- against the full body.
88 function random_lookup()
-- Joins the network through a node whose id is already known.
--   id_known: id of the known node. It is added to the routing table, used as
--             the recipient of the first FIND_NODE, and its bucket is the
--             starting point of the second step.
-- NOTE(review): the return statement is not visible in this listing; confirm
-- what callers receive.
91 function join_network(id_known)
92 local answer_got = false
93 local time_begin = simgrid.get_clock() -- NOTE(review): not read in any line visible here
95 simgrid.debug("Joining the network knowing " .. id_known)
-- Seed the routing table with our own id and with the known node's id.
97 routing_table_update(data.id)
98 routing_table_update(id_known)
100 -- First step: send a FIND_NODE for our own id (data.id) to the node we know
101 send_find_node(id_known,data.id)
102 -- Wait for the answer
105 data.comm = simgrid.task.irecv(data.mailbox)
-- Test the pending asynchronous receive.
-- NOTE(review): no `local` declaration of task/err is visible; if none exists
-- on the lines not shown, these are globals -- confirm.
108 task,err = data.comm:test()
110 if task.type == "FIND_NODE_ANSWER" then
112 local answer = task.answer
113 -- Add the nodes we received to our routing table
114 for i,v in pairs(answer.nodes) do
115 routing_table_update(v.id)
-- A new asynchronous receive is posted on our mailbox; the two occurrences
-- below sit in branches whose conditions are not shown in this listing.
120 data.comm = simgrid.task.irecv(data.mailbox)
122 data.comm = simgrid.task.irecv(data.mailbox)
-- NOTE(review): presumably only reached when no task was available yet -- the
-- enclosing condition is not shown.
124 simgrid.process.sleep(1)
-- The waiting loop ends when answer_got is true or when trials reaches
-- common.MAX_JOIN_TRIALS (the updates of both are on lines not shown).
126 until answer_got or trials >= common.MAX_JOIN_TRIALS
127 -- Second step: Send a FIND_NODE to a node in each bucket
128 local bucket_id = find_bucket(id_known).id
-- Walk away from the known node's bucket in both directions, i buckets at a
-- time: keep going while at least one of bucket_id - i / bucket_id + i is
-- still inside (0, common.IDENTIFIER_SIZE] and i is below
-- common.JOIN_BUCKETS_QUERIES. The initialisation and increment of i are on
-- lines not shown.
130 while ((bucket_id - i) > 0 or (bucket_id + i) <= common.IDENTIFIER_SIZE) and i < common.JOIN_BUCKETS_QUERIES do
131 if bucket_id - i > 0 then
-- Build an id from our own id and the lower bucket index, then look it up.
-- `false` is passed as find_node's `counts` argument.
132 local id_in_bucket = get_id_in_prefix(data.id,bucket_id - i)
133 find_node(id_in_bucket,false)
135 if bucket_id + i <= common.IDENTIFIER_SIZE then
-- Same lookup for the bucket on the upper side.
136 local id_in_bucket = get_id_in_prefix(data.id,bucket_id + i)
137 find_node(id_in_bucket,false)
143 -- Send a request to find a node in the node's routing table.
-- Repeatedly sends FIND_NODE requests to the best nodes of a candidate list,
-- merges the answers into that list, and stops on success or when a limit is hit.
--   destination: id being looked up.
--   counts:      NOTE(review): not read in any line visible here; presumably
--                controls whether the success/failure counters are updated --
--                confirm.
-- Returns destination_found (boolean).
144 function find_node(destination, counts)
145 local queries, answers
146 local total_answers = 0
148 local nodes_added = 0
149 local destination_found = false
-- Absolute clock value after which the whole lookup is abandoned.
152 local global_timeout = simgrid.get_clock() + common.FIND_NODE_GLOBAL_TIMEOUT
153 -- Build a list of the closest nodes we already know.
154 local node_list = find_closest(destination)
156 simgrid.debug("Doing a FIND_NODE on " .. destination)
-- One lookup step: query the best candidates; the value returned is kept as
-- the number of queries to wait for.
159 queries = send_find_node_to_best(node_list)
-- Absolute clock value bounding the wait for this step's answers.
-- NOTE(review): no `local` declaration of timeout is visible; confirm it is
-- not a global.
161 timeout = simgrid.get_clock() + common.FIND_NODE_TIMEOUT
-- NOTE(review): no `local` declaration of task/err is visible either -- confirm.
164 task, err = data.comm:test()
-- Only answers belonging to this lookup (same destination) are consumed here.
166 if task.type == "FIND_NODE_ANSWER" and task.answer.destination == destination then
-- Record the sender and every node it reported in the routing table.
167 routing_table_update(task.sender_id)
168 for i,v in pairs(task.answer.nodes) do
169 routing_table_update(v.id)
-- Fold the reported nodes into the candidate list.
171 nodes_added = merge_answer(node_list,task.answer)
-- A new asynchronous receive is posted on our mailbox; the two occurrences
-- below sit in branches whose conditions are not shown in this listing.
175 data.comm = simgrid.task.irecv(data.mailbox)
177 data.comm = simgrid.task.irecv(data.mailbox)
179 simgrid.process.sleep(1)
-- The step ends once answers has reached queries, or at the per-step timeout.
182 until answers >= queries or simgrid.get_clock() >= timeout
-- The destination counts as found when the first candidate has distance 0.
-- NOTE(review): this assumes node_list.nodes is ordered by increasing
-- distance -- confirm.
183 if (#node_list.nodes > 0) then
184 destination_found = (node_list.nodes[1].distance == 0)
186 destination_found = false
-- The lookup ends when the destination is found, when nodes_added > 0 while
-- answers == 0, at the global timeout, or when steps reaches common.MAX_STEPS.
-- NOTE(review): the second condition reads oddly (nodes are added only when an
-- answer is processed) -- confirm it is the intended test.
188 until destination_found or (nodes_added > 0 and answers == 0) or simgrid.get_clock() >= global_timeout or steps >= common.MAX_STEPS
189 if destination_found then
190 simgrid.debug("Find node on " .. destination .. " succedded")
192 data.find_node_succedded = data.find_node_succedded + 1
195 simgrid.debug("Find node on " .. destination .. " failed")
197 data.find_node_failed = data.find_node_failed + 1
201 return destination_found
203 -- Sends a "FIND_NODE" request (task) to a node we know.
--   id:          id of the recipient; the task is sent to mailbox tostring(id).
--   destination: id being looked up, stored in the task's `destination` field.
204 function send_find_node(id, destination)
205 simgrid.debug("Sending a FIND_NODE to " .. id .. " to find " .. destination);
-- Create the task with common.COMM_SIZE as its last argument and tag it with
-- our own id as the sender.
207 local task = simgrid.task.new("",0, common.COMM_SIZE)
208 task.type = "FIND_NODE"
209 task.sender_id = data.id
210 task.destination = destination
-- dsend: NOTE(review): presumably a detached (fire-and-forget) send -- confirm
-- against the SimGrid Lua API.
212 task:dsend(tostring(id))
215 -- Sends a "FIND_NODE" request to the best "alpha" nodes in a node list.
--   node_list: table with a `destination` field and a `nodes` array whose
--              entries have an `id` field.
-- NOTE(review): the return statement is not visible in this listing; confirm
-- what value callers receive.
217 function send_find_node_to_best(node_list)
-- NOTE(review): assigned without `local`, so this writes a global variable
-- named destination -- confirm this is intended.
218 destination = node_list.destination
-- NOTE(review): with `i < #node_list.nodes` the last entry of the list is
-- never reached if i starts at 1 (initialisation not shown) -- confirm the
-- bound.
220 while i <= common.ALPHA and i < #node_list.nodes do
-- Never send a request to ourselves.
221 if data.id ~= node_list.nodes[i].id then
222 send_find_node(node_list.nodes[i].id,destination)
228 -- Handles an incoming task
--   task: received task; its `sender_id` and `type` fields are read.
229 function handle_task(task)
-- Whatever the task type, record its sender in the routing table.
230 routing_table_update(task.sender_id)
-- Dispatch on the task type.
231 if task.type == "FIND_NODE" then
232 handle_find_node(task)
-- NOTE(review): the body of the PING branch is not visible in this listing;
-- presumably it calls handle_ping(task) -- confirm.
233 elseif task.type == "PING" then
-- Answers a "FIND_NODE" request with the result of find_closest() for the
-- requested destination.
--   task: the request; its `sender_id` and `destination` fields are read.
237 function handle_find_node(task)
238 simgrid.debug("Received a FIND_NODE from " .. task.sender_id)
-- Result of looking up the requested destination in our own data.
239 local answer = find_closest(task.destination)
-- Build the reply, tagged with our id and the requested destination.
240 local task_answer = simgrid.task.new("",0, common.COMM_SIZE)
241 task_answer.type = "FIND_NODE_ANSWER"
242 task_answer.sender_id = data.id
243 task_answer.destination = task.destination
244 task_answer.answer = answer
-- The reply goes to the mailbox named after the requester's id.
245 task_answer:dsend(tostring(task.sender_id))
-- Answers a "PING" task with a "PING_ANSWER" task.
--   task: the received ping; its `sender_id` and `destination` fields are read.
247 function handle_ping(task)
248 simgrid.info("Received a PING from " .. task.sender_id)
-- Build the reply, tagged with our id; the ping's `destination` field is
-- copied over as-is.
249 local task_answer = simgrid.task.new("",0, common.COMM_SIZE)
250 task_answer.type = "PING_ANSWER"
251 task_answer.sender_id = data.id
252 task_answer.destination = task.destination
-- The reply goes to the mailbox named after the sender's id.
253 task_answer:dsend(tostring(task.sender_id))
-- Appends the nodes of answer m2 to the node list of m1.
--   m1: table whose `nodes` array receives the entries (modified in place).
--   m2: table whose `nodes` entries are appended.
-- NOTE(review): the initialisation of nb_added and the return statement are
-- not visible in this listing; presumably the count of appended nodes is
-- returned -- confirm. Lines not shown may also sort or trim m1.nodes.
255 function merge_answer(m1, m2)
257 for i,v in pairs(m2.nodes) do
-- Each node of m2 is appended at the end of m1.nodes and counted.
258 table.insert(m1.nodes,v)
259 nb_added = nb_added + 1
-- Script entry: the first command-line argument is passed to simgrid.platform
-- and the second to simgrid.application; each falls back to the default path
-- below when the argument is absent.
263 simgrid.platform(arg[1] or "../../platforms/platform.xml")
264 simgrid.application(arg[2] or "kademlia.xml")