Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
7166c004eb628ba4b93a7482c71280041e6dd464
[simgrid.git] / examples / lua / bittorrent / peer.lua
1 -- Copyright (c) 2012, 2014. The SimGrid Team.
2 -- All rights reserved.
3
4 -- This program is free software; you can redistribute it and/or modify it
5 -- under the terms of the license (GNU LGPL) which comes with this package.
6
7 -- A SimGrid Lua implementation of the Bittorrent protocol.
8
9 require("simgrid")
-- Constants shared by all the functions of this file.
common = {
    -- Description of the file being exchanged
    FILE_PIECES = 10,
    FILE_PIECE_SIZE = 512,
    FILE_SIZE = 5120,

    -- Sizes attached to the simulated communications
    MESSAGE_SIZE = 1,
    PIECE_COMM_SIZE = 1,
    TRACKER_COMM_SIZE = 0.01,

    -- Tracker interaction
    TRACKER_QUERY_INTERVAL = 1000,
    GET_PEERS_TIMEOUT = 10000,

    -- Peer-to-peer protocol tuning
    MAXIMUM_PEERS = 50,
    TIMEOUT_MESSAGE = 10,
    MAX_UNCHOKED_PEERS = 4,
    UPDATE_CHOKED_INTERVAL = 50,
    MAX_PIECES = 1,
}
30
31
-- Peer main function: entry point of a peer process.
-- Arguments (presumably passed as strings by the deployment -- verify):
--   args[1]: peer id, also used to build the peer and tracker mailbox names
--   args[2]: deadline, compared against simgrid.get_clock() by the main loops
--   args[3]: optional, the peer starts as a seed when it is exactly "1"
function peer(...)
    local args = {...}

    if #args ~= 2 and #args ~= 3 then
        simgrid.info("Wrong number of arguments")
        if #args < 2 then
            -- The id and the deadline are mandatory: the setup below would
            -- fail on a nil value, so give up right away.
            return
        end
    end

    -- A peer started with "1" as third argument already owns the whole file
    local is_seed = (args[3] == "1")

    -- Setting the peer data. `data` is deliberately global: every other
    -- function of this file reads and updates the peer state through it.
    -- NOTE(review): `current_piece` is compared against -1 in several
    -- functions but is not initialised here -- confirm whether it should
    -- start at -1.
    data = {
        -- Retrieving the peer id
        id = tonumber(args[1]),
        mailbox = tostring(tonumber(args[1])),
        mailbox_tracker = "tracker" .. args[1],
        peers = {},
        active_peers = {},
        current_pieces = {},
        pieces_requested = 0,
        bitfield = {},
        pieces_count = {},
        pieces = 0,
        deadline = tonumber(args[2]),
        round = 0
    }
    simgrid.info("Hi, I'm joining the network with id " .. data.id)

    if is_seed then
        -- A seed owns every piece (the original read the undefined
        -- common.PIECES_COUNT here, which left data.pieces at nil)
        data.pieces = common.FILE_PIECES
    end
    -- Building the peer bitfield and the pieces list
    for i = 1, common.FILE_PIECES do
        data.pieces_count[i] = 0
        data.bitfield[i] = is_seed
    end

    if get_peers_data() == true then
        data.comm_received = simgrid.task.irecv(data.mailbox)
        if has_finished() then
            send_handshake_all()
            seed_loop()
        else
            leech_loop()
            seed_loop()
        end
    else
        simgrid.info("Couldn't contact the tracker")
    end
    simgrid.info("My status is now " .. get_status())
