Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix xmls for lua examples
[simgrid.git] / examples / lua / kademlia / kademlia.lua
1 -- A SimGrid Lua implementation of the Kademlia protocol.
2
3 -- Copyright (c) 2012, 2014. The SimGrid Team.
4 -- All rights reserved.
5
6 -- This program is free software; you can redistribute it and/or modify it
7 -- under the terms of the license (GNU LGPL) which comes with this package.
8
9 require("simgrid")
10
-- Protocol-wide constants.
-- Left global, as in the original.
-- NOTE(review): the modules required right after this table may read
-- `common` -- confirm before making it local.
common = {
        -- Third argument given to simgrid.task.new for every message.
        COMM_SIZE = 1,

        -- Clock offset between two random lookups in main_loop.
        RANDOM_LOOKUP_INTERVAL = 100,

        -- Upper bound on the contacts queried by send_find_node_to_best.
        ALPHA = 3,

        -- Highest bucket id considered while joining the network.
        IDENTIFIER_SIZE = 32,

        -- Not read in this file.
        -- NOTE(review): presumably used by the routing table -- confirm.
        BUCKET_SIZE = 20,

        -- Bound checked by the join_network wait loop.
        MAX_JOIN_TRIALS = 4,

        -- Per-round and overall clock budgets of find_node.
        FIND_NODE_TIMEOUT = 10,
        FIND_NODE_GLOBAL_TIMEOUT = 50,

        -- Not read in this file.
        PING_TIMEOUT = 35,

        -- Maximum number of query rounds in find_node.
        MAX_STEPS = 10,

        -- Maximum number of bucket offsets probed by join_network.
        JOIN_BUCKETS_QUERIES = 5,
}
25 require("tools")
26 -- Routing table
27 require("routing_table")
28
-- State of the node run by this process (filled in by node()).
data = {
        -- Identifier of this node; -1 until node() parses its arguments.
        id = -1,
        -- Mailbox name this node receives on (the id as a string).
        mailbox = "",
        -- Clock value after which main_loop stops.
        deadline = 0,
        -- Pending asynchronous receive; set once the node starts listening.
        comm = nil,
        -- Counters updated by find_node when called with counts = true.
        find_node_succedded = 0,
        find_node_failed = 0,
}
38
-- Entry point of a node process.
-- Accepts either (id, deadline) for a node that starts on its own, or
-- (id, known_id, deadline) for a node that joins through `known_id`.
-- Any other argument count is reported and the process returns at once.
function node(...)
        local args = {...}
        local argc = #args

        if not (argc == 2 or argc == 3) then
                simgrid.info("Wrong argument count: " .. argc)
                return
        end

        data.id = tonumber(args[1])
        routing_table_init(data.id)
        data.mailbox = tostring(data.id)

        if argc == 2 then
                -- Stand-alone start: only ourselves in the routing table.
                data.deadline = tonumber(args[2])
                routing_table_update(data.id)
                data.comm = simgrid.task.irecv(data.mailbox)
                main_loop()
        else
                data.deadline = tonumber(args[3])
                simgrid.info("Hi, I'm going to join the network with the id " .. data.id .. " !")
                local joined = join_network(tonumber(args[2]))
                if joined then
                        main_loop()
                else
                        simgrid.info("Couldn't join the network")
                end
        end

        -- Report the lookup statistics, then keep the process around for a while.
        local succeeded = data.find_node_succedded
        local total = succeeded + data.find_node_failed
        simgrid.info(succeeded .. "/" .. total .. " FIND_NODE have succedded")
        simgrid.process.sleep(10000)
end
-- Main service loop of a node: runs until the simulated clock reaches
-- data.deadline.
-- Each iteration polls the pending receive. An incoming task is handled
-- and the receive is re-armed; a failed receive is simply re-armed.
-- When idle, a lookup is started every common.RANDOM_LOOKUP_INTERVAL;
-- otherwise the process sleeps for one time unit.
function main_loop()
        local next_lookup_time = simgrid.get_clock() + common.RANDOM_LOOKUP_INTERVAL

        -- The clock is read afresh on every iteration so that the deadline
        -- test never works on a stale value.
        while simgrid.get_clock() < data.deadline do
                -- Locals: the original leaked `task` and `err` into the globals.
                local task, err = data.comm:test()
                if task then
                        handle_task(task)
                        data.comm = simgrid.task.irecv(data.mailbox)
                elseif err then
                        data.comm = simgrid.task.irecv(data.mailbox)
                elseif simgrid.get_clock() >= next_lookup_time then
                        random_lookup()
                        next_lookup_time = next_lookup_time + common.RANDOM_LOOKUP_INTERVAL
                else
                        simgrid.process.sleep(1)
                end
        end
end
-- Periodic lookup issued from main_loop.
-- Despite the name, the identifier looked up is the constant 0; the
-- outcome is counted in the node's FIND_NODE statistics.
function random_lookup()
        local target_id = 0
        local count_in_stats = true
        find_node(target_id, count_in_stats)
