-- A SimGrid Lua implementation of the Bittorrent protocol.
--
-- This file defines the behaviour of a single peer process. All functions and
-- the `common` / `data` tables are intentionally left global: they are
-- referenced by name across this file, and the peer entry point is presumably
-- looked up by name by the SimGrid deployment (TODO confirm before making
-- anything local).

require("simgrid")

-- Common constants
common = {
  FILE_SIZE = 5120,
  FILE_PIECE_SIZE = 512,
  FILE_PIECES = 10,

  PIECE_COMM_SIZE = 1,

  MESSAGE_SIZE = 1,
  MAXIMUM_PEERS = 50,

  TRACKER_QUERY_INTERVAL = 1000,
  TRACKER_COMM_SIZE = 0.01,

  GET_PEERS_TIMEOUT = 10000,
  TIMEOUT_MESSAGE = 10,
  MAX_UNCHOKED_PEERS = 4,
  UPDATE_CHOKED_INTERVAL = 50,
  MAX_PIECES = 1,
}

-- Peer main function.
-- Arguments (all strings, as given by the deployment):
--   args[1]: peer id
--   args[2]: deadline (simulated time at which the peer stops)
--   args[3]: optional, "1" if the peer is a seed (already owns every piece)
function peer(...)
  local args = {...}

  if #args ~= 2 and #args ~= 3 then
    simgrid.info("Wrong number of arguments")
    -- Nothing sensible can be done without an id and a deadline.
    return
  end

  -- Setting the peer data
  data = {
    -- Retrieving the peer id
    id = tonumber(args[1]),
    mailbox = tostring(tonumber(args[1])),
    mailbox_tracker = "tracker" .. args[1],
    peers = {},
    active_peers = {},
    -- Set of pieces we are currently trying to download (piece index -> true)
    current_pieces = {},
    -- Last piece selected by update_current_piece() for which "interested"
    -- messages still have to be sent; -1 means "none pending".
    current_piece = -1,
    pieces_requested = 0,
    bitfield = {},
    pieces_count = {},
    pieces = 0,
    deadline = tonumber(args[2]),
    round = 0
  }
  simgrid.info("Hi, I'm joining the network with id " .. data.id)

  -- Checking if the peer is a seed
  local is_seed = (args[3] == "1")
  if is_seed then
    data.pieces = common.FILE_PIECES
  else
    data.pieces = 0
  end

  -- Building the peer bitfield and the pieces list
  for i = 1, common.FILE_PIECES do
    data.pieces_count[i] = 0
    data.bitfield[i] = is_seed
  end

  if get_peers_data() == true then
    data.comm_received = simgrid.task.irecv(data.mailbox)
    if has_finished() then
      send_handshake_all()
      seed_loop()
    else
      leech_loop()
      seed_loop()
    end
  else
    simgrid.info("Couldn't contact the tracker")
  end

  simgrid.info("My status is now " .. get_status())
end