end
-- Main loop of a peer that is still downloading.
-- Sends a handshake to every known peer, blocks in wait_for_pieces(), then
-- polls the pending reception until the deadline is reached or the peer owns
-- common.FILE_PIECES pieces.
function leech_loop()
    simgrid.info("Start downloading.")
    local clock = simgrid.get_clock()
    local choke_update_at = clock + common.UPDATE_CHOKED_INTERVAL
    -- Introduce ourselves to all the peers we got from the tracker
    send_handshake_all()
    -- Wait for at least one message before entering the loop
    wait_for_pieces()

    simgrid.info("Starting main leech loop")
    while clock < data.deadline and data.pieces < common.FILE_PIECES do
        local msg, failure = data.comm_received:test()
        if msg then
            handle_message(msg)
            data.comm_received = simgrid.task.irecv(data.mailbox)
            clock = simgrid.get_clock()
        elseif failure then
            -- The reception failed: post a new one
            data.comm_received = simgrid.task.irecv(data.mailbox)
        else
            -- Nothing received yet
            if data.current_piece ~= -1 then
                -- A piece has been selected: tell the peers about it
                send_interested_to_peers()
            elseif #data.current_pieces < common.MAX_PIECES then
                -- NOTE(review): current_pieces is keyed by piece index, so
                -- `#` may not return the number of entries -- confirm intent.
                update_current_piece()
            end
            -- The choke algorithm is not run before we own at least one piece
            if clock >= choke_update_at and data.pieces > 0 then
                update_choked_peers()
                choke_update_at = choke_update_at + common.UPDATE_CHOKED_INTERVAL
            else
                simgrid.process.sleep(1)
            end
            clock = simgrid.get_clock()
        end
    end
end
-- Main loop of a peer that is seeding.
-- Polls the pending reception and runs the choke algorithm every
-- common.UPDATE_CHOKED_INTERVAL until data.deadline is reached.
function seed_loop()
    local now = simgrid.get_clock()
    local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL
    simgrid.debug("Start seeding.")
    -- Start the main seed loop
    while now < data.deadline do
        -- Locals, as in leech_loop(): the original leaked both as globals
        local task, err = data.comm_received:test()
        if task then
            handle_message(task)
            data.comm_received = simgrid.task.irecv(data.mailbox)
            now = simgrid.get_clock()
        elseif err then
            -- The reception failed: post a new one
            data.comm_received = simgrid.task.irecv(data.mailbox)
        else
            if now >= next_choked_update then
                update_choked_peers()
                next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL
                now = simgrid.get_clock()
            else
                simgrid.process.sleep(1)
                now = simgrid.get_clock()
            end
        end
    end