end
-- Joins the network through a node we already know.
--
-- First step: send a FIND_NODE for our own id to `id_known` and wait for
-- its FIND_NODE_ANSWER, adding every returned node to the routing table.
-- Other tasks received meanwhile go to handle_task. The wait is abandoned
-- after common.MAX_JOIN_TRIALS idle polls (one time unit of sleep each).
--
-- Second step: run find_node on an identifier taken from buckets around
-- the one holding `id_known`, for at most common.JOIN_BUCKETS_QUERIES
-- offsets. These lookups are not counted in the statistics.
--
-- @param id_known  identifier of a node already in the network
-- @return true if the known node answered, false otherwise
function join_network(id_known)
        local answer_got = false
        local trials = 0

        simgrid.debug("Joining the network knowing " .. id_known)

        routing_table_update(data.id)
        routing_table_update(id_known)

        -- Send a FIND_NODE to the node we know, then listen for the answer.
        send_find_node(id_known, data.id)
        data.comm = simgrid.task.irecv(data.mailbox)

        repeat
                local task, err = data.comm:test()
                if task then
                        if task.type == "FIND_NODE_ANSWER" then
                                answer_got = true
                                -- Add the nodes we received to our routing table.
                                for _, v in pairs(task.answer.nodes) do
                                        routing_table_update(v.id)
                                end
                        else
                                handle_task(task)
                        end
                        data.comm = simgrid.task.irecv(data.mailbox)
                elseif err then
                        data.comm = simgrid.task.irecv(data.mailbox)
                else
                        -- Nothing arrived: this counts as one trial. The original
                        -- never incremented the counter, so a silent peer made
                        -- this loop spin forever.
                        trials = trials + 1
                        simgrid.process.sleep(1)
                end
        until answer_got or trials >= common.MAX_JOIN_TRIALS

        -- Second step: send a FIND_NODE to a node in each nearby bucket.
        local bucket_id = find_bucket(id_known).id
        local i = 0
        while ((bucket_id - i) > 0 or (bucket_id + i) <= common.IDENTIFIER_SIZE)
                        and i < common.JOIN_BUCKETS_QUERIES do
                if bucket_id - i > 0 then
                        find_node(get_id_in_prefix(data.id, bucket_id - i), false)
                end
                if bucket_id + i <= common.IDENTIFIER_SIZE then
                        find_node(get_id_in_prefix(data.id, bucket_id + i), false)
                end
                i = i + 1
        end

        return answer_got
end
-- Looks up `destination`, starting from the closest nodes we already know.
--
-- Each round queries the best candidates (send_find_node_to_best) and
-- then collects matching FIND_NODE_ANSWER tasks until every query is
-- answered or common.FIND_NODE_TIMEOUT elapses. Answers feed the routing
-- table and are merged into the candidate list. Unrelated tasks go to
-- handle_task.
--
-- Rounds stop when the first candidate has distance 0, when a round got
-- answers but learned no new node, when common.FIND_NODE_GLOBAL_TIMEOUT
-- elapses, or after common.MAX_STEPS rounds.
--
-- @param destination  identifier to look for
-- @param counts       when true, the outcome updates
--                     data.find_node_succedded / data.find_node_failed
-- @return true if the destination was found, false otherwise
function find_node(destination, counts)
        local destination_found = false
        local steps = 0
        local answers, nodes_added
        local global_timeout = simgrid.get_clock() + common.FIND_NODE_GLOBAL_TIMEOUT
        -- Build a list of the closest nodes we already know.
        local node_list = find_closest(destination)

        simgrid.debug("Doing a FIND_NODE on " .. destination)
        repeat
                answers = 0
                nodes_added = 0
                local queries = send_find_node_to_best(node_list)
                local timeout = simgrid.get_clock() + common.FIND_NODE_TIMEOUT
                steps = steps + 1
                repeat
                        local task, err = data.comm:test()
                        if task then
                                if task.type == "FIND_NODE_ANSWER" and task.answer.destination == destination then
                                        -- Count the answer: the original never did, so each
                                        -- round always waited for the full timeout.
                                        answers = answers + 1
                                        routing_table_update(task.sender_id)
                                        for _, v in pairs(task.answer.nodes) do
                                                routing_table_update(v.id)
                                        end
                                        -- Accumulate over the round, so that one empty
                                        -- answer does not hide nodes learned from another.
                                        nodes_added = nodes_added + merge_answer(node_list, task.answer)
                                else
                                        handle_task(task)
                                end
                                data.comm = simgrid.task.irecv(data.mailbox)
                        elseif err then
                                data.comm = simgrid.task.irecv(data.mailbox)
                        else
                                simgrid.process.sleep(1)
                        end
                until answers >= queries or simgrid.get_clock() >= timeout

                destination_found = #node_list.nodes > 0 and node_list.nodes[1].distance == 0
        until destination_found
                or (answers > 0 and nodes_added == 0) -- answered, but nothing new
                or simgrid.get_clock() >= global_timeout
                or steps >= common.MAX_STEPS

        if destination_found then
                simgrid.debug("Find node on " .. destination .. " succedded")
                if counts then
                        data.find_node_succedded = data.find_node_succedded + 1
                end
        else
                simgrid.debug("Find node on " .. destination .. " failed")
                if counts then
                        data.find_node_failed = data.find_node_failed + 1
                end
        end

        return destination_found
