Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
model-checker : update tesh
[simgrid.git] / examples / lua / kademlia / kademlia.lua
1 -- A SimGrid Lua implementation of the Kademlia protocol.
2 require("simgrid")
3
4 -- Common constants
-- Protocol-wide tuning constants read by every node process.
-- Durations are expressed in the unit returned by simgrid.get_clock().
common = {
        -- Messaging --------------------------------------------------------
        -- Third argument handed to simgrid.task.new() for every message.
        COMM_SIZE = 1,

        -- Identifier space / routing table ---------------------------------
        -- Upper bound used for bucket ids while joining the network.
        IDENTIFIER_SIZE = 32,
        -- Not referenced in this file; presumably consumed by the
        -- routing_table module -- TODO confirm.
        BUCKET_SIZE = 20,

        -- Lookup procedure -------------------------------------------------
        -- Bound on the number of nodes queried in one FIND_NODE round.
        ALPHA = 3,
        -- Maximum number of rounds performed by a single find_node() call.
        MAX_STEPS = 10,
        -- How long one round waits for its answers.
        FIND_NODE_TIMEOUT = 10,
        -- Overall time budget of a single find_node() call.
        FIND_NODE_GLOBAL_TIMEOUT = 50,
        -- Delay between two periodic lookups issued by main_loop().
        RANDOM_LOOKUP_INTERVAL = 100,

        -- Joining ----------------------------------------------------------
        -- Bound on the polling attempts made while waiting for the first
        -- answer in join_network().
        MAX_JOIN_TRIALS = 4,
        -- Bound on the number of iterations of the bucket refresh loop that
        -- follows the first answer in join_network().
        JOIN_BUCKETS_QUERIES = 5,

        -- Not referenced in this file -- TODO confirm where it is used.
        PING_TIMEOUT = 35
}
18 require("tools")
19 -- Routing table 
20 require("routing_table")
21
-- Mutable state of the node run by the current process.
data = {
        -- Identity: numeric identifier and the mailbox name derived from it
        -- (both are filled in by node()).
        id = -1,
        mailbox = "",

        -- Pending asynchronous receive on `mailbox`; nil until one is posted.
        comm = nil,
        -- Clock value at which main_loop() stops.
        deadline = 0,

        -- Lookup statistics reported when the node finishes.
        find_node_succedded = 0,
        find_node_failed = 0
}
31
-- Entry point of a Kademlia node process.
--
-- Accepted argument lists:
--   (id, deadline)            first node: only registers itself and serves
--   (id, known_id, deadline)  joins the network through `known_id` first
-- Any other argument count is reported and the process returns at once.
-- When the node is done it logs its FIND_NODE statistics and then sleeps.
function node(...)
        local argv = {...}
        local argc = #argv

        if not (argc == 2 or argc == 3) then
                simgrid.info("Wrong argument count: " .. argc)
                return
        end

        -- Identity shared by both start-up modes.
        data.id = tonumber(argv[1])
        routing_table_init(data.id)
        data.mailbox = tostring(data.id)

        if argc == 2 then
                -- Bootstrap node: nobody to contact, just start listening.
                data.deadline = tonumber(argv[2])
                routing_table_update(data.id)
                data.comm = simgrid.task.irecv(data.mailbox)
                main_loop()
        else
                data.deadline = tonumber(argv[3])
                simgrid.info("Hi, I'm going to join the network with the id " .. data.id .. " !")
                local known_id = tonumber(argv[2])
                if not join_network(known_id) then
                        simgrid.info("Couldn't join the network")
                else
                        main_loop()
                end
        end

        local succeeded = data.find_node_succedded
        local attempted = succeeded + data.find_node_failed
        simgrid.info(succeeded .. "/" .. attempted .. " FIND_NODE have succedded")
        simgrid.process.sleep(10000)
end
-- Serves incoming tasks until the simulated clock reaches data.deadline.
--
-- Each iteration polls the pending receive stored in data.comm:
--   * a received task is passed to handle_task() and a new receive is posted;
--   * a failed receive is simply replaced by a new one;
--   * otherwise the node either starts the periodic lookup (once every
--     common.RANDOM_LOOKUP_INTERVAL) or sleeps for one time unit.
-- Expects data.comm to already hold a pending receive.
function main_loop()
        local next_lookup_time = simgrid.get_clock() + common.RANDOM_LOOKUP_INTERVAL
        local now = simgrid.get_clock()
        while now < data.deadline do
                -- Declared local: these used to leak into the global table,
                -- where any handler could read or overwrite them.
                local task, err = data.comm:test()
                if task then
                        handle_task(task)
                        data.comm = simgrid.task.irecv(data.mailbox)
                elseif err then
                        data.comm = simgrid.task.irecv(data.mailbox)
                elseif now >= next_lookup_time then
                        random_lookup()
                        next_lookup_time = next_lookup_time + common.RANDOM_LOOKUP_INTERVAL
                        now = simgrid.get_clock()
                else
                        -- Nothing to do: let simulated time advance.
                        simgrid.process.sleep(1)
                        now = simgrid.get_clock()
                end
        end
