1 -- A SimGrid Lua implementation of the Bittorrent protocol.
15 TRACKER_QUERY_INTERVAL = 1000,
16 TRACKER_COMM_SIZE = 0.01,
18 GET_PEERS_TIMEOUT = 10000,
20 MAX_UNCHOKED_PEERS = 4,
21 UPDATE_CHOKED_INTERVAL = 50,
31 if #args ~= 2 and #args ~= 3 then
32 simgrid.info("Wrong number of arguments")
35 -- Setting the peer data
37 -- Retrieving the peer id
38 id = tonumber(args[1]),
39 mailbox = tostring(tonumber(args[1])),
40 mailbox_tracker = "tracker" .. args[1],
48 deadline = tonumber(args[2]),
51 simgrid.info("Hi, I'm joining the network with id " .. data.id)
53 -- Checking if the peer is a seed
54 local bitfield_value = false
55 if args[3] == "1" then
56 data.pieces = common.PIECES_COUNT
61 -- Building the peer bitfield and the pieces list
62 for i = 1, common.FILE_PIECES do
63 data.pieces_count[i] = 0
64 data.bitfield[i] = bitfield_value
67 if get_peers_data() == true then
68 data.comm_received = simgrid.task.irecv(data.mailbox)
69 if has_finished() then
77 simgrid.info("Couldn't contact the tracker")
79 simgrid.info("My status is now " .. get_status())
81 -- Peer main loop when it is leeching
83 simgrid.info("Start downloading.")
84 local now = simgrid.get_clock()
85 local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL;
86 -- Send a "handshake" message to all the peers it got
87 -- (it couldn't have gotten more than 50 peers anyway)
89 -- Wait for at least one bitfield message
92 simgrid.info("Starting main leech loop")
94 while now < data.deadline and data.pieces < common.FILE_PIECES do
95 task, err = data.comm_received:test()
98 data.comm_received = simgrid.task.irecv(data.mailbox)
99 now = simgrid.get_clock()
101 data.comm_received = simgrid.task.irecv(data.mailbox)
103 -- If a piece of interest is pending, send "interested" messages to the peers
104 if data.current_piece ~= -1 then
105 send_interested_to_peers()
107 if table.getn(data.current_pieces) < common.MAX_PIECES then
108 update_current_piece()
111 -- We don't execute the choke algorithm if we don't already have a piece
112 if now >= next_choked_update and data.pieces > 0 then
113 update_choked_peers()
114 next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL
115 now = simgrid.get_clock()
117 simgrid.process.sleep(1)
118 now = simgrid.get_clock()
123 -- Peer main loop when it is seeding
125 local now = simgrid.get_clock()
126 local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL;
127 simgrid.debug("Start seeding.")
128 -- Start the main seed loop
129 while now < data.deadline do
130 task, err = data.comm_received:test()
133 data.comm_received = simgrid.task.irecv(data.mailbox)
134 now = simgrid.get_clock()
136 data.comm_received = simgrid.task.irecv(data.mailbox)
138 if now >= next_choked_update then
139 update_choked_peers()
140 next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL
141 now = simgrid.get_clock()
143 simgrid.process.sleep(1)
144 now = simgrid.get_clock()
149 -- Retrieve the peers list from the tracker
150 function get_peers_data()
151 local success = false
152 local now = simgrid.get_clock()
153 local timeout = now + common.GET_PEERS_TIMEOUT
155 local task_send = simgrid.task.new("", 0, common.MESSAGE_SIZE)
156 task_send.type = "REQUEST"
157 task_send.peer_id = data.id
158 task_send.mailbox = data.mailbox_tracker
160 while not(success) and now < timeout do
161 simgrid.debug("Sending a peer request to the tracker.")
162 if task_send:send("tracker") then
166 now = simgrid.get_clock()
168 -- Wait for the answer
169 local comm_received = simgrid.task.irecv(data.mailbox_tracker)
170 while not(success) and now < timeout do
171 local task_received = comm_received:wait(timeout)
172 comm_received = simgrid.task.irecv(data.mailbox_tracker)
173 if task_received then
174 simgrid.info("Received an answer from the tracker with " .. #task_received.peers .. " peers inside")
175 -- Add what we received to our peer list
176 for i,v in pairs(task_received.peers) do
178 --Add the peer to our list and build its data
181 peer_data.bitfield = nil
182 peer_data.mailbox = tostring(v);
183 peer_data.am_interested = false
184 peer_data.interested = false
185 peer_data.choked_upload = true
186 peer_data.choked_download = true
187 data.peers[v] = peer_data
197 -- Returns if the peer has finished downloading the piece
198 function has_finished()
199 for i,v in pairs(data.bitfield) do
206 -- Handle a received message sent by another peer
207 function handle_message(task)
208 local remote_peer = data.peers[task.peer_id]
210 if task.type == "HANDSHAKE" then
211 simgrid.debug("Received a HANDSHAKE message from " .. task.mailbox)
212 -- Check if the peer is in our connection list
213 if data.peers[task.peer_id] == nil then
215 peer_data.mailbox = task.mailbox
216 peer_data.id = task.peer_id
217 peer_data.am_interested = false
218 peer_data.interested = false
219 peer_data.choked_upload = true
220 peer_data.choked_download = true
221 peer_data.bitfield = nil
222 data.peers[task.peer_id] = peer_data
223 send_handshake(task.mailbox)
225 -- Send our bitfield to the peer
226 send_bitfield(task.mailbox)
227 elseif task.type == "BITFIELD" then
228 simgrid.debug("Received a BITFIELD from " .. task.mailbox)
229 -- Update the pieces list
230 update_piece_count_from_bitfield(task.bitfield)
231 -- Update the current piece
232 if data.current_piece == -1 and data.pieces < common.FILE_PIECES then
233 update_current_piece()
235 data.peers[task.peer_id].bitfield = task.bitfield
236 elseif task.type == "INTERESTED" then
237 simgrid.debug("Received an INTERESTED message from " .. task.mailbox)
238 data.peers[task.peer_id].interested = true
239 elseif task.type == "NOTINTERESTED" then
240 simgrid.debug("Received an NOTINTERESTED message from " .. task.mailbox)
241 data.peers[task.peer_id].interested = false
242 elseif task.type == "UNCHOKE" then
243 simgrid.debug("Received an UNCHOKE message from " .. task.mailbox)
244 data.peers[task.peer_id].choked_download = false
245 send_requests_to_peer(data.peers[task.peer_id])
246 elseif task.type == "CHOKE" then
247 simgrid.debug("Recevied a CHOKE message from " .. task.mailbox)
248 data.peers[task.peer_id].choked_download = true
249 elseif task.type == "HAVE" then
250 local remote_peer = data.peers[task.peer_id]
251 if remote_peer == nil or remote_peer.bitfield == nil then
254 simgrid.debug("Received a HAVE message from " .. task.mailbox)
255 data.pieces_count[task.piece] = data.pieces_count[task.piece] + 1
256 -- Send interested message to the peer if he has what we want
257 if not(remote_peer.am_interested) and data.current_pieces[task.piece] ~= nil then
258 remote_peer.am_interested = true
259 send_interested(remote_peer.mailbox)
261 if data.current_pieces[task.piece] ~= nil then
262 send_request(task.mailbox,task.piece)
264 elseif task.type == "REQUEST" then
265 simgrid.debug("Received REQUEST from " .. task.mailbox .. " for " .. task.piece)
266 local remote_peer = data.peers[task.peer_id]
267 if remote_peer.choked_upload == false then
268 if data.bitfield[task.piece] == true then
269 send_piece(task.mailbox,task.piece,false)
272 elseif task.type == "PIECE" then
273 if task.stalled == true then
274 simgrid.debug("The received piece is stalled")
276 simgrid.debug("Received piece " .. task.piece .. " from " .. task.mailbox)
277 if data.bitfield[task.piece] ~= true then
278 data.pieces_requested = data.pieces_requested - 1
279 -- Removing the piece from our piece list
280 data.current_pieces[task.piece] = nil
281 data.bitfield[task.piece] = true
282 data.pieces = data.pieces + 1
283 simgrid.debug("My status is now:" .. get_status())
284 -- Sending the information to all the peers we are connected to
285 send_have(task.piece)
286 -- Sending NOTINTERESTED to the peers that don't have any more pieces we want
287 update_interested_after_receive()
292 -- Update the piece the peer is currently interested in.
293 -- There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole):
294 -- If the peer has fewer than 3 pieces, it chooses a piece at random.
295 -- If the peer has 3 or more pieces, it downloads the pieces that are the least
297 function update_current_piece()
298 if data.pieces_requested >= (common.FILE_PIECES - data.pieces) then
301 if data.pieces < 3 or true then
303 data.current_piece = math.random(1,common.FILE_PIECES)
304 -- simgrid.info("The new piece is:" .. data.current_piece)
305 until data.bitfield[data.current_piece] ~= true and data.current_pieces[data.current_piece] == nil
306 data.current_pieces[data.current_piece] = true
307 data.pieces_requested = data.pieces_requested + 1
311 -- Updates the list of who has a piece from a bitfield
312 function update_piece_count_from_bitfield(bitfield)
313 for i,v in pairs(bitfield) do
315 data.pieces_count[i] = data.pieces_count[i] + 1
319 -- Wait for the node to receive interesting bitfield messages (ie: non empty)
320 function wait_for_pieces()
321 local finished = false
322 local now = simgrid.get_clock()
324 while now < data.deadline and not(finished) do
325 task = data.comm_received:wait(common.TIMEOUT_MESSAGE)
328 if data.current_piece ~= -1 then
332 data.comm_received = simgrid.task.irecv(data.mailbox)
335 -- Update the list of current choked and unchoked peers, using the
337 function update_choked_peers()
338 data.round = (data.round + 1) % 3
339 -- Remove a peer from the list
340 for i,v in pairs(data.active_peers) do
341 data.active_peers[i] = nil
342 send_choked(v.mailbox)
345 -- Random optimistic unchoking
348 for key, value in pairs(data.peers) do
349 values[#values + 1] = value
351 local peer_choosed = nil
355 peer_choosed = values[math.random(#values)]
356 if peer_choosed.interested ~= true then
360 until peer_choosed ~= nil or j < common.MAXIMUM_PEERS
361 if peer_choosed ~= nil then
362 data.active_peers[peer_choosed.id] = peer_choosed
363 peer_choosed.choked_upload = false
364 send_unchoked(peer_choosed.mailbox)
367 -- TODO: Use the leecher choke algorithm
369 -- Updates our "interested" state about peers: send "not interested" to peers
370 -- that don't have any more pieces we want.
371 function update_interested_after_receive()
372 local interested = false
373 for i,v in pairs(data.peers) do
374 if v.am_interested then
375 for piece,j in pairs(data.current_pieces) do
376 if v.bitfield ~= nil then
377 if v.bitfield[piece] == true then
384 if not(interested) then
385 v.am_interested = false
386 send_not_interested(v.mailbox)
391 -- Find the peers that have the current interested piece and send them
392 -- the "interested" message
393 function send_interested_to_peers()
394 if data.current_piece == -1 then
397 for i,v in pairs(data.peers) do
398 if v.bitfield ~= nil then
399 v.am_interested = true
400 send_interested(v.mailbox)
403 data.current_piece = -1
405 -- Send an "interested" message to a peer.
406 function send_interested(mailbox)
407 simgrid.debug("Sending a INTERESTED to " .. mailbox)
408 local task = new_task("INTERESTED")
411 -- Send a "not interested" message to a peer.
412 function send_not_interested(mailbox)
413 local task = new_task("NOTINTERESTED")
416 -- Send a handshake message to all the peers the peer has
417 function send_handshake_all()
418 for i,v in pairs(data.peers) do
419 local task = new_task("HANDSHAKE")
420 task:dsend(v.mailbox)
423 -- Send a "handshake" message to a peer
424 function send_handshake(mailbox)
425 simgrid.debug("Sending a HANDSHAKE to " .. mailbox)
426 local task = new_task("HANDSHAKE")
429 -- Send a "choked" message to a peer
430 function send_choked(mailbox)
431 simgrid.debug("Sending a CHOKE to " .. mailbox)
432 local task = new_task("CHOKE")
435 -- Send an "unchoked" message to a peer
436 function send_unchoked(mailbox)
437 simgrid.debug("Sending a UNCHOKE to " .. mailbox)
438 local task = new_task("UNCHOKE")
441 -- Send a "HAVE" message to all peers we are connected to
442 function send_have(piece)
443 for i,v in pairs(data.peers) do
444 local task = new_task("HAVE")
446 task:dsend(v.mailbox)
449 -- Send request messages to a peer that has unchoked us
450 function send_requests_to_peer(remote_peer)
451 for i,v in pairs(data.current_pieces) do
452 send_request(remote_peer.mailbox,i)
455 -- Send a bitfield message to a peer
456 function send_bitfield(mailbox)
457 simgrid.debug("Sending a BITFIELD to " .. mailbox)
458 local task = new_task("BITFIELD")
459 task.bitfield = data.bitfield
462 -- Send a "request" message to a peer, containing a request for a piece
463 function send_request(mailbox, piece)
464 simgrid.debug("Sending a REQUEST to " .. mailbox .. " for " .. piece)
465 local task = new_task("REQUEST")
469 -- Send a "piece" message to a peer, containing a piece of the file
470 function send_piece(mailbox, piece, stalled)
471 simgrid.debug("Sending the PIECE " .. piece .. " to " .. mailbox)
472 local task = new_task("PIECE")
474 task.stalled = stalled
477 function new_task(type)
478 local task = simgrid.task.new("", 0, common.MESSAGE_SIZE)
480 task.mailbox = data.mailbox
481 task.peer_id = data.id
484 function get_status()
486 for i,v in pairs(data.bitfield) do