end
-- Builds a "FIND_NODE" request for `destination` and sends it, detached,
-- to the mailbox named after `id`.
-- @param id           identifier of the node to ask
-- @param destination  identifier being looked up
function send_find_node(id, destination)
        simgrid.debug("Sending a FIND_NODE to " .. id .. " to find " .. destination)

        local request = simgrid.task.new("", 0, common.COMM_SIZE)
        request.type = "FIND_NODE"
        request.sender_id = data.id
        request.destination = destination

        local target_mailbox = tostring(id)
        request:dsend(target_mailbox)
end
-- Sends a "FIND_NODE" request to the first common.ALPHA nodes of a node
-- list, skipping ourselves.
--
-- Fixes over the previous version: the last entry of the list is now
-- eligible (the old `i < #nodes` bound skipped it under 1-based
-- indexing), our own entry no longer uses up one of the ALPHA slots,
-- and `destination` is a local instead of a leaked global.
--
-- @param node_list  table with a `destination` field and a `nodes`
--                   sequence whose entries carry an `id`
-- @return the number of requests actually sent
function send_find_node_to_best(node_list)
        local destination = node_list.destination
        local sent = 0

        for _, contact in ipairs(node_list.nodes) do
                if sent >= common.ALPHA then
                        break
                end
                if contact.id ~= data.id then
                        send_find_node(contact.id, destination)
                        sent = sent + 1
                end
        end

        return sent
end
-- Handles an incoming task: records its sender in the routing table,
-- then dispatches on the task type. Types other than "FIND_NODE" and
-- "PING" are ignored after the routing-table update.
function handle_task(task)
        routing_table_update(task.sender_id)

        local kind = task.type
        if kind == "PING" then
                handle_ping(task)
        elseif kind == "FIND_NODE" then
                handle_find_node(task)
        end
end
-- Answers a "FIND_NODE" request: looks up the nodes closest to the
-- requested destination and sends them back, detached, to the sender's
-- mailbox as a "FIND_NODE_ANSWER" task.
function handle_find_node(task)
        simgrid.debug("Received a FIND_NODE from " .. task.sender_id)

        local closest = find_closest(task.destination)

        local reply = simgrid.task.new("", 0, common.COMM_SIZE)
        reply.type = "FIND_NODE_ANSWER"
        reply.sender_id = data.id
        reply.destination = task.destination
        reply.answer = closest

        local requester_mailbox = tostring(task.sender_id)
        reply:dsend(requester_mailbox)
end
-- Answers a "PING" request with a "PING_ANSWER" task sent, detached, to
-- the sender's mailbox. The reply echoes the request's `destination`.
function handle_ping(task)
        simgrid.info("Received a PING from " .. task.sender_id)

        local pong = simgrid.task.new("", 0, common.COMM_SIZE)
        pong.type = "PING_ANSWER"
        pong.sender_id = data.id
        pong.destination = task.destination

        local requester_mailbox = tostring(task.sender_id)
        pong:dsend(requester_mailbox)
end
-- Merges the nodes of answer `m2` into the candidate list `m1`.
--
-- Only nodes whose `id` is not already in `m1.nodes` are appended, and
-- the list is then re-ordered by ascending `distance`. The previous
-- version appended everything, duplicates included, and never
-- re-ordered, so the head of the list -- which find_node inspects for
-- `distance == 0` -- never reflected newly learned nodes.
--
-- @param m1  list to extend in place (table with a `nodes` sequence)
-- @param m2  answer to merge (table with a `nodes` table)
-- @return the number of nodes actually added to `m1`
function merge_answer(m1, m2)
        -- Index the ids we already hold.
        local known = {}
        for _, node in pairs(m1.nodes) do
                if node.id ~= nil then
                        known[node.id] = true
                end
        end

        local nb_added = 0
        for _, v in pairs(m2.nodes) do
                local id = v.id
                -- Entries without an id cannot be de-duplicated; keep them.
                if id == nil or not known[id] then
                        if id ~= nil then
                                known[id] = true
                        end
                        m1.nodes[#m1.nodes + 1] = v
                        nb_added = nb_added + 1
                end
        end

        if nb_added > 0 then
                -- Closest first; entries lacking a distance sort last.
                table.sort(m1.nodes, function(a, b)
                        return (a.distance or math.huge) < (b.distance or math.huge)
                end)
        end

        return nb_added
end
-- Script entry: load the platform and deployment files (first and second
-- command-line arguments, with defaults), then run the simulation.
local platform_file = arg[1] or  "../../platforms/platform.xml"
simgrid.platform(platform_file)

local deployment_file = arg[2] or "kademlia.xml"
simgrid.application(deployment_file)

simgrid.run()