end
-- Periodic lookup issued by main_loop(): searches for identifier 0 and lets
-- the outcome count towards the FIND_NODE statistics.
function random_lookup()
        local target_id = 0
        local count_in_statistics = true
        find_node(target_id, count_in_statistics)
end
-- Joins the network through a node whose identifier is already known.
--
-- Step 1: register ourselves and `id_known` in the routing table, send a
--         FIND_NODE for our own id to `id_known`, and poll the mailbox until
--         a FIND_NODE_ANSWER arrives or common.MAX_JOIN_TRIALS idle polls
--         have elapsed. Other tasks received meanwhile go to handle_task().
-- Step 2: run find_node() on identifiers taken from the buckets around the
--         bucket of `id_known` (at most common.JOIN_BUCKETS_QUERIES offsets).
--
-- @param id_known  identifier of a node already in the network
-- @return true when a FIND_NODE_ANSWER was received during step 1
function join_network(id_known)
        local answer_got = false
        local trials = 0

        simgrid.debug("Joining the network knowing " .. id_known)

        routing_table_update(data.id)
        routing_table_update(id_known)

        -- Send a FIND_NODE to the node we know
        send_find_node(id_known, data.id)

        -- Wait for the answer
        data.comm = simgrid.task.irecv(data.mailbox)
        repeat
                -- Kept local so the poll results do not leak into globals.
                local task, err = data.comm:test()
                if task then
                        if task.type == "FIND_NODE_ANSWER" then
                                answer_got = true
                                -- Add the nodes we received to our routing table
                                for _, contact in pairs(task.answer.nodes) do
                                        routing_table_update(contact.id)
                                end
                        else
                                handle_task(task)
                        end
                        data.comm = simgrid.task.irecv(data.mailbox)
                elseif err then
                        data.comm = simgrid.task.irecv(data.mailbox)
                else
                        -- Each idle poll is one trial. Without this increment
                        -- the loop never ended when the known node stayed
                        -- silent.
                        trials = trials + 1
                        simgrid.process.sleep(1)
                end
        until answer_got or trials >= common.MAX_JOIN_TRIALS

        -- Second step: send a FIND_NODE to a node in each nearby bucket
        local bucket_id = find_bucket(id_known).id
        local i = 0
        while ((bucket_id - i) > 0 or (bucket_id + i) <= common.IDENTIFIER_SIZE)
                        and i < common.JOIN_BUCKETS_QUERIES do
                if bucket_id - i > 0 then
                        find_node(get_id_in_prefix(data.id, bucket_id - i), false)
                end
                if bucket_id + i <= common.IDENTIFIER_SIZE then
                        find_node(get_id_in_prefix(data.id, bucket_id + i), false)
                end
                i = i + 1
        end
        return answer_got
