Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid into...
[simgrid.git] / examples / lua / bittorrent / peer.lua
1 -- A SimGrid Lua implementation of the Bittorrent protocol.
2
3 require("simgrid")
-- Constants used throughout the peer implementation.
common = {
        -- File layout: FILE_PIECES bounds the bitfield and the piece picker
        FILE_PIECES = 10,
        FILE_PIECE_SIZE = 512,
        FILE_SIZE = 5120,

        -- Communication sizes; MESSAGE_SIZE is the third argument given to
        -- simgrid.task.new for every message built in this file
        MESSAGE_SIZE = 1,
        PIECE_COMM_SIZE = 1,
        TRACKER_COMM_SIZE = 0.01,

        -- Tracker exchange; GET_PEERS_TIMEOUT bounds get_peers_data()
        GET_PEERS_TIMEOUT = 10000,
        TRACKER_QUERY_INTERVAL = 1000,

        -- Peer exchange: TIMEOUT_MESSAGE is the wait timeout used by
        -- wait_for_pieces(), UPDATE_CHOKED_INTERVAL separates two runs of
        -- update_choked_peers() in the main loops, MAX_PIECES caps the
        -- number of pieces wanted at once in leech_loop()
        TIMEOUT_MESSAGE = 10,
        UPDATE_CHOKED_INTERVAL = 50,
        MAX_PIECES = 1,
        MAXIMUM_PEERS = 50,
        -- NOTE(review): not referenced anywhere in this file
        MAX_UNCHOKED_PEERS = 4,
}
24
25
-- Peer main function.
-- Arguments, in order: the peer id, the deadline (compared against
-- simgrid.get_clock() by the main loops) and, optionally, "1" when the peer
-- starts as a seed owning every piece.
function peer(...)
        local args = {...}

        if #args ~= 2 and #args ~= 3 then
                simgrid.info("Wrong number of arguments")
                -- Without an id and a deadline nothing below can work;
                -- extra arguments are merely ignored.
                if #args < 2 then
                        return
                end
        end

        -- Checking if the peer is a seed
        local is_seed = (args[3] == "1")

        -- Setting the peer data. The table is global on purpose: every other
        -- function of this file reads it.
        data = {
                id = tonumber(args[1]),
                mailbox = tostring(tonumber(args[1])),
                mailbox_tracker = "tracker" .. args[1],
                peers = {},
                active_peers = {},
                -- -1 is the "no piece selected" value tested by the loops
                current_piece = -1,
                current_pieces = {},
                pieces_requested = 0,
                bitfield = {},
                pieces_count = {},
                -- A seed owns every piece of the file from the start
                pieces = is_seed and common.FILE_PIECES or 0,
                deadline = tonumber(args[2]),
                round = 0
        }
        simgrid.info("Hi, I'm joining the network with id " .. data.id)

        -- Building the peer bitfield and the pieces list
        for i = 1, common.FILE_PIECES do
                data.pieces_count[i] = 0
                data.bitfield[i] = is_seed
        end

        if get_peers_data() == true then
                data.comm_received = simgrid.task.irecv(data.mailbox)
                if has_finished() then
                        send_handshake_all()
                else
                        -- leech_loop() sends the handshakes itself
                        leech_loop()
                end
                seed_loop()
        else
                simgrid.info("Couldn't contact the tracker")
        end
        simgrid.info("My status is now " .. get_status())
end
-- Number of entries of a table, whatever its keys are. data.current_pieces is
-- indexed by piece number, so it is not a sequence and neither table.getn nor
-- the # operator can be trusted to count it.
local function count_entries(t)
        local n = 0
        for _ in pairs(t) do
                n = n + 1
        end
        return n
end

