Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bb763bde61f9556790986923f51fa1c7d6e05172
[simgrid.git] / examples / lua / chord / chord.lua
1 -- A SimGrid Lua implementation of the Chord DHT
2
3 require("simgrid")
4
5 nb_bits = 24
6 nb_keys = 2^nb_bits
7 comp_size = 0
8 comm_size = 10
9 timeout = 50
10 max_simulation_time = 1000
11 stabilize_delay = 20
12 fix_fingers_delay = 120
13 check_predecessor_delay = 120
14 lookup_delay = 10
15
16 -- current node (don't worry, globals are duplicated in each process)
17 my_node = {
18   id = my_id,
19   next_finger_to_fix = 1,
20   fingers = {},
21   predecessor = nil,
22   comm_recv = nil
23 }
24
25 -- Main function of each Chord process
26 -- Arguments:
27 -- - my id
28 -- - the id of a guy I know in the system (except for the first node)
29 function node(...)
30
31   -- TODO simplify the parameters
32   local known_id
33   local args = {...}
34   my_node.id = tonumber(args[1])
35   if #args == 4 then
36     known_id = tonumber(args[2])
37   end
38
39   -- initialize the node
40   for i = 1, nb_bits do
41     my_node.fingers[i] = my_node.id
42   end
43   my_node.comm_recv = simgrid.task.irecv(my_node.id)
44
45   -- join the ring
46   local join_success = false
47   if known_id == nil then
48     -- first node
49     create()
50     join_success = true
51   else
52     join_success = join(known_id)
53   end
54
55   -- main loop
56   if join_success then
57
58     local now = simgrid.get_clock()
59     local next_stabilize_date = now + stabilize_delay
60     local next_fix_fingers_date = now + fix_fingers_delay
61     local next_check_predecessor_date = now + check_predecessor_delay
62     local next_lookup_date = now + lookup_delay
63
64     local task, success
65
66     while now < max_simulation_time do
67
68       task, success = simgrid.comm.test(my_node.comm_recv)
69
70       if task then
71         -- I received a task: answer it
72         my_node.comm_recv = simgrid.task.irecv(my_node.id)
73         handle_task(task)
74       elseif failed then
75         -- the communication has failed: nevermind
76         my_node.comm_recv = simgrid.task.irecv(my_node.id)
77       else
78         -- no task was received: do periodic calls
79         if now >= next_stabilize_date then
80           stabilize()
81           next_stabilize_date = simgrid.get_clock() + stabilize_delay
82
83         elseif now >= next_fix_fingers_date then
84           fix_fingers()
85           next_fix_fingers_date = simgrid.get_clock() + fix_fingers_delay
86
87         elseif now >= next_check_predecessor_date then
88           check_predecessor()
89           next_check_predecessor_date = simgrid.get_clock() + check_predecessor_delay
90         elseif now >= next_lookup_date then
91           random_lookup()
92           next_lookup_date = simgrid.get_clock() + lookup_delay
93
94         else
95           -- nothing to do: sleep for a while
96           simgrid.process.sleep(5)
97         end
98       end
99       now = simgrid.get_clock()
100     end -- while
101
102     -- leave the ring
103     leave()
104   end
105 end
106
107 -- Makes the current node leave the ring
108 function leave()
109
110   simgrid.info("Leaving the ring")
111   -- TODO: notify others
112 end
113
114 -- This function is called when the current node receives a task.
115 -- - task: the task received
116 function handle_task(task)
117
118   local type = task.type
119
120   if type == "find successor" then
121
122     simgrid.info("Received a 'find successor' request from " .. task.answer_to ..
123         " for id " .. task.request_id)
124
125     -- is my successor the successor?
126     if is_in_interval(task.request_id, my_node.id + 1, my_node.fingers[1]) then
127
128       simgrid.info("Sending back a 'find successor answer' to " ..
129           task.answer_to .. ": the successor of " .. task.request_id ..
130           " is " .. my_node.fingers[1])
131
132       local ans_task = simgrid.task.new("", comp_size, comm_size)
133       ans_task.type = "find successor answer"
134       ans_task.request_id = task.request_id
135       ans_task.answer = my_node.fingers[1]
136       ans_task:dsend(task.answer_to)
137     else
138       -- forward the request to the closest preceding finger in my table
139
140       simgrid.info("Forwarding the 'find successor' request to my closest preceding finger")
141
142       local next_task = simgrid.task.new("", comp_size, comm_size)
143       next_task.type = "find successor"
144       next_task.request_id = task.request_id
145       next_task.answer_to = task.answer_to
146       next_task:dsend(closest_preceding_node(next_task.request_id))
147     end
148
149   elseif type == "get predecessor" then
150     local ans_task = simgrid.task.new("", comp_size, comm_size)
151     ans_task.type = "get predecessor answer"
152     ans_task.answer = my_node.predecessor
153     ans_task:dsend(task.answer_to)
154
155   elseif type == "notify" then
156     -- someone is telling me that he may be my new predecessor
157     notify(task.request_id)
158
159   elseif type == "predecessor leaving" then
160     -- TODO
161
162   elseif type == "successor_leaving" then
163     -- TODO
164
165   elseif type == "find successor answer" then
166     -- ignoring
167
168   elseif type == "get predecessor answer" then
169     -- ignoring
170
171   else
172     error("Unknown type of task received: " .. task.type)
173   end
174 end
175
176 -- Returns whether an id belongs to the interval [a, b[.
177 -- The parameters are normalized to make sure they are between 0 and nb_keys - 1.
178 -- 1 belongs to [62, 3].
179 -- 1 does not belong to [3, 62].
180 -- 63 belongs to [62, 3].
181 -- 63 does not belong to [3, 62].
182 -- 24 belongs to [21, 29].
183 -- 24 does not belong to [29, 21].
184 function is_in_interval(id, a, b)
185
186   -- normalize the parameters
187   id = id % nb_bits
188   a = a % nb_bits
189   b = b % nb_bits
190  
191   -- make sure a <= b and a <= id
192   if b < a then
193     b = b + nb_keys
194   end
195
196   if id < a then
197     id = id + nb_keys
198   end
199
200   return id <= b
201 end
202
203 -- Returns whether the current node is in the ring.
204 function has_joined()
205
206   return my_node.fingers[1] ~= nil
207 end
208
209 -- Creates a new Chord ring.
210 function create()
211   my_node.predecessor = nil
212   my_node.fingers[1] = my_node.id
213 end
214
215 -- Attemps to join the Chord ring.
216 -- - known_id: id of a node already in the ring
217 -- - return value: true if the join was successful
218 function join(known_id)
219
220   simgrid.info("Joining the ring with id " .. my_node.id .. ", knowing node " .. known_id)
221
222   local successor = remote_find_successor(known_id, my_node.id)
223   if successor == nil then
224     simgrid.info("Cannot join the ring.")
225     return false
226   end
227
228   my_node.predecessor = nil
229   my_node.fingers[1] = successor
230   return true
231 end
232
233 -- Returns the closest preceding finger of an id with respect to the finger
234 -- table of the current node.
235 -- - id: the id to find
236 -- - return value: the closest preceding finger of that id
237 function closest_preceding_node(id)
238
239   for i = nb_bits, 1, -1 do
240     if is_in_interval(my_node.fingers[i], my_node.id + 1, id - 1) then
241       -- finger i is the first one before id
242       return my_node.fingers[i]
243     end
244   end
245 end
246
247 -- Finds the successor of an id
248 -- id: the id to find
249 -- return value: the id of the successor, or nil if the request failed
250 function find_successor(id)
251
252   if is_in_interval(id, my_node.id + 1, my_node.fingers[1]) then
253     -- my successor is the successor
254     return my_node.fingers[1]
255   end
256
257   -- ask to the closest preceding finger in my table
258   local ask_to = closest_preceding_node(id)
259   return remote_find_successor(ask_to, id)
260 end
261
262 -- Asks a remote node the successor of an id.
263 -- ask_to: id of a remote node to ask to
264 -- id: the id to find
265 -- return value: the id of the successor, or nil if the request failed
266 function remote_find_successor(ask_to, id)
267
268   local task = simgrid.task.new("", comp_size, comm_size)
269   task.type = "find successor"
270   task.request_id = id
271   task.answer_to = my_node.id
272
273   simgrid.info("Sending a 'find successor' request to " .. ask_to .. " for id " .. id)
274   if task:send(ask_to, timeout) then
275     -- request successfully sent: wait for an answer
276
277     simgrid.info("Sent the 'find successor' request to " .. ask_to ..
278         " for id " .. id .. ", waiting for the answer")
279
280     while true do
281       task = simgrid.comm.wait(my_node.comm_recv, timeout)
282       my_node.comm_recv = simgrid.task.irecv(my_node.id)
283     
284       if not task then
285         -- failed to receive the answer
286         simgrid.info("Failed to receive the answer to my 'find successor' request")
287         return nil
288       else
289         -- a task was received: is it the expected answer?
290         if task.type ~= "find successor answer" or task.request_id ~= id then
291           -- this is not our answer
292           simgrid.info("Received another request of type " .. task.type)
293           handle_task(task)
294         else
295           -- this is our answer
296           simgrid.info("Received the answer to my 'find successor' request for id " ..
297               id .. ": the successor is " .. task.answer)
298           return task.answer
299         end
300       end
301     end
302   else
303     simgrid.info("Failed to send the 'find successor' request to " .. ask_to ..
304         " for id " .. id)
305   end
306
307   return successor
308 end
309
310 -- Asks a remote node its predecessor.
311 -- ask_to: id of a remote node to ask to
312 -- return value: the id of its predecessor, or nil if the request failed
313 function remote_get_predecessor(ask_to)
314
315   local task = simgrid.task.new("", comp_size, comm_size)
316   task.type = "get predecessor"
317   task.answer_to = my_node.id
318
319   if task:send(ask_to, timeout) then
320     -- request successfully sent: wait for an answer
321     while true do
322       task = simgrid.comm.wait(my_node.comm_recv, timeout)
323       my_node.comm_recv = simgrid.task.irecv(my_node.id)
324     
325       if not task then
326         -- failed to receive the answer
327         return nil
328       else
329         -- a task was received: is it the expected answer?
330         if task.type ~= "get predecessor answer" then
331           -- this is not our answer
332           handle_task(task)
333         else
334           -- this is our answer
335           -- FIXME make sure the message answers to this particular request
336           return task.answer
337         end
338       end
339     end
340   end
341
342   return successor
343 end
344
345 -- Checks the immediate successor of the current node.
346 function stabilize()
347
348   local candidate
349   local successor = my_node.fingers[1]
350   if successor ~= my_node.id then
351     candidate = remote_get_predecessor(successor)
352   else
353     candidate = my_node.predecessor
354   end
355
356   -- this node is a candidate to become my new successor
357   if candidate ~= nil and is_in_interval(candidate, my_node.id + 1, successor - 1) then
358     my_node.fingers[1] = candidate
359   end
360
361   if successor ~= my_node.id then
362     remote_notify(successor, my_node.id)
363   end
364 end
365
366 -- Notifies the current node that its predecessor my have changed
367 -- - candidate_predecessor: the possible new predecessor
368 function notify(candidate_predecessor)
369
370   if my_node.predecessor == nil or is_in_interval(candidate_predecessor,
371       my_node.predecessor + 1, my_node.id - 1) then
372     my_node.predecessor = candidate_predecessor
373   end
374 end
375
376 -- Notifies a remote node that its predecessor my have changed.
377 -- - notify_to
378 -- - candidate the possible new predecessor
379 function remote_notify(notify_to, candidate_predecessor)
380
381   local task = simgrid.task.new("", comp_size, comm_size)
382   task.type = "notify"
383   task.request_id = candidate_predecessor
384   task:dsend(notify_to)
385 end
386
387 -- Refreshes the finger table of the current node.
388 function fix_fingers()
389
390   local i = my_node.next_finger_to_fix
391   local id = find_successor(my_node.id + 2^i)
392   if id ~= nil then
393     if id ~= my_node.fingers[i] then
394       my_node.fingers[i] = id
395     end
396     my_node.next_finger_to_fix = (i % nb_bits) + 1
397   end
398 end
399
400 -- Checks whether the predecessor of the current node has failed.
401 function check_predecessor()
402   -- TODO
403 end
404
405 -- Performs a find successor request to an arbitrary id.
406 function random_lookup()
407
408   find_successor(1337)
409 end
410
411 simgrid.platform(arg[1] or "../../msg/msg_platform.xml")
412 simgrid.application(arg[2] or "../../msg/chord/chord90.xml")
413 simgrid.run()
414