1 -- Copyright (c) 2012, 2014. The SimGrid Team.
2 -- All rights reserved.
4 -- This program is free software; you can redistribute it and/or modify it
5 -- under the terms of the license (GNU LGPL) which comes with this package.
7 -- A SimGrid Lua implementation of the Bittorrent protocol.
13 FILE_PIECE_SIZE = 512,
21 TRACKER_QUERY_INTERVAL = 1000,
22 TRACKER_COMM_SIZE = 0.01,
24 GET_PEERS_TIMEOUT = 10000,
26 MAX_UNCHOKED_PEERS = 4,
27 UPDATE_CHOKED_INTERVAL = 50,
37 if #args ~= 2 and #args ~= 3 then
38 simgrid.info("Wrong number of arguments")
41 -- Setting the peer data
43 -- Retrieving the peer id
44 id = tonumber(args[1]),
45 mailbox = tostring(tonumber(args[1])),
46 mailbox_tracker = "tracker" .. args[1],
54 deadline = tonumber(args[2]),
57 simgrid.info("Hi, I'm joining the network with id " .. data.id)
59 -- Checking if the peer is a seed
60 local bitfield_value = false
61 if args[3] == "1" then
62 data.pieces = common.PIECES_COUNT
67 -- Building the peer bitfield and the pieces list
68 for i = 1, common.FILE_PIECES do
69 data.pieces_count[i] = 0
70 data.bitfield[i] = bitfield_value
73 if get_peers_data() == true then
74 data.comm_received = simgrid.task.irecv(data.mailbox)
75 if has_finished() then
83 simgrid.info("Couldn't contact the tracker")
85 simgrid.info("My status is now " .. get_status())
87 -- Peer main loop when it is leeching
89 simgrid.info("Start downloading.")
90 local now = simgrid.get_clock()
91 local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL;
92 -- Send a "handshake" message to all the peers it got
93 -- (it couldn't have gotten more than 50 peers anyway)
95 -- Wait for at least one bitfield message
98 simgrid.info("Starting main leech loop")
100 while now < data.deadline and data.pieces < common.FILE_PIECES do
101 task, err = data.comm_received:test()
104 data.comm_received = simgrid.task.irecv(data.mailbox)
105 now = simgrid.get_clock()
107 data.comm_received = simgrid.task.irecv(data.mailbox)
109 -- If the peer has a pending piece it is interested in, tell the other peers
110 if data.current_piece ~= -1 then
111 send_interested_to_peers()
113 if #data.current_pieces < common.MAX_PIECES then
114 update_current_piece()
117 -- We don't execute the choke algorithm if we don't already have a piece
118 if now >= next_choked_update and data.pieces > 0 then
119 update_choked_peers()
120 next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL
121 now = simgrid.get_clock()
123 simgrid.process.sleep(1)
124 now = simgrid.get_clock()
129 -- Peer main loop when it is seeding
131 local now = simgrid.get_clock()
132 local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL;
133 simgrid.debug("Start seeding.")
134 -- Start the main seed loop
135 while now < data.deadline do
136 task, err = data.comm_received:test()
139 data.comm_received = simgrid.task.irecv(data.mailbox)
140 now = simgrid.get_clock()
142 data.comm_received = simgrid.task.irecv(data.mailbox)
144 if now >= next_choked_update then
145 update_choked_peers()
146 next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL
147 now = simgrid.get_clock()
149 simgrid.process.sleep(1)
150 now = simgrid.get_clock()
155 -- Retrieve the peers list from the tracker
156 function get_peers_data()
157 local success = false
158 local now = simgrid.get_clock()
159 local timeout = now + common.GET_PEERS_TIMEOUT
161 local task_send = simgrid.task.new("", 0, common.MESSAGE_SIZE)
162 task_send.type = "REQUEST"
163 task_send.peer_id = data.id
164 task_send.mailbox = data.mailbox_tracker
166 while not(success) and now < timeout do
167 simgrid.debug("Sending a peer request to the tracker.")
168 if task_send:send("tracker") then
172 now = simgrid.get_clock()
174 -- Wait for the answer
175 local comm_received = simgrid.task.irecv(data.mailbox_tracker)
176 while not(success) and now < timeout do
177 local task_received = comm_received:wait(timeout)
178 comm_received = simgrid.task.irecv(data.mailbox_tracker)
179 if task_received then
180 simgrid.info("Received an answer from the tracker with " .. #task_received.peers .. " peers inside")
181 -- Add what we received to our peer list
182 for i,v in pairs(task_received.peers) do
184 --Add the peer to our list and build its data
186 peer_data.id = math.tointeger(v);
187 peer_data.bitfield = nil
188 peer_data.mailbox = math.tointeger(v);
189 peer_data.am_interested = false
190 peer_data.interested = false
191 peer_data.choked_upload = true
192 peer_data.choked_download = true
193 data.peers[v] = peer_data
194 simgrid.info("Added " .. v)
198 mt.__len = function(obj)
200 for j,k in pairs(obj) do
205 setmetatable(data.peers, mt)
213 -- Returns whether the peer has finished downloading (checked against its bitfield)
214 function has_finished()
215 for i,v in pairs(data.bitfield) do
222 -- Handle a received message sent by another peer
223 function handle_message(task)
224 local remote_peer = data.peers[task.peer_id]
226 if task.type == "HANDSHAKE" then
227 simgrid.debug("Received a HANDSHAKE message from " .. task.mailbox)
228 -- Check if the peer is in our connection list
229 if data.peers[task.peer_id] == nil then
231 peer_data.mailbox = task.mailbox
232 peer_data.id = task.peer_id
233 peer_data.am_interested = false
234 peer_data.interested = false
235 peer_data.choked_upload = true
236 peer_data.choked_download = true
237 peer_data.bitfield = nil
238 data.peers[task.peer_id] = peer_data
239 send_handshake(task.mailbox)
241 -- Send our bitfield to the peer
242 send_bitfield(task.mailbox)
243 elseif task.type == "BITFIELD" then
244 simgrid.debug("Received a BITFIELD from " .. task.mailbox)
245 -- Update the pieces list
246 update_piece_count_from_bitfield(task.bitfield)
247 -- Update the current piece
248 if data.current_piece == -1 and data.pieces < common.FILE_PIECES then
249 update_current_piece()
251 data.peers[task.peer_id].bitfield = task.bitfield
252 elseif task.type == "INTERESTED" then
253 simgrid.debug("Received an INTERESTED message from " .. task.mailbox)
254 data.peers[task.peer_id].interested = true
255 elseif task.type == "NOTINTERESTED" then
256 simgrid.debug("Received an NOTINTERESTED message from " .. task.mailbox)
257 data.peers[task.peer_id].interested = false
258 elseif task.type == "UNCHOKE" then
259 simgrid.debug("Received an UNCHOKE message from " .. task.mailbox)
260 data.peers[task.peer_id].choked_download = false
261 send_requests_to_peer(data.peers[task.peer_id])
262 elseif task.type == "CHOKE" then
263 simgrid.debug("Recevied a CHOKE message from " .. task.mailbox)
264 data.peers[task.peer_id].choked_download = true
265 elseif task.type == "HAVE" then
266 local remote_peer = data.peers[task.peer_id]
267 if remote_peer == nil or remote_peer.bitfield == nil then
270 simgrid.debug("Received a HAVE message from " .. task.mailbox)
271 data.pieces_count[task.piece] = data.pieces_count[task.piece] + 1
272 -- Send interested message to the peer if he has what we want
273 if not(remote_peer.am_interested) and data.current_pieces[task.piece] ~= nil then
274 remote_peer.am_interested = true
275 send_interested(remote_peer.mailbox)
277 if data.current_pieces[task.piece] ~= nil then
278 send_request(task.mailbox,task.piece)
280 elseif task.type == "REQUEST" then
281 simgrid.debug("Received REQUEST from " .. task.mailbox .. " for " .. task.piece)
282 local remote_peer = data.peers[task.peer_id]
283 if remote_peer.choked_upload == false then
284 if data.bitfield[task.piece] == true then
285 send_piece(task.mailbox,task.piece,false)
288 elseif task.type == "PIECE" then
289 task.piece = math.tointeger(task.piece)
290 if task.stalled == true then
291 simgrid.debug("The received piece is stalled")
293 simgrid.debug("Received piece " .. task.piece .. " from " .. task.mailbox)
294 if data.bitfield[task.piece] ~= true then
295 data.pieces_requested = data.pieces_requested - 1
296 -- Removing the piece from our piece list
297 data.current_pieces[task.piece] = nil
298 data.bitfield[task.piece] = true
299 data.pieces = data.pieces + 1
300 simgrid.debug("My status is now:" .. get_status())
301 -- Sending the information to all the peers we are connected to
302 send_have(task.piece)
303 -- Sending NOTINTERESTED to the peers that don't have any more pieces we want
304 update_interested_after_receive()
309 -- Update the piece the peer is currently interested in.
310 -- There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole):
311 -- If the peer has less than 3 pieces, he chooses a piece at random.
312 -- If the peer has 3 pieces or more, he downloads the pieces that are the least
314 function update_current_piece()
315 if data.pieces_requested >= (common.FILE_PIECES - data.pieces) then
318 if data.pieces < 3 or true then
320 data.current_piece = math.random(1,common.FILE_PIECES)
321 -- simgrid.info("The new piece is:" .. data.current_piece)
322 until data.bitfield[data.current_piece] ~= true and data.current_pieces[data.current_piece] == nil
323 data.current_pieces[data.current_piece] = true
324 data.pieces_requested = data.pieces_requested + 1
328 -- Updates the list of who has a piece from a bitfield
329 function update_piece_count_from_bitfield(bitfield)
330 for i,v in pairs(bitfield) do
332 data.pieces_count[i] = data.pieces_count[i] + 1
336 -- Wait for the node to receive interesting bitfield messages (ie: non empty)
337 function wait_for_pieces()
338 local finished = false
339 local now = simgrid.get_clock()
341 while now < data.deadline and not(finished) do
342 task = data.comm_received:wait(common.TIMEOUT_MESSAGE)
345 if data.current_piece ~= -1 then
349 data.comm_received = simgrid.task.irecv(data.mailbox)
352 -- Update the list of current choked and unchoked peers, using the
354 function update_choked_peers()
355 data.round = (data.round + 1) % 3
356 -- Remove a peer from the list
357 for i,v in pairs(data.active_peers) do
358 data.active_peers[i] = nil
359 send_choked(v.mailbox)
362 -- Random optimistic unchoking
365 for key, value in pairs(data.peers) do
366 values[#values + 1] = value
368 local peer_choosed = nil
372 peer_choosed = values[math.random(#values)]
373 if peer_choosed.interested ~= true then
377 until peer_choosed ~= nil or j < common.MAXIMUM_PEERS
378 if peer_choosed ~= nil then
379 data.active_peers[peer_choosed.id] = peer_choosed
380 peer_choosed.choked_upload = false
381 send_unchoked(peer_choosed.mailbox)
384 -- TODO: Use the leecher choke algorithm
386 -- Updates our "interested" state about peers: send "not interested" to peers
387 -- that don't have any more pieces we want.
388 function update_interested_after_receive()
389 local interested = false
390 for i,v in pairs(data.peers) do
391 if v.am_interested then
392 for piece,j in pairs(data.current_pieces) do
393 if v.bitfield ~= nil then
394 if v.bitfield[piece] == true then
401 if not(interested) then
402 v.am_interested = false
403 send_not_interested(v.mailbox)
408 -- Find the peers that have the current interested piece and send them
409 -- the "interested" message
410 function send_interested_to_peers()
411 if data.current_piece == -1 then
414 for i,v in pairs(data.peers) do
415 if v.bitfield ~= nil then
416 v.am_interested = true
417 send_interested(v.mailbox)
420 data.current_piece = -1
422 -- Send an "interested" message to a peer.
423 function send_interested(mailbox)
424 simgrid.debug("Sending a INTERESTED to " .. mailbox)
425 local task = new_task("INTERESTED")
428 -- Send a "not interested" message to a peer.
429 function send_not_interested(mailbox)
430 simgrid.info("Sending a send_not_interested")
431 local task = new_task("NOTINTERESTED")
434 -- Send a handshake message to all the peers the peer has
435 function send_handshake_all()
436 for i,v in pairs(data.peers) do
437 local task = new_task("HANDSHAKE")
438 task:dsend(v.mailbox)
441 -- Send a "handshake" message to a peer
442 function send_handshake(mailbox)
443 simgrid.debug("Sending a HANDSHAKE to " .. mailbox)
444 local task = new_task("HANDSHAKE")
447 -- Send a "choked" message to a peer
448 function send_choked(mailbox)
449 simgrid.debug("Sending a CHOKE to " .. mailbox)
450 local task = new_task("CHOKE")
453 -- Send an "unchoked" message to a peer
454 function send_unchoked(mailbox)
455 simgrid.debug("Sending a UNCHOKE to " .. mailbox)
456 local task = new_task("UNCHOKE")
459 -- Send a "HAVE" message to all peers we are connected to
460 function send_have(piece)
461 simgrid.debug("Sending a HAVE message")
462 for i,v in pairs(data.peers) do
463 local task = new_task("HAVE")
465 task:dsend(v.mailbox)
468 -- Send request messages to a peer that has unchoked us
469 function send_requests_to_peer(remote_peer)
470 simgrid.debug("Sending a request to peer " .. remote_peer.mailbox)
471 for i,v in pairs(data.current_pieces) do
472 send_request(remote_peer.mailbox,i)
475 -- Send a bitfield message to a peer
476 function send_bitfield(mailbox)
477 simgrid.debug("Sending a BITFIELD to " .. mailbox)
478 local task = new_task("BITFIELD")
479 task.bitfield = data.bitfield
482 -- Send a "request" message to a peer, containing a request for a piece
483 function send_request(mailbox, piece)
484 simgrid.debug("Sending a REQUEST to " .. mailbox .. " for " .. piece)
485 local task = new_task("REQUEST")
489 -- Send a "piece" message to a peer, containing a piece of the file
490 function send_piece(mailbox, piece, stalled)
491 simgrid.debug("Sending the PIECE " .. piece .. " to " .. mailbox)
492 local task = new_task("PIECE")
494 task.stalled = stalled
497 function new_task(type)
498 local task = simgrid.task.new(type, 0, common.MESSAGE_SIZE)
500 task.mailbox = data.mailbox
501 task.peer_id = data.id
504 function get_status()
506 for i,v in pairs(data.bitfield) do