end
-- Retrieve the peers list from the tracker.
-- Sends a REQUEST task to the "tracker" mailbox, then waits for the answer on
-- data.mailbox_tracker and registers every returned peer (except ourselves)
-- in data.peers.
-- Returns true when an answer was received before the timeout, false
-- otherwise.
function get_peers_data()
    local now = simgrid.get_clock()
    local timeout = now + common.GET_PEERS_TIMEOUT
    -- Build the task
    local task_send = simgrid.task.new("", 0, common.MESSAGE_SIZE)
    task_send.type = "REQUEST"
    task_send.peer_id = data.id
    task_send.mailbox = data.mailbox_tracker
    -- Send the task
    local sent = false
    while not sent and now < timeout do
        simgrid.debug("Sending a peer request to the tracker.")
        if task_send:send("tracker") then
            sent = true
        end
        -- Refresh the clock so that repeated failures end at the timeout
        now = simgrid.get_clock()
    end
    if not sent then
        return false
    end
    -- Wait for the answer
    local success = false
    local comm_received = simgrid.task.irecv(data.mailbox_tracker)
    while not success and now < timeout do
        -- NOTE(review): `timeout` is an absolute date; confirm whether wait()
        -- expects a duration instead.
        local task_received = comm_received:wait(timeout)
        if task_received then
            simgrid.info("Received an answer from the tracker with " .. #task_received.peers .. " peers inside")
            -- Add what we received to our peer list
            for _, v in pairs(task_received.peers) do
                if v ~= data.id then
                    -- Add the peer to our list and build its data
                    local peer_data = {
                        id = math.tointeger(v),
                        bitfield = nil,
                        mailbox = math.tointeger(v),
                        am_interested = false,
                        interested = false,
                        choked_upload = true,
                        choked_download = true,
                    }
                    data.peers[v] = peer_data
                    simgrid.info("Added " .. v)
                end
            end
            -- data.peers is keyed by peer id: make `#data.peers` count entries
            setmetatable(data.peers, {
                __len = function(obj)
                    local len = 0
                    for _ in pairs(obj) do
                        len = len + 1
                    end
                    return len
                end,
            })
            success = true
        else
            -- Nothing arrived: post a new reception and retry until timeout
            comm_received = simgrid.task.irecv(data.mailbox_tracker)
            now = simgrid.get_clock()
        end
    end
    return success
end
-- Tells whether the peer owns the whole file: returns false as soon as one
-- entry of data.bitfield is false, true otherwise.
function has_finished()
    local complete = true
    for _, owned in pairs(data.bitfield) do
        if owned == false then
            complete = false
            break
        end
    end
    return complete
end
-- Handle a message received from another peer.
-- `task` carries at least `type`, `peer_id` and `mailbox`; BITFIELD tasks
-- also carry `bitfield`, HAVE/REQUEST/PIECE tasks carry `piece`, and PIECE
-- tasks carry `stalled`.
-- Apart from HANDSHAKE (which registers the sender) and PIECE, a message
-- whose sender is not in data.peers is ignored instead of raising an error.
function handle_message(task)
    if task.type == "HANDSHAKE" then
        simgrid.debug("Received a HANDSHAKE message from " .. task.mailbox)
        -- Check if the peer is in our connection list
        if data.peers[task.peer_id] == nil then
            local peer_data = {}
            peer_data.mailbox = task.mailbox
            peer_data.id = task.peer_id
            peer_data.am_interested = false
            peer_data.interested = false
            peer_data.choked_upload = true
            peer_data.choked_download = true
            peer_data.bitfield = nil
            data.peers[task.peer_id] = peer_data
            send_handshake(task.mailbox)
        end
        -- Send our bitfield to the peer
        send_bitfield(task.mailbox)
        return
    end

    if task.type == "PIECE" then
        task.piece = math.tointeger(task.piece)
        if task.stalled == true then
            simgrid.debug("The received piece is stalled")
        else
            simgrid.debug("Received piece " .. task.piece .. " from " .. task.mailbox)
            if data.bitfield[task.piece] ~= true then
                data.pieces_requested = data.pieces_requested - 1
                -- Removing the piece from our piece list
                data.current_pieces[task.piece] = nil
                data.bitfield[task.piece] = true
                data.pieces = data.pieces + 1
                simgrid.debug("My status is now:" .. get_status())
                -- Sending the information to all the peers we are connected to
                send_have(task.piece)
                -- Sending NOTINTERESTED to the peers that don't have any more pieces
                update_interested_after_receive()
            end
        end
        return
    end

    -- Every remaining message type updates the state we keep about its
    -- sender, so it is meaningless if the sender never went through a
    -- handshake.
    local remote_peer = data.peers[task.peer_id]
    if remote_peer == nil then
        return
    end

    if task.type == "BITFIELD" then
        simgrid.debug("Received a BITFIELD from " .. task.mailbox)
        -- Update the pieces list
        update_piece_count_from_bitfield(task.bitfield)
        -- Update the current piece
        if data.current_piece == -1 and data.pieces < common.FILE_PIECES then
            update_current_piece()
        end
        remote_peer.bitfield = task.bitfield
    elseif task.type == "INTERESTED" then
        simgrid.debug("Received an INTERESTED message from " .. task.mailbox)
        remote_peer.interested = true
    elseif task.type == "NOTINTERESTED" then
        simgrid.debug("Received an NOTINTERESTED message from " .. task.mailbox)
        remote_peer.interested = false
    elseif task.type == "UNCHOKE" then
        simgrid.debug("Received an UNCHOKE message from " .. task.mailbox)
        remote_peer.choked_download = false
        send_requests_to_peer(remote_peer)
    elseif task.type == "CHOKE" then
        simgrid.debug("Recevied a CHOKE message from " .. task.mailbox)
        remote_peer.choked_download = true
    elseif task.type == "HAVE" then
        -- Ignore the message until we received the bitfield of the sender
        if remote_peer.bitfield == nil then
            return
        end
        simgrid.debug("Received a HAVE message from " .. task.mailbox)
        data.pieces_count[task.piece] = data.pieces_count[task.piece] + 1
        -- Send interested message to the peer if he has what we want
        if not(remote_peer.am_interested) and data.current_pieces[task.piece] ~= nil then
            remote_peer.am_interested = true
            send_interested(remote_peer.mailbox)
        end
        if data.current_pieces[task.piece] ~= nil then
            send_request(task.mailbox, task.piece)
        end
    elseif task.type == "REQUEST" then
        simgrid.debug("Received REQUEST from " .. task.mailbox .. " for " .. task.piece)
        -- Only serve peers we unchoked, and only pieces we own
        if remote_peer.choked_upload == false then
            if data.bitfield[task.piece] == true then
                send_piece(task.mailbox, task.piece, false)
            end
        end
    end
end
-- Select a new piece to download.
-- Does nothing when as many pieces are already requested as are still
-- missing. Otherwise draws piece indices at random until it finds one that
-- is neither owned nor already requested, records it in data.current_piece
-- and data.current_pieces, and increments data.pieces_requested.
-- The "least replicated piece" policy described in "Bittorrent Architecture
-- Protocol" (Ryan Toole) is not implemented: the choice is always random.
function update_current_piece()
    local missing = common.FILE_PIECES - data.pieces
    if data.pieces_requested >= missing then
        return
    end
    while true do
        local candidate = math.random(1, common.FILE_PIECES)
        data.current_piece = candidate
        if data.bitfield[candidate] ~= true and data.current_pieces[candidate] == nil then
            break
        end
    end
    data.current_pieces[data.current_piece] = true
    data.pieces_requested = data.pieces_requested + 1
end
-- Increment data.pieces_count for each piece marked true in `bitfield`.
function update_piece_count_from_bitfield(bitfield)
    local counts = data.pieces_count
    for piece, owned in pairs(bitfield) do
        if owned == true then
            counts[piece] = counts[piece] + 1
        end
    end
end
-- Wait for messages until handling one leaves data.current_piece different
-- from -1, or until data.deadline is reached.
-- Each wait lasts at most common.TIMEOUT_MESSAGE, and a new reception is
-- posted after every attempt.
function wait_for_pieces()
    local finished = false
    local now = simgrid.get_clock()
    while now < data.deadline and not(finished) do
        local task = data.comm_received:wait(common.TIMEOUT_MESSAGE)
        if task then
            handle_message(task)
            if data.current_piece ~= -1 then
                finished = true
            end
        end
        data.comm_received = simgrid.task.irecv(data.mailbox)
        -- Refresh the clock: without it the deadline was never enforced and
        -- the loop could run forever when no message arrived
        now = simgrid.get_clock()
    end
end
-- Update the list of current choked and unchoked peers.
-- Removes one entry from data.active_peers and sends it a CHOKE, then picks
-- an interested peer at random (optimistic unchoking), records it in
-- data.active_peers and sends it an UNCHOKE.
function update_choked_peers()
    data.round = (data.round + 1) % 3
    -- Remove a peer from the list
    for i, v in pairs(data.active_peers) do
        data.active_peers[i] = nil
        send_choked(v.mailbox)
        break
    end
    -- Random optimistic unchoking
    local candidates = {}
    for _, value in pairs(data.peers) do
        candidates[#candidates + 1] = value
    end
    if #candidates == 0 then
        -- No known peer: math.random(0) would raise an error
        return
    end
    local peer_choosed = nil
    local attempts = 0
    repeat
        peer_choosed = candidates[math.random(#candidates)]
        if peer_choosed.interested ~= true then
            peer_choosed = nil
        end
        attempts = attempts + 1
    -- Give up after common.MAXIMUM_PEERS draws. The original test was
    -- `j < common.MAXIMUM_PEERS`, which stopped after the very first draw.
    until peer_choosed ~= nil or attempts >= common.MAXIMUM_PEERS
    if peer_choosed ~= nil then
        data.active_peers[peer_choosed.id] = peer_choosed
        peer_choosed.choked_upload = false
        send_unchoked(peer_choosed.mailbox)
    end
    -- TODO: Use the leecher choke algorithm
end
-- Updates our "interested" state about peers: send "not interested" to the
-- peers we were interested in whose known bitfield holds none of the pieces
-- listed in data.current_pieces.
function update_interested_after_receive()
    for _, remote in pairs(data.peers) do
        if remote.am_interested then
            -- Evaluated per peer: the original kept a single flag for the
            -- whole loop, so one useful peer hid all the following ones
            local still_useful = false
            if remote.bitfield ~= nil then
                for piece in pairs(data.current_pieces) do
                    if remote.bitfield[piece] == true then
                        still_useful = true
                        break
                    end
                end
            end
            if not still_useful then
                remote.am_interested = false
                send_not_interested(remote.mailbox)
            end
        end
    end
end
-- When a current piece is set (data.current_piece is not -1), mark every
-- peer whose bitfield is known as interesting, send each of them an
-- "interested" message, then reset data.current_piece to -1.
function send_interested_to_peers()
    if data.current_piece ~= -1 then
        for _, remote in pairs(data.peers) do
            if remote.bitfield ~= nil then
                remote.am_interested = true
                send_interested(remote.mailbox)
            end
        end
        data.current_piece = -1
    end
end
-- Send an INTERESTED message to the given mailbox.
function send_interested(mailbox)
    simgrid.debug("Sending a INTERESTED to " .. mailbox)
    new_task("INTERESTED"):dsend(mailbox)
end
-- Send a NOTINTERESTED message to the given mailbox.
function send_not_interested(mailbox)
    simgrid.info("Sending a send_not_interested")
    new_task("NOTINTERESTED"):dsend(mailbox)
end
-- Send a HANDSHAKE message to every peer listed in data.peers.
function send_handshake_all()
    for _, remote in pairs(data.peers) do
        new_task("HANDSHAKE"):dsend(remote.mailbox)
    end
end
-- Send a HANDSHAKE message to the given mailbox.
function send_handshake(mailbox)
    simgrid.debug("Sending a HANDSHAKE to " .. mailbox)
    new_task("HANDSHAKE"):dsend(mailbox)
end
-- Send a CHOKE message to the given mailbox.
function send_choked(mailbox)
    simgrid.debug("Sending a CHOKE to " .. mailbox)
    new_task("CHOKE"):dsend(mailbox)
end
-- Send an UNCHOKE message to the given mailbox.
function send_unchoked(mailbox)
    simgrid.debug("Sending a UNCHOKE to " .. mailbox)
    new_task("UNCHOKE"):dsend(mailbox)
end
-- Send a HAVE message announcing `piece` to every peer listed in data.peers.
function send_have(piece)
    simgrid.debug("Sending a HAVE message")
    for _, remote in pairs(data.peers) do
        local announce = new_task("HAVE")
        announce.piece = piece
        announce:dsend(remote.mailbox)
    end
end
-- Send one REQUEST to `remote_peer` for each piece listed in
-- data.current_pieces.
function send_requests_to_peer(remote_peer)
    simgrid.debug("Sending a request to peer " .. remote_peer.mailbox)
    for piece in pairs(data.current_pieces) do
        send_request(remote_peer.mailbox, piece)
    end
end
-- Send a BITFIELD message carrying data.bitfield to the given mailbox.
function send_bitfield(mailbox)
    simgrid.debug("Sending a BITFIELD to " .. mailbox)
    local msg = new_task("BITFIELD")
    msg.bitfield = data.bitfield
    msg:dsend(mailbox)
end
-- Send a REQUEST message asking the given mailbox for `piece`.
function send_request(mailbox, piece)
    simgrid.debug("Sending a REQUEST to " .. mailbox .. " for " .. piece)
    local msg = new_task("REQUEST")
    msg.piece = piece
    msg:dsend(mailbox)
end
-- Send a PIECE message to the given mailbox, carrying the piece index and
-- the `stalled` flag.
function send_piece(mailbox, piece, stalled)
    simgrid.debug("Sending the PIECE " .. piece .. " to " .. mailbox)
    local msg = new_task("PIECE")
    msg.stalled = stalled
    msg.piece = piece
    msg:dsend(mailbox)
end
-- Build a task named after `type`, created with common.MESSAGE_SIZE, and
-- stamp it with its type and with the mailbox and id of the sending peer.
function new_task(type)
    local msg = simgrid.task.new(type, 0, common.MESSAGE_SIZE)
    msg.peer_id = data.id
    msg.mailbox = data.mailbox
    msg.type = type
    return msg
end
-- Render data.bitfield as a string holding one character per entry:
-- '1' when the entry is true, '0' otherwise.
function get_status()
    local digits = {}
    for _, owned in pairs(data.bitfield) do
        local digit = '0'
        if owned == true then
            digit = '1'
        end
        digits[#digits + 1] = digit
    end
    return table.concat(digits)
end