From: Samuel Lepetit Date: Fri, 22 Jun 2012 15:01:29 +0000 (+0200) Subject: Add bittorrent example in Lua X-Git-Tag: v3_8~480 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/7bc0eaf70cf34961d99866cdb9fee612a3d9130a Add bittorrent example in Lua --- diff --git a/examples/lua/bittorrent/bittorrent.lua b/examples/lua/bittorrent/bittorrent.lua new file mode 100644 index 0000000000..26381a7fe0 --- /dev/null +++ b/examples/lua/bittorrent/bittorrent.lua @@ -0,0 +1,10 @@ +-- A SimGrid Lua implementation of the Bittorrent protocol. + +require("simgrid") + +require("peer") +require("tracker") + +simgrid.platform(arg[1] or "../../msg/msg_platform.xml") +simgrid.application(arg[2] or "bittorrent.xml") +simgrid.run() \ No newline at end of file diff --git a/examples/lua/bittorrent/bittorrent.xml b/examples/lua/bittorrent/bittorrent.xml new file mode 100644 index 0000000000..58627ee9c5 --- /dev/null +++ b/examples/lua/bittorrent/bittorrent.xml @@ -0,0 +1,40 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/lua/bittorrent/peer.lua b/examples/lua/bittorrent/peer.lua new file mode 100644 index 0000000000..96a20f3066 --- /dev/null +++ b/examples/lua/bittorrent/peer.lua @@ -0,0 +1,494 @@ +-- A SimGrid Lua implementation of the Bittorrent protocol. + +require("simgrid") +-- Common Constants +common = { + FILE_SIZE = 5120, + FILE_PIECE_SIZE = 512, + FILE_PIECES = 10, + + PIECE_COMM_SIZE = 1, + + MESSAGE_SIZE = 1, + MAXIMUM_PEERS = 50, + + TRACKER_QUERY_INTERVAL = 1000, + TRACKER_COMM_SIZE = 0.01, + + GET_PEERS_TIMEOUT = 10000, + TIMEOUT_MESSAGE = 10, + MAX_UNCHOKED_PEERS = 4, + UPDATE_CHOKED_INTERVAL = 50, + MAX_PIECES = 1, +} + + +-- Peer main function +function peer(...) + + local args = {...} + + if #args ~= 2 and #args ~= 3 then + simgrid.info("Wrong number of arguments") + end + + -- Setting the peer data + data = { + -- Retrieving the peer id + id = tonumber(args[1]), + mailbox = tostring(tonumber(args[1])), + mailbox_tracker = "tracker" .. args[1], + peers = {}, + active_peers = {}, + current_pieces = {}, + pieces_requested = 0, + bitfield = {}, + pieces_count = {}, + pieces = 0, + deadline = tonumber(args[2]), + round = 0 + } + simgrid.info("Hi, I'm joining the network with id " .. data.id) + + -- Checking if the peer is a seed + local bitfield_value = false + if args[3] == "1" then + data.pieces = common.PIECES_COUNT + bitfield_value = true + else + data.pieces = 0 + end + -- Building the peer bitfield and the pieces list + for i = 1, common.FILE_PIECES do + data.pieces_count[i] = 0 + data.bitfield[i] = bitfield_value + end + + if get_peers_data() == true then + data.comm_received = simgrid.task.irecv(data.mailbox) + if has_finished() then + send_handshake_all() + seed_loop() + else + leech_loop() + seed_loop() + end + else + simgrid.info("Couldn't contact the tracker") + end + simgrid.info("My status is now " .. get_status()) +end +-- Peer main loop when it is leeching +function leech_loop() + simgrid.info("Start downloading.") + local now = simgrid.get_clock() + local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL; + -- Send a "handshake" message to all the peers it got + -- it couldn't have gotten more than 50 peers anyway) + send_handshake_all() + -- Wait for at leaast one bitfield message + wait_for_pieces() + + simgrid.info("Starting main leech loop") + local task, err + while now < data.deadline and data.pieces < common.FILE_PIECES do + task, err = data.comm_received:test() + if task then + handle_message(task) + data.comm_received = simgrid.task.irecv(data.mailbox) + now = simgrid.get_clock() + elseif err then + data.comm_received = simgrid.task.irecv(data.mailbox) + else + -- If the user has a pending interesting + if data.current_piece ~= -1 then + send_interested_to_peers() + else + if table.getn(data.current_pieces) < common.MAX_PIECES then + update_current_piece() + end + end + -- We don't execute the choke algorithm if we don't already have a piece + if now >= next_choked_update and data.pieces > 0 then + update_choked_peers() + next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL + now = simgrid.get_clock() + else + simgrid.process.sleep(1) + now = simgrid.get_clock() + end + end + end +end +-- Peer main loop when it is seeding +function seed_loop() + local now = simgrid.get_clock() + local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL; + simgrid.debug("Start seeding.") + -- Start the main seed loop + while now < data.deadline do + task, err = data.comm_received:test() + if task then + handle_message(task) + data.comm_received = simgrid.task.irecv(data.mailbox) + now = simgrid.get_clock() + elseif err then + data.comm_received = simgrid.task.irecv(data.mailbox) + else + if now >= next_choked_update then + update_choked_peers() + next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL + now = simgrid.get_clock() + else + simgrid.process.sleep(1) + now = simgrid.get_clock() + end + end + end +end +-- Retrieve the peers list from the tracker +function get_peers_data() + local success = false + local now = simgrid.get_clock() + local timeout = now + common.GET_PEERS_TIMEOUT + -- Build the task + local task_send = simgrid.task.new("", 0, common.MESSAGE_SIZE) + task_send.type = "REQUEST" + task_send.peer_id = data.id + task_send.mailbox = data.mailbox_tracker + -- Send the task + while not(success) and now < timeout do + simgrid.debug("Sending a peer request to the tracker.") + if task_send:send("tracker") then + success = true + end + end + now = simgrid.get_clock() + success = false + -- Wait for the answer + local comm_received = simgrid.task.irecv(data.mailbox_tracker) + while not(success) and now < timeout do + local task_received = comm_received:wait(timeout) + comm_received = simgrid.task.irecv(data.mailbox_tracker) + if task_received then + simgrid.info("Received an answer from the tracker with " .. #task_received.peers .. " peers inside") + -- Add what we received to our peer list + for i,v in pairs(task_received.peers) do + if v ~= data.id then + --Add the peer to our list and build its data + local peer_data = {} + peer_data.id = v; + peer_data.bitfield = nil + peer_data.mailbox = tostring(v); + peer_data.am_interested = false + peer_data.interested = false + peer_data.choked_upload = true + peer_data.choked_download = true + data.peers[v] = peer_data + end + end + else + success = false + end + success = true + end + return success; +end +-- Returns if the peer has finished downloading the piece +function has_finished() + for i,v in pairs(data.bitfield) do + if v == false then + return false + end + end + return true +end +-- Handle a received message sent by another peer +function handle_message(task) + local remote_peer = data.peers[task.peer_id] + + if task.type == "HANDSHAKE" then + simgrid.debug("Received a HANDSHAKE message from " .. task.mailbox) + -- Check if the peer is in our connection list + if data.peers[task.peer_id] == nil then + local peer_data = {} + peer_data.mailbox = task.mailbox + peer_data.id = task.peer_id + peer_data.am_interested = false + peer_data.interested = false + peer_data.choked_upload = true + peer_data.choked_download = true + peer_data.bitfield = nil + data.peers[task.peer_id] = peer_data + send_handshake(task.mailbox) + end + -- Send our bitfield to the peer + send_bitfield(task.mailbox) + elseif task.type == "BITFIELD" then + simgrid.debug("Received a BITFIELD from " .. task.mailbox) + -- Update the pieces list + update_piece_count_from_bitfield(task.bitfield) + -- Update the current piece + if data.current_piece == -1 and data.pieces < common.FILE_PIECES then + update_current_piece() + end + data.peers[task.peer_id].bitfield = task.bitfield + elseif task.type == "INTERESTED" then + simgrid.debug("Received an INTERESTED message from " .. task.mailbox) + data.peers[task.peer_id].interested = true + elseif task.type == "NOTINTERESTED" then + simgrid.debug("Received an NOTINTERESTED message from " .. task.mailbox) + data.peers[task.peer_id].interested = false + elseif task.type == "UNCHOKE" then + simgrid.debug("Received an UNCHOKE message from " .. task.mailbox) + data.peers[task.peer_id].choked_download = false + send_requests_to_peer(data.peers[task.peer_id]) + elseif task.type == "CHOKE" then + simgrid.debug("Recevied a CHOKE message from " .. task.mailbox) + data.peers[task.peer_id].choked_download = true + elseif task.type == "HAVE" then + local remote_peer = data.peers[task.peer_id] + if remote_peer == nil or remote_peer.bitfield == nil then + return + end + simgrid.debug("Received a HAVE message from " .. task.mailbox) + data.pieces_count[task.piece] = data.pieces_count[task.piece] + 1 + -- Send interested message to the peer if he has what we want + if not(remote_peer.am_interested) and data.current_pieces[task.piece] ~= nil then + remote_peer.am_interested = true + send_interested(remote_peer.mailbox) + end + if data.current_pieces[task.piece] ~= nil then + send_request(task.mailbox,task.piece) + end + elseif task.type == "REQUEST" then + simgrid.debug("Received REQUEST from " .. task.mailbox .. " for " .. task.piece) + local remote_peer = data.peers[task.peer_id] + if remote_peer.choked_upload == false then + if data.bitfield[task.piece] == true then + send_piece(task.mailbox,task.piece,false) + end + end + elseif task.type == "PIECE" then + if task.stalled == true then + simgrid.debug("The received piece is stalled") + else + simgrid.debug("Received piece " .. task.piece .. " from " .. task.mailbox) + if data.bitfield[task.piece] ~= true then + data.pieces_requested = data.pieces_requested - 1 + -- Removing the piece from our piece list + data.current_pieces[task.piece] = nil + data.bitfield[task.piece] = true + data.pieces = data.pieces + 1 + simgrid.debug("My status is now:" .. get_status()) + -- Sending the information to all the peers we are connected to + send_have(task.piece) + -- Sending UNINTERESTED to the peers that doesn't have any more pieces + update_interested_after_receive() + end + end + end +end +-- Update the piece the peer is currently interested in. +-- There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole : +-- If the peer has less than 3 pieces, he chooses a piece at random. +-- If the peer has more than pieces, he downloads the pieces that are the less +-- replicated +function update_current_piece() + if data.pieces_requested >= (common.FILE_PIECES - data.pieces) then + return + end + if data.pieces < 3 or true then + repeat + data.current_piece = math.random(1,common.FILE_PIECES) +-- simgrid.info("The new piece is:" .. data.current_piece) + until data.bitfield[data.current_piece] ~= true and data.current_pieces[data.current_piece] == nil + data.current_pieces[data.current_piece] = true + data.pieces_requested = data.pieces_requested + 1 + end + +end +-- Updates the list of who has a piece from a bitfield +function update_piece_count_from_bitfield(bitfield) + for i,v in pairs(bitfield) do + if v == true then + data.pieces_count[i] = data.pieces_count[i] + 1 + end + end +end +-- Wait for the node to receive interesting bitfield messages (ie: non empty) +function wait_for_pieces() + local finished = false + local now = simgrid.get_clock() + local task + while now < data.deadline and not(finished) do + task = data.comm_received:wait(common.TIMEOUT_MESSAGE) + if task then + handle_message(task) + if data.current_piece ~= -1 then + finished = true + end + end + data.comm_received = simgrid.task.irecv(data.mailbox) + end +end +-- Update the list of current choked and unchoked peers, using the +-- choke algorithm +function update_choked_peers() + data.round = (data.round + 1) % 3 + -- Remove a peer from the list + for i,v in pairs(data.active_peers) do + data.active_peers[i] = nil + send_choked(v.mailbox) + break + end + -- Random optimistic unchoking + if true then + local values = {} + for key, value in pairs(data.peers) do + values[#values + 1] = value + end + local peer_choosed = nil + local j = 0 + + repeat + peer_choosed = values[math.random(#values)] + if peer_choosed.interested ~= true then + peer_choosed = nil + end + j = j + 1 + until peer_choosed ~= nil or j < common.MAXIMUM_PEERS + if peer_choosed ~= nil then + data.active_peers[peer_choosed.id] = peer_choosed + peer_choosed.choked_upload = false + send_unchoked(peer_choosed.mailbox) + end + end + -- TODO: Use the leecher choke algorithm +end +-- Updates our "interested" state about peers: send "not interested" to peers +-- that don't have any more pieces we want. +function update_interested_after_receive() + local interested = false + for i,v in pairs(data.peers) do + if v.am_interested then + for piece,j in pairs(data.current_pieces) do + if v.bitfield ~= nil then + if v.bitfield[piece] == true then + interested = true + break + else + end + end + end + if not(interested) then + v.am_interested = false + send_not_interested(v.mailbox) + end + end + end +end +-- Find the peers that have the current interested piece and send them +-- the "interested" message +function send_interested_to_peers() + if data.current_piece == -1 then + return + end + for i,v in pairs(data.peers) do + if v.bitfield ~= nil then + v.am_interested = true + send_interested(v.mailbox) + end + end + data.current_piece = -1 +end +-- Send a "interested" message to a peer. +function send_interested(mailbox) + simgrid.debug("Sending a INTERESTED to " .. mailbox) + local task = new_task("INTERESTED") + task:dsend(mailbox) +end +-- Send a "not interested" message to a peer. +function send_not_interested(mailbox) + local task = new_task("NOTINTERESTED") + task:dsend(mailbox) +end +-- Send a handshake message to all the peers the peer has +function send_handshake_all() + for i,v in pairs(data.peers) do + local task = new_task("HANDSHAKE") + task:dsend(v.mailbox) + end +end +-- Send a "handshake" message to an user +function send_handshake(mailbox) + simgrid.debug("Sending a HANDSHAKE to " .. mailbox) + local task = new_task("HANDSHAKE") + task:dsend(mailbox) +end +-- Send a "choked" message to a peer +function send_choked(mailbox) + simgrid.debug("Sending a CHOKE to " .. mailbox) + local task = new_task("CHOKE") + task:dsend(mailbox) +end +-- Send a "unchoked" message to a peer +function send_unchoked(mailbox) + simgrid.debug("Sending a UNCHOKE to " .. mailbox) + local task = new_task("UNCHOKE") + task:dsend(mailbox) +end +-- Send a "HAVE" message to all peers we are connected to +function send_have(piece) + for i,v in pairs(data.peers) do + local task = new_task("HAVE") + task.piece = piece + task:dsend(v.mailbox) + end +end +-- Send request messages to a peer that have unchoked us +function send_requests_to_peer(remote_peer) + for i,v in pairs(data.current_pieces) do + send_request(remote_peer.mailbox,i) + end +end +-- Send a bitfield message to a peer +function send_bitfield(mailbox) + simgrid.debug("Sending a BITFIELD to " .. mailbox) + local task = new_task("BITFIELD") + task.bitfield = data.bitfield + task:dsend(mailbox) +end +-- Send a "request" message to a pair, containing a request for a piece +function send_request(mailbox, piece) + simgrid.debug("Sending a REQUEST to " .. mailbox .. " for " .. piece) + local task = new_task("REQUEST") + task.piece = piece + task:dsend(mailbox) +end +-- Send a "piece" messageto a pair, containing a piece of the file +function send_piece(mailbox, piece, stalled) + simgrid.debug("Sending the PIECE " .. piece .. " to " .. mailbox) + local task = new_task("PIECE") + task.piece = piece + task.stalled = stalled + task:dsend(mailbox) +end +function new_task(type) + local task = simgrid.task.new("", 0, common.MESSAGE_SIZE) + task.type = type + task.mailbox = data.mailbox + task.peer_id = data.id + return task +end +function get_status() + local s = "" + for i,v in pairs(data.bitfield) do + if v == true then + s = s .. '1' + else + s = s .. '0' + end + end + return s +end \ No newline at end of file diff --git a/examples/lua/bittorrent/tracker.lua b/examples/lua/bittorrent/tracker.lua new file mode 100644 index 0000000000..e662990178 --- /dev/null +++ b/examples/lua/bittorrent/tracker.lua @@ -0,0 +1,62 @@ +-- A SimGrid Lua implementation of the Bittorrent protocol. + +require("simgrid") + +common_tracker = { + MAXIMUM_PEERS = 50 +} + + + +function tracker(...) + tracker_data = { + peers_list = {}, + deadline = 0, + comm_received = nil + } + -- Check the arguments + local args = {...} + if #args ~= 1 then + simgrid.info("Wrong number of arguments for the tracker") + end + -- Initialization of the random generator + math.randomseed(42) + -- Retrieve the end time + tracker_data.deadline = tonumber(args[1]) + + simgrid.info("Tracker launched") + + local now = simgrid.get_clock() + + tracker_data.comm_received = simgrid.task.irecv("tracker") + while now < tracker_data.deadline do + task, err = tracker_data.comm_received:test() + if task then + simgrid.debug("Received a request from " .. task.mailbox) + tracker_data.comm_received = simgrid.task.irecv("tracker") + -- Sending peers to the peer + local peers = {} + local i = 0 + if #tracker_data.peers_list > 0 then + i = math.random(1,#tracker_data.peers_list) + end + while #peers < #tracker_data.peers_list and #peers < common_tracker.MAXIMUM_PEERS do + table.insert(peers,tracker_data.peers_list[i]) + i = (i + 1) % #tracker_data.peers_list + end + task.type = "ANSWER" + task.peers = peers + -- Add the peer to our peer list + table.insert(tracker_data.peers_list,task.peer_id) + -- Setting the interval + task.interval = TRACKER_QUERY_INTERVAL + -- Sending the task back to the peer + task:dsend(task.mailbox) + else + simgrid.process.sleep(1) + now = simgrid.get_clock() + end + end + + simgrid.info("Tracker is leaving") +end