-- Peer main loop when it is leeching.
-- Runs until the deadline is reached or every piece has been downloaded.
function leech_loop()
  simgrid.info("Start downloading.")
  local now = simgrid.get_clock()
  local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL

  -- Send a "handshake" message to all the peers it got
  -- (it couldn't have gotten more than 50 peers anyway)
  send_handshake_all()

  -- Wait for at least one bitfield message
  wait_for_pieces()

  simgrid.info("Starting main leech loop")
  local task, err
  while now < data.deadline and data.pieces < common.FILE_PIECES do
    task, err = data.comm_received:test()
    if task then
      handle_message(task)
      data.comm_received = simgrid.task.irecv(data.mailbox)
      now = simgrid.get_clock()
    elseif err then
      data.comm_received = simgrid.task.irecv(data.mailbox)
    else
      if data.current_piece ~= -1 then
        -- A freshly selected piece is pending: advertise our interest
        send_interested_to_peers()
      elseif data.pieces_requested < common.MAX_PIECES then
        -- current_pieces is keyed by piece index, so its length is not
        -- meaningful; pieces_requested tracks how many entries it holds.
        update_current_piece()
      end

      -- We don't execute the choke algorithm if we don't already have a piece
      if now >= next_choked_update and data.pieces > 0 then
        update_choked_peers()
        next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL
      else
        simgrid.process.sleep(1)
      end
      now = simgrid.get_clock()
    end
  end
end

-- Peer main loop when it is seeding.
-- Answers incoming messages and periodically runs the choke algorithm until
-- the deadline is reached.
function seed_loop()
  local now = simgrid.get_clock()
  local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL
  simgrid.debug("Start seeding.")

  -- Start the main seed loop
  local task, err
  while now < data.deadline do
    task, err = data.comm_received:test()
    if task then
      handle_message(task)
      data.comm_received = simgrid.task.irecv(data.mailbox)
      now = simgrid.get_clock()
    elseif err then
      data.comm_received = simgrid.task.irecv(data.mailbox)
    else
      if now >= next_choked_update then
        update_choked_peers()
        next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL
      else
        simgrid.process.sleep(1)
      end
      now = simgrid.get_clock()
    end
  end
end

-- Retrieve the peers list from the tracker.
-- Fills data.peers with one entry per peer announced by the tracker
-- (ourselves excluded).
-- Returns true if an answer was received before GET_PEERS_TIMEOUT elapsed,
-- false otherwise.
function get_peers_data()
  local now = simgrid.get_clock()
  local timeout = now + common.GET_PEERS_TIMEOUT

  -- Build the task
  local task_send = simgrid.task.new("", 0, common.MESSAGE_SIZE)
  task_send.type = "REQUEST"
  task_send.peer_id = data.id
  task_send.mailbox = data.mailbox_tracker

  -- Send the task, retrying until it goes through or the timeout expires
  local sent = false
  while not sent and now < timeout do
    simgrid.debug("Sending a peer request to the tracker.")
    if task_send:send("tracker") then
      sent = true
    end
    -- Refresh the clock so that a failing send cannot loop forever
    now = simgrid.get_clock()
  end

  -- Wait for the answer
  local success = false
  while not success and now < timeout do
    local comm_received = simgrid.task.irecv(data.mailbox_tracker)
    -- NOTE(review): wait() is assumed to take a duration, hence the remaining
    -- time is passed rather than the absolute timeout date - confirm against
    -- the SimGrid Lua bindings.
    local task_received = comm_received:wait(timeout - now)
    if task_received then
      simgrid.info("Received an answer from the tracker with " .. #task_received.peers .. " peers inside")
      -- Add what we received to our peer list
      for _, peer_id in pairs(task_received.peers) do
        if peer_id ~= data.id then
          -- Add the peer to our list and build its data
          data.peers[peer_id] = {
            id = peer_id,
            bitfield = nil,
            mailbox = tostring(peer_id),
            am_interested = false,
            interested = false,
            choked_upload = true,
            choked_download = true,
          }
        end
      end
      success = true
    end
    now = simgrid.get_clock()
  end
  return success
end

-- Returns true if the peer owns every piece of the file
function has_finished()
  for _, owned in pairs(data.bitfield) do
    if owned == false then
      return false
    end
  end
  return true
end

-- Handle a received message sent by another peer.
-- Except for HANDSHAKE (which registers the sender), messages coming from a
-- peer that is not in data.peers are ignored.
function handle_message(task)
  local remote_peer = data.peers[task.peer_id]

  if task.type == "HANDSHAKE" then
    simgrid.debug("Received a HANDSHAKE message from " .. task.mailbox)
    -- Check if the peer is in our connection list
    if remote_peer == nil then
      data.peers[task.peer_id] = {
        mailbox = task.mailbox,
        id = task.peer_id,
        am_interested = false,
        interested = false,
        choked_upload = true,
        choked_download = true,
        bitfield = nil,
      }
      send_handshake(task.mailbox)
    end
    -- Send our bitfield to the peer
    send_bitfield(task.mailbox)
    return
  end

  if remote_peer == nil then
    -- We have no state for this peer: indexing it would raise an error
    simgrid.debug("Ignoring a " .. tostring(task.type) .. " message from unknown peer " .. tostring(task.peer_id))
    return
  end

  if task.type == "BITFIELD" then
    simgrid.debug("Received a BITFIELD from " .. task.mailbox)
    -- Update the pieces list
    update_piece_count_from_bitfield(task.bitfield)
    -- Update the current piece
    if data.current_piece == -1 and data.pieces < common.FILE_PIECES then
      update_current_piece()
    end
    remote_peer.bitfield = task.bitfield
  elseif task.type == "INTERESTED" then
    simgrid.debug("Received an INTERESTED message from " .. task.mailbox)
    remote_peer.interested = true
  elseif task.type == "NOTINTERESTED" then
    simgrid.debug("Received an NOTINTERESTED message from " .. task.mailbox)
    remote_peer.interested = false
  elseif task.type == "UNCHOKE" then
    simgrid.debug("Received an UNCHOKE message from " .. task.mailbox)
    remote_peer.choked_download = false
    send_requests_to_peer(remote_peer)
  elseif task.type == "CHOKE" then
    simgrid.debug("Recevied a CHOKE message from " .. task.mailbox)
    remote_peer.choked_download = true
  elseif task.type == "HAVE" then
    -- HAVE messages are only taken into account once the peer sent a bitfield
    if remote_peer.bitfield == nil then
      return
    end
    simgrid.debug("Received a HAVE message from " .. task.mailbox)
    data.pieces_count[task.piece] = data.pieces_count[task.piece] + 1
    if data.current_pieces[task.piece] ~= nil then
      -- Send interested message to the peer if he has what we want
      if not remote_peer.am_interested then
        remote_peer.am_interested = true
        send_interested(remote_peer.mailbox)
      end
      send_request(task.mailbox, task.piece)
    end
  elseif task.type == "REQUEST" then
    simgrid.debug("Received REQUEST from " .. task.mailbox .. " for " .. task.piece)
    -- Only serve peers we have unchoked, and only pieces we own
    if remote_peer.choked_upload == false and data.bitfield[task.piece] == true then
      send_piece(task.mailbox, task.piece, false)
    end
  elseif task.type == "PIECE" then
    if task.stalled == true then
      simgrid.debug("The received piece is stalled")
    else
      simgrid.debug("Received piece " .. task.piece .. " from " .. task.mailbox)
      if data.bitfield[task.piece] ~= true then
        data.pieces_requested = data.pieces_requested - 1
        -- Removing the piece from our piece list
        data.current_pieces[task.piece] = nil
        data.bitfield[task.piece] = true
        data.pieces = data.pieces + 1
        simgrid.debug("My status is now:" .. get_status())
        -- Sending the information to all the peers we are connected to
        send_have(task.piece)
        -- Sending NOTINTERESTED to the peers that don't have any more pieces
        update_interested_after_receive()
      end
    end
  end
end

-- Update the piece the peer is currently interested in.
-- There are two cases (as described in "Bittorrent Architecture Protocol",
-- Ryan Toole):
--   If the peer has less than 3 pieces, he chooses a piece at random.
--   If the peer has more pieces, he downloads the pieces that are the least
--   replicated.
-- TODO: only the random selection is implemented; it is used in both cases.
function update_current_piece()
  -- Every missing piece is already being requested: nothing left to pick
  -- (this also guarantees that the selection loop below terminates).
  if data.pieces_requested >= (common.FILE_PIECES - data.pieces) then
    return
  end

  local candidate
  repeat
    candidate = math.random(1, common.FILE_PIECES)
  until data.bitfield[candidate] ~= true and data.current_pieces[candidate] == nil

  data.current_piece = candidate
  data.current_pieces[candidate] = true
  data.pieces_requested = data.pieces_requested + 1
end

-- Updates the list of who has a piece from a bitfield
function update_piece_count_from_bitfield(bitfield)
  for piece, owned in pairs(bitfield) do
    if owned == true then
      data.pieces_count[piece] = data.pieces_count[piece] + 1
    end
  end
end

-- Wait for the node to receive interesting bitfield messages (ie: non empty).
-- Returns once a piece has been selected (data.current_piece ~= -1) or the
-- deadline is reached.
function wait_for_pieces()
  local finished = false
  local now = simgrid.get_clock()
  while now < data.deadline and not finished do
    local task = data.comm_received:wait(common.TIMEOUT_MESSAGE)
    if task then
      handle_message(task)
      if data.current_piece ~= -1 then
        finished = true
      end
    end
    data.comm_received = simgrid.task.irecv(data.mailbox)
    -- Refresh the clock, otherwise the deadline check can never trigger
    now = simgrid.get_clock()
  end
end

-- Update the list of current choked and unchoked peers, using the
-- choke algorithm
function update_choked_peers()
  data.round = (data.round + 1) % 3

  -- Remove a peer from the active list and choke it
  local active_id, active_peer = next(data.active_peers)
  if active_peer ~= nil then
    data.active_peers[active_id] = nil
    -- Record the choke locally too, so that its requests stop being served
    active_peer.choked_upload = true
    send_choked(active_peer.mailbox)
  end

  -- Random optimistic unchoking: pick one of the interested peers at random
  local candidates = {}
  for _, remote_peer in pairs(data.peers) do
    if remote_peer.interested == true then
      candidates[#candidates + 1] = remote_peer
    end
  end

  if #candidates > 0 then
    local chosen = candidates[math.random(#candidates)]
    data.active_peers[chosen.id] = chosen
    chosen.choked_upload = false
    send_unchoked(chosen.mailbox)
  end
  -- TODO: Use the leecher choke algorithm
end

-- Updates our "interested" state about peers: send "not interested" to peers
-- that don't have any more pieces we want.
function update_interested_after_receive()
  for _, remote_peer in pairs(data.peers) do
    if remote_peer.am_interested then
      -- The flag is evaluated per peer: it must not leak from one peer to
      -- the next.
      local still_interested = false
      if remote_peer.bitfield ~= nil then
        for piece in pairs(data.current_pieces) do
          if remote_peer.bitfield[piece] == true then
            still_interested = true
            break
          end
        end
      end
      if not still_interested then
        remote_peer.am_interested = false
        send_not_interested(remote_peer.mailbox)
      end
    end
  end
end

-- Send the "interested" message to every peer whose bitfield is known,
-- then mark the pending piece as handled (data.current_piece = -1).
function send_interested_to_peers()
  if data.current_piece == -1 then
    return
  end
  for _, remote_peer in pairs(data.peers) do
    if remote_peer.bitfield ~= nil then
      remote_peer.am_interested = true
      send_interested(remote_peer.mailbox)
    end
  end
  data.current_piece = -1
end

-- Send a "interested" message to a peer.
function send_interested(mailbox)
  simgrid.debug("Sending a INTERESTED to " .. mailbox)
  local task = new_task("INTERESTED")
  task:dsend(mailbox)
end

-- Send a "not interested" message to a peer.
function send_not_interested(mailbox)
  local task = new_task("NOTINTERESTED")
  task:dsend(mailbox)
end

-- Send a handshake message to all the peers the peer has
function send_handshake_all()
  for _, remote_peer in pairs(data.peers) do
    send_handshake(remote_peer.mailbox)
  end
end

-- Send a "handshake" message to a peer
function send_handshake(mailbox)
  simgrid.debug("Sending a HANDSHAKE to " .. mailbox)
  local task = new_task("HANDSHAKE")
  task:dsend(mailbox)
end

-- Send a "choked" message to a peer
function send_choked(mailbox)
  simgrid.debug("Sending a CHOKE to " .. mailbox)
  local task = new_task("CHOKE")
  task:dsend(mailbox)
end

-- Send a "unchoked" message to a peer
function send_unchoked(mailbox)
  simgrid.debug("Sending a UNCHOKE to " .. mailbox)
  local task = new_task("UNCHOKE")
  task:dsend(mailbox)
end

-- Send a "HAVE" message to all peers we are connected to
function send_have(piece)
  for _, remote_peer in pairs(data.peers) do
    local task = new_task("HAVE")
    task.piece = piece
    task:dsend(remote_peer.mailbox)
  end
end

-- Send request messages to a peer that has unchoked us: one request per
-- piece we are currently trying to download.
function send_requests_to_peer(remote_peer)
  for piece in pairs(data.current_pieces) do
    send_request(remote_peer.mailbox, piece)
  end
end

-- Send a bitfield message to a peer
function send_bitfield(mailbox)
  simgrid.debug("Sending a BITFIELD to " .. mailbox)
  local task = new_task("BITFIELD")
  task.bitfield = data.bitfield
  task:dsend(mailbox)
end

-- Send a "request" message to a peer, containing a request for a piece
function send_request(mailbox, piece)
  simgrid.debug("Sending a REQUEST to " .. mailbox .. " for " .. piece)
  local task = new_task("REQUEST")
  task.piece = piece
  task:dsend(mailbox)
end

-- Send a "piece" message to a peer, containing a piece of the file
function send_piece(mailbox, piece, stalled)
  simgrid.debug("Sending the PIECE " .. piece .. " to " .. mailbox)
  local task = new_task("PIECE")
  task.piece = piece
  task.stalled = stalled
  task:dsend(mailbox)
end

-- Build a new message task of the given type, stamped with our mailbox and id
function new_task(type)
  local task = simgrid.task.new("", 0, common.MESSAGE_SIZE)
  task.type = type
  task.mailbox = data.mailbox
  task.peer_id = data.id
  return task
end

-- Returns our bitfield as a string of '1' (piece owned) and '0' (piece
-- missing), in piece order.
function get_status()
  local bits = {}
  -- Numeric loop: pairs() gives no ordering guarantee
  for i = 1, #data.bitfield do
    bits[i] = (data.bitfield[i] == true) and '1' or '0'
  end
  return table.concat(bits)
end