-- Peer main loop when it is leeching.
-- Runs until the deadline is reached or every piece has been received.
function leech_loop()
        simgrid.info("Start downloading.")
        local now = simgrid.get_clock()
        local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL
        -- Send a "handshake" message to all the peers we know about
        send_handshake_all()
        -- Wait for at least one message that lets us pick a piece
        wait_for_pieces()

        simgrid.info("Starting main leech loop")
        -- wait_for_pieces() may have let simulated time pass
        now = simgrid.get_clock()
        local task, err
        while now < data.deadline and data.pieces < common.FILE_PIECES do
                task, err = data.comm_received:test()
                if task then
                        handle_message(task)
                        data.comm_received = simgrid.task.irecv(data.mailbox)
                elseif err then
                        -- The pending receive failed: post a new one
                        data.comm_received = simgrid.task.irecv(data.mailbox)
                else
                        -- Nothing received: advertise the piece we just
                        -- selected, or select a new one if allowed to
                        if data.current_piece ~= -1 then
                                send_interested_to_peers()
                        elseif count_entries(data.current_pieces) < common.MAX_PIECES then
                                update_current_piece()
                        end
                        -- We don't execute the choke algorithm if we don't already have a piece
                        if now >= next_choked_update and data.pieces > 0 then
                                update_choked_peers()
                                next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL
                        else
                                simgrid.process.sleep(1)
                        end
                end
                -- Refreshed on every path so the deadline is always honoured
                now = simgrid.get_clock()
        end
end
-- Peer main loop when it is seeding.
-- Answers incoming messages and periodically runs the choke algorithm until
-- the deadline is reached.
function seed_loop()
        local now = simgrid.get_clock()
        local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL
        simgrid.debug("Start seeding.")
        -- Kept local so the loop does not leak globals
        local task, err
        -- Start the main seed loop
        while now < data.deadline do
                task, err = data.comm_received:test()
                if task then
                        handle_message(task)
                        data.comm_received = simgrid.task.irecv(data.mailbox)
                elseif err then
                        -- The pending receive failed: post a new one
                        data.comm_received = simgrid.task.irecv(data.mailbox)
                elseif now >= next_choked_update then
                        update_choked_peers()
                        next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL
                else
                        simgrid.process.sleep(1)
                end
                -- Refreshed on every path so the deadline is always honoured
                now = simgrid.get_clock()
        end