end
-- Iterative lookup of `destination`.
--
-- Starting from the closest nodes returned by find_closest(), the function
-- repeatedly asks the best candidates (send_find_node_to_best) and merges
-- their FIND_NODE_ANSWERs into the candidate list. Tasks that are not an
-- answer for this lookup are forwarded to handle_task().
--
-- A round ends when every query has been answered or after
-- common.FIND_NODE_TIMEOUT. The lookup ends when
--   * the first candidate has distance 0 (destination found), or
--   * a round produced answers but no new node (nothing more to learn), or
--   * common.FIND_NODE_GLOBAL_TIMEOUT or common.MAX_STEPS is reached.
--
-- @param destination  identifier to look for
-- @param counts       when true, the outcome is added to
--                     data.find_node_succedded / data.find_node_failed
-- @return true when the destination was found, false otherwise
function find_node(destination, counts)
        local queries, answers, nodes_added, timeout
        local destination_found = false
        local steps = 0
        local global_timeout = simgrid.get_clock() + common.FIND_NODE_GLOBAL_TIMEOUT
        -- Build a list of the closest nodes we already know.
        local node_list = find_closest(destination)

        simgrid.debug("Doing a FIND_NODE on " .. destination)
        repeat
                answers = 0
                nodes_added = 0
                queries = send_find_node_to_best(node_list)
                timeout = simgrid.get_clock() + common.FIND_NODE_TIMEOUT
                steps = steps + 1
                repeat
                        -- Local on purpose: these used to be accidental globals.
                        local task, err = data.comm:test()
                        if task then
                                if task.type == "FIND_NODE_ANSWER" and task.answer.destination == destination then
                                        routing_table_update(task.sender_id)
                                        for _, contact in pairs(task.answer.nodes) do
                                                routing_table_update(contact.id)
                                        end
                                        -- Count the reply so the round can end as
                                        -- soon as every query has been answered.
                                        answers = answers + 1
                                        -- Accumulate: a later answer bringing
                                        -- nothing new must not hide earlier ones.
                                        nodes_added = nodes_added + merge_answer(node_list, task.answer)
                                else
                                        handle_task(task)
                                end
                                data.comm = simgrid.task.irecv(data.mailbox)
                        elseif err then
                                data.comm = simgrid.task.irecv(data.mailbox)
                        else
                                simgrid.process.sleep(1)
                        end
                until answers >= queries or simgrid.get_clock() >= timeout

                destination_found = (#node_list.nodes > 0) and (node_list.nodes[1].distance == 0)

        -- Keep iterating while nodes are still being learned or nobody
        -- answered; stop once answers arrive without any new node.
        until destination_found
                or (nodes_added == 0 and answers > 0)
                or simgrid.get_clock() >= global_timeout
                or steps >= common.MAX_STEPS

        if destination_found then
                simgrid.debug("Find node on " .. destination .. " succedded")
                if counts then
                        data.find_node_succedded = data.find_node_succedded + 1
                end
        else
                simgrid.debug("Find node on " .. destination .. " failed")
                if counts then
                        data.find_node_failed = data.find_node_failed + 1
                end
        end

        return destination_found
end
-- Builds a "FIND_NODE" task asking for `destination` and sends it, detached,
-- to the mailbox named after `id`.
--
-- @param id           identifier of the node to query
-- @param destination  identifier being looked up
function send_find_node(id, destination)
        simgrid.debug("Sending a FIND_NODE to " .. id .. " to find " .. destination)

        local request = simgrid.task.new("", 0, common.COMM_SIZE)
        request.type = "FIND_NODE"
        request.sender_id = data.id
        request.destination = destination

        local target_mailbox = tostring(id)
        request:dsend(target_mailbox)
end
-- Sends a "FIND_NODE" request to the first nodes of a node list, skipping
-- ourselves, until common.ALPHA requests have been sent or the list is
-- exhausted.
--
-- @param node_list  table with a `destination` field and a `nodes` sequence
--                   whose entries carry an `id`
-- @return the number of requests actually sent
function send_find_node_to_best(node_list)
        local destination = node_list.destination
        local nodes = node_list.nodes
        local sent = 0
        local i = 1
        -- `<=` because Lua sequences are 1-based: the last entry is a valid
        -- candidate too (a single-entry list used to query nobody).
        while sent < common.ALPHA and i <= #nodes do
                local candidate = nodes[i]
                if candidate.id ~= data.id then
                        send_find_node(candidate.id, destination)
                        sent = sent + 1
                end
                i = i + 1
        end
        -- Only real queries are reported, so callers waiting for that many
        -- answers are not kept waiting for a reply from ourselves.
        return sent
end
-- Handles an incoming task: the sender is first recorded in the routing
-- table, then the task is dispatched on its `type`. Types other than
-- "FIND_NODE" and "PING" are ignored.
function handle_task(task)
        routing_table_update(task.sender_id)

        local kind = task.type
        if kind == "PING" then
                handle_ping(task)
        elseif kind == "FIND_NODE" then
                handle_find_node(task)
        end
end
-- Answers a "FIND_NODE" request: the result of find_closest() for the
-- requested destination is sent back, detached, to the requester's mailbox
-- as a "FIND_NODE_ANSWER" task.
function handle_find_node(task)
        simgrid.debug("Received a FIND_NODE from " .. task.sender_id)

        local closest = find_closest(task.destination)

        local reply = simgrid.task.new("", 0, common.COMM_SIZE)
        reply.type = "FIND_NODE_ANSWER"
        reply.sender_id = data.id
        reply.destination = task.destination
        reply.answer = closest

        local requester_mailbox = tostring(task.sender_id)
        reply:dsend(requester_mailbox)
end
-- Answers a "PING" request by sending a detached "PING_ANSWER" task to the
-- sender's mailbox. The request's `destination` field is copied as is.
function handle_ping(task)
        simgrid.info("Received a PING from " .. task.sender_id)

        local reply = simgrid.task.new("", 0, common.COMM_SIZE)
        reply.type = "PING_ANSWER"
        reply.sender_id = data.id
        reply.destination = task.destination

        local requester_mailbox = tostring(task.sender_id)
        reply:dsend(requester_mailbox)
end
-- Appends every node of answer `m2` to the node list of `m1`.
-- No de-duplication or re-ordering is performed.
--
-- @param m1  table whose `nodes` sequence receives the new entries
-- @param m2  table whose `nodes` entries are copied (by reference)
-- @return the number of entries appended
function merge_answer(m1, m2)
        local target = m1.nodes
        local appended = 0
        for _, contact in pairs(m2.nodes) do
                target[#target + 1] = contact
                appended = appended + 1
        end
        return appended
end
-- Script entry: load the platform and deployment descriptions (first and
-- second command-line argument, with defaults) and start the simulation.
local platform_file = arg[1] or  "../../msg/msg_platform.xml"
local deployment_file = arg[2] or "kademlia.xml"

simgrid.platform(platform_file)
simgrid.application(deployment_file)
simgrid.run()