Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of scm.gforge.inria.fr:/gitroot/simgrid/simgrid
[simgrid.git] / examples / lua / chord / chord.lua
1 -- A SimGrid Lua implementation of the Chord DHT
2
3 require("simgrid")
4
5 nb_bits = 24
6 nb_keys = 2^nb_bits
7 comp_size = 0
8 comm_size = 10
9 timeout = 50
10 max_simulation_time = 1000
11 stabilize_delay = 20
12 fix_fingers_delay = 120
13 check_predecessor_delay = 120
14 lookup_delay = 10
15
16 -- current node (don't worry, globals are duplicated in each simulated process)
17 my_node = {
18   id = my_id,
19   next_finger_to_fix = 1,
20   fingers = {},
21   predecessor = nil,
22   comm_recv = nil
23 }
24
25 -- Main function of each Chord process
26 -- Arguments:
27 -- - my id
28 -- - the id of a guy I know in the system (except for the first node)
29 function node(...)
30
31   -- TODO simplify the deployment file
32   local known_id
33   local args = {...}
34   my_node.id = tonumber(args[1])
35   if #args == 4 then
36     known_id = tonumber(args[2])
37   end
38
39   -- initialize the node
40   for i = 1, nb_bits do
41     my_node.fingers[i] = my_node.id
42   end
43   my_node.comm_recv = simgrid.task.irecv(my_node.id)
44
45   -- join the ring
46   local join_success = false
47   if known_id == nil then
48     -- first node
49     create()
50     join_success = true
51   else
52     join_success = join(known_id)
53   end
54
55   -- main loop
56   if join_success then
57
58     local now = simgrid.get_clock()
59     local next_stabilize_date = now + stabilize_delay
60     local next_fix_fingers_date = now + fix_fingers_delay
61     local next_check_predecessor_date = now + check_predecessor_delay
62     local next_lookup_date = now + lookup_delay
63
64     local task, err
65
66     while now < max_simulation_time do
67
68       task, err = my_node.comm_recv:test()
69
70       if task then
71         -- I received a task: answer it
72         my_node.comm_recv = simgrid.task.irecv(my_node.id)
73         handle_task(task)
74       elseif err then
75         -- the communication has failed: nevermind
76         my_node.comm_recv = simgrid.task.irecv(my_node.id)
77       else
78         -- no task was received: do periodic calls
79         if now >= next_stabilize_date then
80           stabilize()
81           next_stabilize_date = simgrid.get_clock() + stabilize_delay
82
83         elseif now >= next_fix_fingers_date then
84           fix_fingers()
85           next_fix_fingers_date = simgrid.get_clock() + fix_fingers_delay
86
87         elseif now >= next_check_predecessor_date then
88           check_predecessor()
89           next_check_predecessor_date = simgrid.get_clock() + check_predecessor_delay
90
91         elseif now >= next_lookup_date then
92           random_lookup()
93           next_lookup_date = simgrid.get_clock() + lookup_delay
94
95         else
96           -- nothing to do: sleep for a while
97           simgrid.process.sleep(5)
98         end
99       end
100       now = simgrid.get_clock()
101     end -- while
102
103     -- leave the ring
104     leave()
105   end
106 end
107
108 -- Makes the current node leave the ring
109 function leave()
110
111   simgrid.info("Leaving the ring")
112   -- TODO: notify others
113 end
114
115 -- This function is called when the current node receives a task.
116 -- - task: the task received
117 function handle_task(task)
118
119   local type = task.type
120
121   if type == "find successor" then
122
123     simgrid.info("Received a 'find successor' request from " .. task.answer_to ..
124         " for id " .. task.request_id)
125
126     -- is my successor the successor?
127     if is_in_interval(task.request_id, my_node.id + 1, my_node.fingers[1]) then
128
129       simgrid.info("Sending back a 'find successor answer' to " ..
130           task.answer_to .. ": the successor of " .. task.request_id ..
131           " is " .. my_node.fingers[1])
132
133       task.type = "find successor answer"
134       task.answer = my_node.fingers[1]
135       task:dsend(task.answer_to)
136     else
137       -- forward the request to the closest preceding finger in my table
138
139       simgrid.info("Forwarding the 'find successor' request to my closest preceding finger")
140       task:dsend(closest_preceding_node(task.request_id))
141     end
142
143   elseif type == "get predecessor" then
144     task.type = "get predecessor answer"
145     task.answer = my_node.predecessor
146     task:dsend(task.answer_to)
147
148   elseif type == "notify" then
149     -- someone is telling me that he may be my new predecessor
150     notify(task.request_id)
151
152   elseif type == "predecessor leaving" then
153     -- TODO
154
155   elseif type == "successor_leaving" then
156     -- TODO
157
158   elseif type == "find successor answer" then
159     -- ignoring
160
161   elseif type == "get predecessor answer" then
162     -- ignoring
163
164   else
165     error("Unknown type of task received: " .. task.type)
166   end
167 end
168
169 -- Returns whether an id belongs to the interval [a, b[.
170 -- The parameters are normalized to make sure they are between 0 and nb_keys - 1.
171 -- 1 belongs to [62, 3].
172 -- 1 does not belong to [3, 62].
173 -- 63 belongs to [62, 3].
174 -- 63 does not belong to [3, 62].
175 -- 24 belongs to [21, 29].
176 -- 24 does not belong to [29, 21].
177 function is_in_interval(id, a, b)
178
179   -- normalize the parameters
180   id = id % nb_bits
181   a = a % nb_bits
182   b = b % nb_bits
183  
184   -- make sure a <= b and a <= id
185   if b < a then
186     b = b + nb_keys
187   end
188
189   if id < a then
190     id = id + nb_keys
191   end
192
193   return id <= b
194 end
195
196 -- Returns whether the current node is in the ring.
197 function has_joined()
198
199   return my_node.fingers[1] ~= nil
200 end
201
202 -- Creates a new Chord ring.
203 function create()
204   my_node.predecessor = nil
205   my_node.fingers[1] = my_node.id
206 end
207
208 -- Attemps to join the Chord ring.
209 -- - known_id: id of a node already in the ring
210 -- - return value: true if the join was successful
211 function join(known_id)
212
213   simgrid.info("Joining the ring with id " .. my_node.id .. ", knowing node " .. known_id)
214
215   local successor = remote_find_successor(known_id, my_node.id)
216   if successor == nil then
217     simgrid.info("Cannot join the ring.")
218     return false
219   end
220
221   my_node.predecessor = nil
222   my_node.fingers[1] = successor
223   return true
224 end
225
226 -- Returns the closest preceding finger of an id with respect to the finger
227 -- table of the current node.
228 -- - id: the id to find
229 -- - return value: the closest preceding finger of that id
230 function closest_preceding_node(id)
231
232   for i = nb_bits, 1, -1 do
233     if is_in_interval(my_node.fingers[i], my_node.id + 1, id - 1) then
234       -- finger i is the first one before id
235       return my_node.fingers[i]
236     end
237   end
238 end
239
240 -- Finds the successor of an id
241 -- id: the id to find
242 -- return value: the id of the successor, or nil if the request failed
243 function find_successor(id)
244
245   if is_in_interval(id, my_node.id + 1, my_node.fingers[1]) then
246     -- my successor is the successor
247     return my_node.fingers[1]
248   end
249
250   -- ask to the closest preceding finger in my table
251   local ask_to = closest_preceding_node(id)
252   return remote_find_successor(ask_to, id)
253 end
254
255 -- Asks a remote node the successor of an id.
256 -- ask_to: id of a remote node to ask to
257 -- id: the id to find
258 -- return value: the id of the successor, or nil if the request failed
259 function remote_find_successor(ask_to, id)
260
261   local task = simgrid.task.new("", comp_size, comm_size)
262   task.type = "find successor"
263   task.request_id = id
264   task.answer_to = my_node.id
265
266   simgrid.info("Sending a 'find successor' request to " .. ask_to .. " for id " .. id)
267   if task:send(ask_to, timeout) then
268     -- request successfully sent: wait for an answer
269
270     simgrid.info("Sent the 'find successor' request to " .. ask_to ..
271         " for id " .. id .. ", waiting for the answer")
272
273     while true do
274       task = my_node.comm_recv:wait(timeout)
275       my_node.comm_recv = simgrid.task.irecv(my_node.id)
276     
277       if not task then
278         -- failed to receive the answer
279         simgrid.info("Failed to receive the answer to my 'find successor' request")
280         return nil
281       else
282         -- a task was received: is it the expected answer?
283         if task.type ~= "find successor answer" or task.request_id ~= id then
284           -- this is not our answer
285           simgrid.info("Received another request of type " .. task.type)
286           handle_task(task)
287         else
288           -- this is our answer
289           simgrid.info("Received the answer to my 'find successor' request for id " ..
290               id .. ": the successor is " .. task.answer)
291           return task.answer
292         end
293       end
294     end
295   else
296     simgrid.info("Failed to send the 'find successor' request to " .. ask_to ..
297         " for id " .. id)
298   end
299
300   return successor
301 end
302
303 -- Asks a remote node its predecessor.
304 -- ask_to: id of a remote node to ask to
305 -- return value: the id of its predecessor, or nil if the request failed
306 function remote_get_predecessor(ask_to)
307
308   local task = simgrid.task.new("", comp_size, comm_size)
309   task.type = "get predecessor"
310   task.answer_to = my_node.id
311
312   if task:send(ask_to, timeout) then
313     -- request successfully sent: wait for an answer
314     while true do
315       task = my_node.comm_recv:wait(timeout)
316       my_node.comm_recv = simgrid.task.irecv(my_node.id)
317     
318       if not task then
319         -- failed to receive the answer
320         return nil
321       else
322         -- a task was received: is it the expected answer?
323         if task.type ~= "get predecessor answer" then
324           -- this is not our answer
325           handle_task(task)
326         else
327           -- this is our answer
328           -- FIXME make sure the message answers to this particular request
329           return task.answer
330         end
331       end
332     end
333   end
334
335   return successor
336 end
337
338 -- Checks the immediate successor of the current node.
339 function stabilize()
340
341   local candidate
342   local successor = my_node.fingers[1]
343   if successor ~= my_node.id then
344     candidate = remote_get_predecessor(successor)
345   else
346     candidate = my_node.predecessor
347   end
348
349   -- this node is a candidate to become my new successor
350   if candidate ~= nil and is_in_interval(candidate, my_node.id + 1, successor - 1) then
351     my_node.fingers[1] = candidate
352   end
353
354   if successor ~= my_node.id then
355     remote_notify(successor, my_node.id)
356   end
357 end
358
359 -- Notifies the current node that its predecessor my have changed
360 -- - candidate_predecessor: the possible new predecessor
361 function notify(candidate_predecessor)
362
363   if my_node.predecessor == nil or is_in_interval(candidate_predecessor,
364       my_node.predecessor + 1, my_node.id - 1) then
365     my_node.predecessor = candidate_predecessor
366   end
367 end
368
369 -- Notifies a remote node that its predecessor my have changed.
370 -- - notify_to
371 -- - candidate the possible new predecessor
372 function remote_notify(notify_to, candidate_predecessor)
373
374   local task = simgrid.task.new("", comp_size, comm_size)
375   task.type = "notify"
376   task.request_id = candidate_predecessor
377   task:dsend(notify_to)
378 end
379
380 -- Refreshes the finger table of the current node.
381 function fix_fingers()
382
383   local i = my_node.next_finger_to_fix
384   local id = find_successor(my_node.id + 2^i)
385   if id ~= nil then
386     if id ~= my_node.fingers[i] then
387       my_node.fingers[i] = id
388     end
389     my_node.next_finger_to_fix = (i % nb_bits) + 1
390   end
391 end
392
393 -- Checks whether the predecessor of the current node has failed.
394 function check_predecessor()
395   -- TODO
396 end
397
398 -- Performs a find successor request to an arbitrary id.
399 function random_lookup()
400
401   find_successor(1337)
402 end
403
404 simgrid.platform(arg[1] or "../../msg/msg_platform.xml")
405 simgrid.application(arg[2] or "../../msg/chord/chord90.xml")
406 simgrid.run()
407