Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add bittorrent example in Lua
authorSamuel Lepetit <samuel.lepetit@inria.fr>
Fri, 22 Jun 2012 15:01:29 +0000 (17:01 +0200)
committerSamuel Lepetit <samuel.lepetit@inria.fr>
Fri, 22 Jun 2012 15:01:38 +0000 (17:01 +0200)
examples/lua/bittorrent/bittorrent.lua [new file with mode: 0644]
examples/lua/bittorrent/bittorrent.xml [new file with mode: 0644]
examples/lua/bittorrent/peer.lua [new file with mode: 0644]
examples/lua/bittorrent/tracker.lua [new file with mode: 0644]

diff --git a/examples/lua/bittorrent/bittorrent.lua b/examples/lua/bittorrent/bittorrent.lua
new file mode 100644 (file)
index 0000000..26381a7
--- /dev/null
@@ -0,0 +1,10 @@
+-- A SimGrid Lua implementation of the Bittorrent protocol.
+
+require("simgrid")
+       
+require("peer")
+require("tracker")
+
+simgrid.platform(arg[1] or  "../../msg/msg_platform.xml")
+simgrid.application(arg[2] or "bittorrent.xml")
+simgrid.run()
\ No newline at end of file
diff --git a/examples/lua/bittorrent/bittorrent.xml b/examples/lua/bittorrent/bittorrent.xml
new file mode 100644 (file)
index 0000000..58627ee
--- /dev/null
@@ -0,0 +1,40 @@
+<?xml version='1.0'?>
+<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid.dtd">
+<platform version="3">
+
+  <process host="Jacquelin" function="tracker">
+    <argument value="3000" />                  
+  </process>
+
+  <process host="Boivin" function="peer">
+    <argument value="00000002"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+    <argument value="1" />                     <!-- indicates if the peer is a seed at the begining of the simulation -->      
+  </process>
+  <process host="Jean_Yves" function="peer">
+    <argument value="00000003"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="TeX" function="peer">
+    <argument value="00000004"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="Geoff" function="peer">
+    <argument value="00000005"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+    <argument value="1" />                     <!-- indicates if the peer is a seed at the begining of the simulation -->      
+  </process>
+  <process host="Disney" function="peer">
+    <argument value="00000006"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="iRMX" function="peer">
+    <argument value="00000007"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="McGee" function="peer">
+    <argument value="00000008"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+
+</platform>
diff --git a/examples/lua/bittorrent/peer.lua b/examples/lua/bittorrent/peer.lua
new file mode 100644 (file)
index 0000000..96a20f3
--- /dev/null
@@ -0,0 +1,494 @@
+-- A SimGrid Lua implementation of the Bittorrent protocol.
+
+require("simgrid")
+-- Common Constants
+common = {
+       FILE_SIZE = 5120,
+       FILE_PIECE_SIZE = 512,
+       FILE_PIECES = 10,
+       
+       PIECE_COMM_SIZE = 1,
+       
+       MESSAGE_SIZE = 1,
+       MAXIMUM_PEERS = 50,
+       
+       TRACKER_QUERY_INTERVAL = 1000,
+       TRACKER_COMM_SIZE = 0.01,
+       
+       GET_PEERS_TIMEOUT = 10000,
+       TIMEOUT_MESSAGE = 10,
+       MAX_UNCHOKED_PEERS = 4,
+       UPDATE_CHOKED_INTERVAL = 50,
+       MAX_PIECES = 1,
+}
+
+
+-- Peer main function
+function peer(...)
+       
+       local args = {...}
+       
+       if #args ~= 2 and #args ~= 3 then
+               simgrid.info("Wrong number of arguments")
+       end
+                       
+       -- Setting the peer data
+       data = {
+                       -- Retrieving the peer id
+                       id = tonumber(args[1]),
+                       mailbox = tostring(tonumber(args[1])),
+                       mailbox_tracker = "tracker" .. args[1],
+                       peers = {},
+                       active_peers = {},
+                       current_pieces = {},
+                       pieces_requested = 0,
+                       bitfield = {},
+                       pieces_count = {},
+                       pieces = 0,
+                       deadline = tonumber(args[2]),
+                       round = 0
+       }
+       simgrid.info("Hi, I'm joining the network with id " .. data.id)
+
+       -- Checking if the peer is a seed
+       local bitfield_value = false
+       if args[3] == "1" then
+               data.pieces = common.PIECES_COUNT
+               bitfield_value = true
+       else
+               data.pieces = 0
+       end
+       -- Building the peer bitfield and the pieces list
+       for i = 1, common.FILE_PIECES do
+               data.pieces_count[i] = 0        
+               data.bitfield[i] = bitfield_value
+       end
+       
+       if get_peers_data() == true then
+               data.comm_received = simgrid.task.irecv(data.mailbox)
+               if has_finished() then
+                       send_handshake_all()
+                       seed_loop()
+               else
+                       leech_loop()
+                       seed_loop()
+               end
+       else
+               simgrid.info("Couldn't contact the tracker")
+       end
+       simgrid.info("My status is now " .. get_status())
+end
+-- Peer main loop when it is leeching
+function leech_loop()
+       simgrid.info("Start downloading.")
+       local now = simgrid.get_clock()
+       local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL;
+       -- Send a "handshake" message to all the peers it got
+       -- it couldn't have gotten more than 50 peers anyway)
+       send_handshake_all()
+       -- Wait for at leaast one bitfield message
+       wait_for_pieces()
+       
+       simgrid.info("Starting main leech loop")
+       local task, err
+       while now < data.deadline and data.pieces < common.FILE_PIECES do
+               task, err = data.comm_received:test()
+               if task then
+                       handle_message(task)
+                       data.comm_received = simgrid.task.irecv(data.mailbox)
+                       now = simgrid.get_clock()
+               elseif err then
+                       data.comm_received = simgrid.task.irecv(data.mailbox)           
+               else
+                       -- If the user has a pending interesting
+                       if data.current_piece ~= -1 then
+                               send_interested_to_peers()
+                       else
+                               if table.getn(data.current_pieces) < common.MAX_PIECES then
+                                       update_current_piece()
+                               end
+                       end
+                       -- We don't execute the choke algorithm if we don't already have a piece
+                       if now >= next_choked_update and data.pieces > 0 then
+                               update_choked_peers()
+                               next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL
+                               now = simgrid.get_clock()
+                       else
+                               simgrid.process.sleep(1)
+                               now = simgrid.get_clock()
+                       end
+               end             
+       end
+end
+-- Peer main loop when it is seeding
+function seed_loop()
+       local now = simgrid.get_clock()
+       local next_choked_update = now + common.UPDATE_CHOKED_INTERVAL;
+       simgrid.debug("Start seeding.")
+       -- Start the main seed loop
+       while now < data.deadline do
+               task, err = data.comm_received:test()           
+               if task then
+                       handle_message(task)
+                       data.comm_received = simgrid.task.irecv(data.mailbox)           
+                       now = simgrid.get_clock()
+               elseif err then
+                       data.comm_received = simgrid.task.irecv(data.mailbox)           
+               else
+                       if now >= next_choked_update then
+                               update_choked_peers()
+                               next_choked_update = next_choked_update + common.UPDATE_CHOKED_INTERVAL
+                               now = simgrid.get_clock()
+                       else
+                               simgrid.process.sleep(1)
+                               now = simgrid.get_clock()
+                       end             
+               end
+       end
+end
+-- Retrieve the peers list from the tracker
+function get_peers_data() 
+       local success = false
+       local now = simgrid.get_clock()
+       local timeout = now + common.GET_PEERS_TIMEOUT
+       -- Build the task
+       local task_send = simgrid.task.new("", 0, common.MESSAGE_SIZE)
+       task_send.type = "REQUEST"
+       task_send.peer_id = data.id
+       task_send.mailbox = data.mailbox_tracker
+       -- Send the task
+       while not(success) and now < timeout do
+               simgrid.debug("Sending a peer request to the tracker.")
+               if task_send:send("tracker") then
+                       success = true
+               end
+       end
+       now = simgrid.get_clock()
+       success = false
+       -- Wait for the answer
+       local comm_received = simgrid.task.irecv(data.mailbox_tracker)
+       while not(success) and now < timeout do
+               local task_received = comm_received:wait(timeout)
+               comm_received = simgrid.task.irecv(data.mailbox_tracker)
+               if task_received then
+                       simgrid.info("Received an answer from the tracker with " .. #task_received.peers .. " peers inside")
+                       -- Add what we received to our peer list
+                       for i,v in pairs(task_received.peers) do
+                               if v ~= data.id then
+                                       --Add the peer to our list and build its data
+                                       local peer_data = {}
+                                       peer_data.id = v;
+                                       peer_data.bitfield = nil
+                                       peer_data.mailbox = tostring(v);
+                                       peer_data.am_interested = false
+                                       peer_data.interested = false
+                                       peer_data.choked_upload = true
+                                       peer_data.choked_download = true
+                                       data.peers[v] = peer_data
+                               end
+                       end
+               else
+                       success = false
+               end
+               success = true
+       end
+       return success; 
+end
+-- Returns if the peer has finished downloading the piece
+function has_finished() 
+       for i,v in pairs(data.bitfield) do
+               if v == false then
+                       return false
+               end
+       end
+       return true
+end
+-- Handle a received message sent by another peer
+function handle_message(task)
+       local remote_peer = data.peers[task.peer_id]
+       
+       if task.type == "HANDSHAKE" then
+               simgrid.debug("Received a HANDSHAKE message from " .. task.mailbox)
+               -- Check if the peer is in our connection list
+               if data.peers[task.peer_id] == nil then
+                       local peer_data = {}
+                       peer_data.mailbox = task.mailbox
+                       peer_data.id = task.peer_id
+                       peer_data.am_interested = false
+                       peer_data.interested = false
+                       peer_data.choked_upload = true
+                       peer_data.choked_download = true
+                       peer_data.bitfield = nil
+                       data.peers[task.peer_id] = peer_data
+                       send_handshake(task.mailbox)
+               end
+               -- Send our bitfield to the peer
+               send_bitfield(task.mailbox)
+       elseif task.type == "BITFIELD" then
+               simgrid.debug("Received a BITFIELD from " .. task.mailbox)
+               -- Update the pieces list
+               update_piece_count_from_bitfield(task.bitfield)
+               -- Update the current piece
+               if data.current_piece == -1 and data.pieces < common.FILE_PIECES then
+                       update_current_piece()
+               end
+               data.peers[task.peer_id].bitfield = task.bitfield
+       elseif task.type == "INTERESTED" then
+               simgrid.debug("Received an INTERESTED message from " .. task.mailbox)
+               data.peers[task.peer_id].interested = true
+       elseif task.type == "NOTINTERESTED" then
+               simgrid.debug("Received an NOTINTERESTED message from " .. task.mailbox)
+               data.peers[task.peer_id].interested = false
+       elseif task.type == "UNCHOKE" then
+               simgrid.debug("Received an UNCHOKE message from " .. task.mailbox)
+               data.peers[task.peer_id].choked_download = false
+               send_requests_to_peer(data.peers[task.peer_id])
+       elseif task.type == "CHOKE" then
+               simgrid.debug("Recevied a CHOKE message from " .. task.mailbox)
+               data.peers[task.peer_id].choked_download = true         
+       elseif task.type == "HAVE" then
+               local remote_peer = data.peers[task.peer_id] 
+               if remote_peer == nil or remote_peer.bitfield == nil then
+                       return
+               end
+               simgrid.debug("Received a HAVE message from " .. task.mailbox)
+               data.pieces_count[task.piece] = data.pieces_count[task.piece] + 1
+               -- Send interested message to the peer if he has what we want
+               if not(remote_peer.am_interested) and data.current_pieces[task.piece] ~= nil then
+                       remote_peer.am_interested = true
+                       send_interested(remote_peer.mailbox)
+               end
+               if data.current_pieces[task.piece] ~= nil then
+                       send_request(task.mailbox,task.piece)
+               end
+       elseif task.type == "REQUEST" then
+               simgrid.debug("Received REQUEST from " .. task.mailbox .. " for " .. task.piece)
+               local remote_peer = data.peers[task.peer_id] 
+               if remote_peer.choked_upload == false then
+                       if data.bitfield[task.piece] == true then
+                               send_piece(task.mailbox,task.piece,false)
+                       end
+               end
+       elseif task.type == "PIECE" then
+               if task.stalled == true then
+                       simgrid.debug("The received piece is stalled")
+               else
+                       simgrid.debug("Received piece " .. task.piece .. " from " .. task.mailbox)
+                       if data.bitfield[task.piece] ~= true then
+                               data.pieces_requested = data.pieces_requested - 1
+                               -- Removing the piece from our piece list
+                               data.current_pieces[task.piece] = nil
+                               data.bitfield[task.piece] = true
+                               data.pieces = data.pieces + 1
+                               simgrid.debug("My status is now:" .. get_status())
+                               -- Sending the information to all the peers we are connected to
+                               send_have(task.piece)
+                               -- Sending UNINTERESTED to the peers that doesn't have any more pieces
+                               update_interested_after_receive()
+                       end
+               end
+       end
+end
+-- Update the piece the peer is currently interested in.
+-- There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
+-- If the peer has less than 3 pieces, he chooses a piece at random.
+-- If the peer has more than pieces, he downloads the pieces that are the less
+-- replicated
+function update_current_piece() 
+       if data.pieces_requested >= (common.FILE_PIECES - data.pieces) then
+               return
+       end
+       if data.pieces < 3 or true then
+               repeat
+                       data.current_piece = math.random(1,common.FILE_PIECES)
+--                     simgrid.info("The new piece is:" .. data.current_piece)
+               until data.bitfield[data.current_piece] ~= true and data.current_pieces[data.current_piece] == nil
+               data.current_pieces[data.current_piece] = true
+               data.pieces_requested = data.pieces_requested + 1
+       end             
+       
+end
+-- Updates the list of who has a piece from a bitfield
+function update_piece_count_from_bitfield(bitfield)
+       for i,v in pairs(bitfield) do
+               if v == true then       
+                       data.pieces_count[i] = data.pieces_count[i] + 1
+               end
+       end
+end
+-- Wait for the node to receive interesting bitfield messages (ie: non empty)
+function wait_for_pieces() 
+       local finished = false
+       local now = simgrid.get_clock()
+       local task
+       while now < data.deadline and not(finished) do
+               task = data.comm_received:wait(common.TIMEOUT_MESSAGE)
+               if task then
+                       handle_message(task)
+                       if data.current_piece ~= -1 then
+                               finished = true
+                       end     
+               end
+               data.comm_received = simgrid.task.irecv(data.mailbox)           
+       end
+end
+-- Update the list of current choked and unchoked peers, using the
+-- choke algorithm
+function update_choked_peers()
+       data.round = (data.round + 1) % 3
+       -- Remove a peer from the list
+       for i,v in pairs(data.active_peers) do
+               data.active_peers[i] = nil
+               send_choked(v.mailbox)
+               break
+       end
+       -- Random optimistic unchoking
+       if true then
+               local values = {}
+               for key, value in pairs(data.peers) do
+                       values[#values + 1] = value
+               end
+               local peer_choosed = nil
+               local j = 0
+
+               repeat
+                       peer_choosed = values[math.random(#values)]             
+                       if peer_choosed.interested ~= true then
+                               peer_choosed = nil
+                       end
+                       j = j + 1
+               until peer_choosed ~= nil or j < common.MAXIMUM_PEERS
+               if peer_choosed ~= nil then
+                       data.active_peers[peer_choosed.id] = peer_choosed
+                       peer_choosed.choked_upload = false
+                       send_unchoked(peer_choosed.mailbox)
+               end
+       end
+       -- TODO: Use the leecher choke algorithm
+end
+--  Updates our "interested" state about peers: send "not interested" to peers
+--  that don't have any more pieces we want.
+function update_interested_after_receive()
+       local interested = false
+       for i,v in pairs(data.peers) do
+               if v.am_interested then
+                       for piece,j in pairs(data.current_pieces) do
+                               if v.bitfield ~= nil then
+                                       if v.bitfield[piece] == true then
+                                               interested = true
+                                               break
+                                       else
+                                       end
+                               end
+                       end
+                       if not(interested) then
+                               v.am_interested = false
+                               send_not_interested(v.mailbox)
+                       end
+               end
+       end
+end
+-- Find the peers that have the current interested piece and send them
+-- the "interested" message
+function send_interested_to_peers()
+       if data.current_piece == -1 then
+               return
+       end
+       for i,v in pairs(data.peers) do
+               if v.bitfield ~= nil then
+                       v.am_interested = true
+                       send_interested(v.mailbox)
+               end
+       end
+       data.current_piece = -1
+end 
+-- Send a "interested" message to a peer.
+function send_interested(mailbox)
+       simgrid.debug("Sending a INTERESTED to " .. mailbox)
+       local task = new_task("INTERESTED")
+       task:dsend(mailbox)
+end
+-- Send a "not interested" message to a peer.
+function send_not_interested(mailbox)
+       local task = new_task("NOTINTERESTED")
+       task:dsend(mailbox)
+end
+-- Send a handshake message to all the peers the peer has
+function send_handshake_all()
+       for i,v in pairs(data.peers) do
+               local task = new_task("HANDSHAKE")
+               task:dsend(v.mailbox)
+       end
+end
+-- Send a "handshake" message to an user
+function send_handshake(mailbox)
+       simgrid.debug("Sending a HANDSHAKE to " .. mailbox)
+       local task = new_task("HANDSHAKE")
+       task:dsend(mailbox)             
+end
+-- Send a "choked" message to a peer
+function send_choked(mailbox)
+       simgrid.debug("Sending a CHOKE to " .. mailbox)
+       local task = new_task("CHOKE")
+       task:dsend(mailbox)     
+end
+-- Send a "unchoked" message to a peer
+function send_unchoked(mailbox)
+       simgrid.debug("Sending a UNCHOKE to " .. mailbox)
+       local task = new_task("UNCHOKE")
+       task:dsend(mailbox)     
+end
+-- Send a "HAVE" message to all peers we are connected to
+function send_have(piece)
+       for i,v in pairs(data.peers) do
+               local task = new_task("HAVE")
+               task.piece = piece
+               task:dsend(v.mailbox)
+       end
+end
+-- Send request messages to a peer that have unchoked us       
+function send_requests_to_peer(remote_peer)
+       for i,v in pairs(data.current_pieces) do
+               send_request(remote_peer.mailbox,i)
+       end
+end
+-- Send a bitfield message to a peer
+function send_bitfield(mailbox)
+       simgrid.debug("Sending a BITFIELD to " .. mailbox)
+       local task = new_task("BITFIELD")
+       task.bitfield = data.bitfield
+       task:dsend(mailbox)
+end
+-- Send a "request" message to a pair, containing a request for a piece
+function send_request(mailbox, piece)
+       simgrid.debug("Sending a REQUEST to " .. mailbox .. " for " .. piece)
+       local task =  new_task("REQUEST")
+       task.piece = piece
+       task:dsend(mailbox)
+end    
+-- Send a "piece" messageto a pair, containing a piece of the file
+function send_piece(mailbox, piece, stalled)
+       simgrid.debug("Sending the PIECE " .. piece .. " to " .. mailbox)
+       local task = new_task("PIECE")
+       task.piece = piece
+       task.stalled = stalled
+       task:dsend(mailbox)     
+end
+function new_task(type)
+       local task = simgrid.task.new("", 0, common.MESSAGE_SIZE)
+       task.type = type
+       task.mailbox = data.mailbox
+       task.peer_id = data.id  
+       return task
+end
+function get_status()
+       local s = ""
+       for i,v in pairs(data.bitfield) do
+               if v == true then
+                        s = s .. '1'
+               else
+                       s = s .. '0'
+               end
+       end
+       return s
+end
\ No newline at end of file
diff --git a/examples/lua/bittorrent/tracker.lua b/examples/lua/bittorrent/tracker.lua
new file mode 100644 (file)
index 0000000..e662990
--- /dev/null
@@ -0,0 +1,62 @@
+-- A SimGrid Lua implementation of the Bittorrent protocol.
+
+require("simgrid")
+
+common_tracker = {
+       MAXIMUM_PEERS = 50
+}
+
+
+
+function tracker(...)
+       tracker_data = {
+               peers_list = {},
+               deadline = 0,
+               comm_received = nil
+       }
+  -- Check the arguments
+  local args = {...}
+  if #args ~= 1 then
+       simgrid.info("Wrong number of arguments for the tracker")
+  end
+  -- Initialization of the random generator
+  math.randomseed(42)
+  -- Retrieve the end time
+  tracker_data.deadline = tonumber(args[1])
+  
+  simgrid.info("Tracker launched")
+  
+  local now = simgrid.get_clock()
+  
+  tracker_data.comm_received = simgrid.task.irecv("tracker")
+  while now < tracker_data.deadline do
+       task, err = tracker_data.comm_received:test()
+       if task then
+               simgrid.debug("Received a request from " .. task.mailbox)
+               tracker_data.comm_received = simgrid.task.irecv("tracker")
+               -- Sending peers to the peer
+               local peers = {}
+               local i = 0 
+               if #tracker_data.peers_list > 0 then
+                       i = math.random(1,#tracker_data.peers_list)
+               end
+               while #peers < #tracker_data.peers_list and #peers < common_tracker.MAXIMUM_PEERS do
+                       table.insert(peers,tracker_data.peers_list[i])  
+                       i = (i + 1) % #tracker_data.peers_list
+               end
+               task.type = "ANSWER"
+               task.peers = peers
+               -- Add the peer to our peer list
+               table.insert(tracker_data.peers_list,task.peer_id)
+               -- Setting the interval
+               task.interval = TRACKER_QUERY_INTERVAL
+               -- Sending the task back to the peer
+               task:dsend(task.mailbox)
+       else
+               simgrid.process.sleep(1)
+               now = simgrid.get_clock()
+       end
+  end
+  
+  simgrid.info("Tracker is leaving")
+end