end
-- Retrieve the peers list from the tracker.
-- Sends a "REQUEST" task to the "tracker" mailbox, then waits on
-- data.mailbox_tracker for the answer and records every peer it lists
-- (except ourselves) in data.peers.
-- Returns true when an answer was received before GET_PEERS_TIMEOUT elapsed,
-- false otherwise.
function get_peers_data()
        local now = simgrid.get_clock()
        local timeout = now + common.GET_PEERS_TIMEOUT

        -- Build the task
        local task_send = simgrid.task.new("", 0, common.MESSAGE_SIZE)
        task_send.type = "REQUEST"
        task_send.peer_id = data.id
        task_send.mailbox = data.mailbox_tracker

        -- Send the task, retrying until it goes through or time runs out
        local sent = false
        while not sent and now < timeout do
                simgrid.debug("Sending a peer request to the tracker.")
                if task_send:send("tracker") then
                        sent = true
                end
                now = simgrid.get_clock()
        end
        if not sent then
                return false
        end

        -- Wait for the answer
        local success = false
        local comm_received = simgrid.task.irecv(data.mailbox_tracker)
        while not success and now < timeout do
                -- wait() is given the time left, like the duration passed to
                -- it in wait_for_pieces(), rather than an absolute date
                local task_received = comm_received:wait(timeout - now)
                if task_received then
                        simgrid.info("Received an answer from the tracker with " .. #task_received.peers .. " peers inside")
                        -- Add what we received to our peer list
                        for _, v in pairs(task_received.peers) do
                                if v ~= data.id then
                                        -- Add the peer to our list and build its data
                                        data.peers[v] = {
                                                id = v,
                                                bitfield = nil,
                                                mailbox = tostring(v),
                                                am_interested = false,
                                                interested = false,
                                                choked_upload = true,
                                                choked_download = true,
                                        }
                                end
                        end
                        success = true
                else
                        now = simgrid.get_clock()
                        if now < timeout then
                                -- Only post a new receive if we will wait again
                                comm_received = simgrid.task.irecv(data.mailbox_tracker)
                        end
                end
        end
        return success
end
-- Tells whether the download is complete: true unless at least one entry of
-- data.bitfield is false.
function has_finished()
        local complete = true
        for _, owned in pairs(data.bitfield) do
                if owned == false then
                        complete = false
                        break
                end
        end
        return complete
end
-- Handle a received message sent by another peer.
-- `task` carries the message kind in task.type, the sender in task.peer_id and
-- task.mailbox, plus task.bitfield, task.piece or task.stalled depending on
-- the kind. Apart from HANDSHAKE (which registers the sender) and PIECE,
-- messages from peers missing from data.peers are ignored.
function handle_message(task)
        if task.type == "HANDSHAKE" then
                simgrid.debug("Received a HANDSHAKE message from " .. task.mailbox)
                -- Check if the peer is in our connection list
                if data.peers[task.peer_id] == nil then
                        data.peers[task.peer_id] = {
                                mailbox = task.mailbox,
                                id = task.peer_id,
                                am_interested = false,
                                interested = false,
                                choked_upload = true,
                                choked_download = true,
                                bitfield = nil,
                        }
                        send_handshake(task.mailbox)
                end
                -- Send our bitfield to the peer
                send_bitfield(task.mailbox)
                return
        end

        if task.type == "PIECE" then
                if task.stalled == true then
                        simgrid.debug("The received piece is stalled")
                        return
                end
                simgrid.debug("Received piece " .. task.piece .. " from " .. task.mailbox)
                if data.bitfield[task.piece] ~= true then
                        data.pieces_requested = data.pieces_requested - 1
                        -- Removing the piece from our piece list
                        data.current_pieces[task.piece] = nil
                        data.bitfield[task.piece] = true
                        data.pieces = data.pieces + 1
                        simgrid.debug("My status is now:" .. get_status())
                        -- Sending the information to all the peers we are connected to
                        send_have(task.piece)
                        -- Sending NOTINTERESTED to the peers that don't have any more pieces
                        update_interested_after_receive()
                end
                return
        end

        -- Every remaining message updates the state kept about its sender
        local remote_peer = data.peers[task.peer_id]
        if remote_peer == nil then
                simgrid.debug("Ignoring a " .. tostring(task.type) .. " message from unknown peer " .. tostring(task.mailbox))
                return
        end

        if task.type == "BITFIELD" then
                simgrid.debug("Received a BITFIELD from " .. task.mailbox)
                -- Update the pieces list
                update_piece_count_from_bitfield(task.bitfield)
                -- Update the current piece
                if data.current_piece == -1 and data.pieces < common.FILE_PIECES then
                        update_current_piece()
                end
                remote_peer.bitfield = task.bitfield
        elseif task.type == "INTERESTED" then
                simgrid.debug("Received an INTERESTED message from " .. task.mailbox)
                remote_peer.interested = true
        elseif task.type == "NOTINTERESTED" then
                simgrid.debug("Received an NOTINTERESTED message from " .. task.mailbox)
                remote_peer.interested = false
        elseif task.type == "UNCHOKE" then
                simgrid.debug("Received an UNCHOKE message from " .. task.mailbox)
                remote_peer.choked_download = false
                send_requests_to_peer(remote_peer)
        elseif task.type == "CHOKE" then
                simgrid.debug("Recevied a CHOKE message from " .. task.mailbox)
                remote_peer.choked_download = true
        elseif task.type == "HAVE" then
                -- A HAVE is only meaningful once we know the peer's bitfield
                if remote_peer.bitfield == nil then
                        return
                end
                simgrid.debug("Received a HAVE message from " .. task.mailbox)
                -- Keep our view of the sender's pieces up to date
                remote_peer.bitfield[task.piece] = true
                data.pieces_count[task.piece] = data.pieces_count[task.piece] + 1
                if data.current_pieces[task.piece] ~= nil then
                        -- Send interested message to the peer if he has what we want
                        if not remote_peer.am_interested then
                                remote_peer.am_interested = true
                                send_interested(remote_peer.mailbox)
                        end
                        send_request(task.mailbox, task.piece)
                end
        elseif task.type == "REQUEST" then
                simgrid.debug("Received REQUEST from " .. task.mailbox .. " for " .. task.piece)
                -- Only serve peers we have unchoked, and only pieces we own
                if remote_peer.choked_upload == false and data.bitfield[task.piece] == true then
                        send_piece(task.mailbox, task.piece, false)
                end
        end
end
-- Update the piece the peer is currently interested in.
-- There are two cases (as described in "Bittorrent Architecture Protocol",
-- Ryan Toole):
-- If the peer has less than 3 pieces, it chooses a piece at random.
-- If the peer has more pieces, it downloads the least replicated ones.
-- Only the random choice is implemented here, whatever the number of pieces
-- already owned.
function update_current_piece()
        -- Nothing to pick when every missing piece is already requested
        local missing = common.FILE_PIECES - data.pieces
        if data.pieces_requested >= missing then
                return
        end

        -- Draw until we hit a piece we neither own nor already want
        local candidate
        repeat
                candidate = math.random(1, common.FILE_PIECES)
        until data.bitfield[candidate] ~= true and data.current_pieces[candidate] == nil

        data.current_piece = candidate
        data.current_pieces[candidate] = true
        data.pieces_requested = data.pieces_requested + 1
end
-- Adds one to data.pieces_count for every piece marked true in `bitfield`.
function update_piece_count_from_bitfield(bitfield)
        local counts = data.pieces_count
        for piece, owned in pairs(bitfield) do
                if owned == true then
                        counts[piece] = counts[piece] + 1
                end
        end
end
-- Wait for incoming messages until one of them leaves a piece selected
-- (data.current_piece different from -1) or the deadline is reached.
-- Each wait on the pending receive lasts at most common.TIMEOUT_MESSAGE.
function wait_for_pieces()
        local finished = false
        local now = simgrid.get_clock()
        while now < data.deadline and not finished do
                local task = data.comm_received:wait(common.TIMEOUT_MESSAGE)
                if task then
                        handle_message(task)
                        if data.current_piece ~= -1 then
                                finished = true
                        end
                end
                data.comm_received = simgrid.task.irecv(data.mailbox)
                -- Without this refresh the deadline would never stop the loop
                now = simgrid.get_clock()
        end
end
-- Update the list of current choked and unchoked peers, using the
-- choke algorithm: one active peer (if any) is choked, then one peer picked
-- at random among those interested in us is unchoked.
function update_choked_peers()
        data.round = (data.round + 1) % 3

        -- Remove a peer from the list
        local choked_id, choked_peer = next(data.active_peers)
        if choked_id ~= nil then
                data.active_peers[choked_id] = nil
                -- Stop serving its requests, as announced by the CHOKE message
                choked_peer.choked_upload = true
                send_choked(choked_peer.mailbox)
        end

        -- Random optimistic unchoking, restricted to interested peers so that
        -- a single draw always succeeds when a candidate exists
        local candidates = {}
        for _, candidate in pairs(data.peers) do
                if candidate.interested == true then
                        candidates[#candidates + 1] = candidate
                end
        end
        if #candidates == 0 then
                return
        end
        local peer_chosen = candidates[math.random(#candidates)]
        data.active_peers[peer_chosen.id] = peer_chosen
        peer_chosen.choked_upload = false
        send_unchoked(peer_chosen.mailbox)
        -- TODO: Use the leecher choke algorithm
end
-- Updates our "interested" state about peers: send "not interested" to peers
-- that don't have any more pieces we want (none of data.current_pieces is set
-- in their bitfield, or their bitfield is unknown).
function update_interested_after_receive()
        for _, remote_peer in pairs(data.peers) do
                if remote_peer.am_interested then
                        -- Evaluated per peer: one peer still being useful
                        -- says nothing about the others
                        local still_interested = false
                        if remote_peer.bitfield ~= nil then
                                for piece in pairs(data.current_pieces) do
                                        if remote_peer.bitfield[piece] == true then
                                                still_interested = true
                                                break
                                        end
                                end
                        end
                        if not still_interested then
                                remote_peer.am_interested = false
                                send_not_interested(remote_peer.mailbox)
                        end
                end
        end
end
-- Find the peers that have the current interested piece and send them
-- the "interested" message. Peers whose bitfield is unknown, that lack the
-- piece, or that we are already interested in are skipped.
-- data.current_piece is reset to -1 afterwards.
function send_interested_to_peers()
        local piece = data.current_piece
        if piece == -1 then
                return
        end
        for _, remote_peer in pairs(data.peers) do
                local bitfield = remote_peer.bitfield
                if bitfield ~= nil and bitfield[piece] == true and not remote_peer.am_interested then
                        remote_peer.am_interested = true
                        send_interested(remote_peer.mailbox)
                end
        end
        data.current_piece = -1
end
-- Send an "INTERESTED" message to the given mailbox.
function send_interested(mailbox)
        local message = "Sending a INTERESTED to " .. mailbox
        simgrid.debug(message)
        new_task("INTERESTED"):dsend(mailbox)
end
-- Send a "NOTINTERESTED" message to the given mailbox.
function send_not_interested(mailbox)
        new_task("NOTINTERESTED"):dsend(mailbox)
end
-- Send a "HANDSHAKE" message to the mailbox of every peer in data.peers.
function send_handshake_all()
        for _, remote_peer in pairs(data.peers) do
                new_task("HANDSHAKE"):dsend(remote_peer.mailbox)
        end
end
-- Send a "HANDSHAKE" message to the given mailbox.
function send_handshake(mailbox)
        local message = "Sending a HANDSHAKE to " .. mailbox
        simgrid.debug(message)
        new_task("HANDSHAKE"):dsend(mailbox)
end
-- Send a "CHOKE" message to the given mailbox.
function send_choked(mailbox)
        local message = "Sending a CHOKE to " .. mailbox
        simgrid.debug(message)
        new_task("CHOKE"):dsend(mailbox)
end
-- Send an "UNCHOKE" message to the given mailbox.
function send_unchoked(mailbox)
        local message = "Sending a UNCHOKE to " .. mailbox
        simgrid.debug(message)
        new_task("UNCHOKE"):dsend(mailbox)
end
-- Send a "HAVE" message carrying `piece` to every peer in data.peers.
function send_have(piece)
        for _, remote_peer in pairs(data.peers) do
                local message = new_task("HAVE")
                message.piece = piece
                message:dsend(remote_peer.mailbox)
        end
end
-- Send one request per key of data.current_pieces to the mailbox of
-- `remote_peer` (called when that peer unchokes us).
function send_requests_to_peer(remote_peer)
        for piece in pairs(data.current_pieces) do
                send_request(remote_peer.mailbox, piece)
        end
end
-- Send a "BITFIELD" message carrying data.bitfield to the given mailbox.
function send_bitfield(mailbox)
        simgrid.debug("Sending a BITFIELD to " .. mailbox)
        local message = new_task("BITFIELD")
        message.bitfield = data.bitfield
        message:dsend(mailbox)
end
-- Send a "REQUEST" message to a peer, asking for the given piece.
function send_request(mailbox, piece)
        simgrid.debug("Sending a REQUEST to " .. mailbox .. " for " .. piece)
        local message = new_task("REQUEST")
        message.piece = piece
        message:dsend(mailbox)
end
-- Send a "PIECE" message to a peer, carrying the piece number and the
-- `stalled` flag.
function send_piece(mailbox, piece, stalled)
        simgrid.debug("Sending the PIECE " .. piece .. " to " .. mailbox)
        local message = new_task("PIECE")
        message.stalled = stalled
        message.piece = piece
        message:dsend(mailbox)
end
-- Build a message task of the given kind, stamped with our own mailbox and
-- id so that the receiver can identify the sender.
-- `message_type` is stored in the task's `type` field (e.g. "HANDSHAKE").
-- The parameter is deliberately not called `type`, which would shadow the
-- builtin function of that name inside this function.
function new_task(message_type)
        local task = simgrid.task.new("", 0, common.MESSAGE_SIZE)
        task.type = message_type
        task.mailbox = data.mailbox
        task.peer_id = data.id
        return task
end
-- Returns the bitfield as a string with one character per piece, in piece
-- order: '1' for a piece we own, '0' otherwise.
function get_status()
        local flags = {}
        -- Indexed loop: pairs() gives no ordering guarantee, and the position
        -- of each character is what makes the string meaningful
        for i = 1, #data.bitfield do
                flags[i] = (data.bitfield[i] == true) and '1' or '0'
        end
        return table.concat(flags)
end