Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update bittorrent example
authorSamuel Lepetit <samuel.lepetit@inria.fr>
Mon, 2 Jul 2012 09:22:04 +0000 (11:22 +0200)
committerSamuel Lepetit <samuel.lepetit@inria.fr>
Mon, 2 Jul 2012 09:22:04 +0000 (11:22 +0200)
examples/bittorrent/Common.java
examples/bittorrent/MessageTask.java
examples/bittorrent/Peer.java

index 8e7e4ca..51cb01d 100644 (file)
@@ -10,6 +10,8 @@ public class Common {
        public static int FILE_PIECES = 10;
        public static int PIECES_BLOCKS = 5;
        
+       public static int BLOCKS_REQUESTED = 2;
+       
        public static int PIECE_COMM_SIZE = 1;
        /**
         * Information message size
index 558515f..dd02eeb 100644 (file)
@@ -22,6 +22,8 @@ public class MessageTask extends Task {
        public int peerId;
        public char bitfield[];
        public int index;
+       public int blockIndex;
+       public int blockLength;
        public boolean stalled;
        /**
         * Constructor, builds a value-less message
@@ -31,10 +33,7 @@ public class MessageTask extends Task {
         * @param peerId
         */
        public MessageTask(Type type, String issuerHostname, String mailbox, int peerId) {
-               this.type = type;
-               this.issuerHostname = issuerHostname;
-               this.mailbox = mailbox;
-               this.peerId = peerId;
+               this(type,issuerHostname,mailbox,peerId,-1,false,-1,-1);
        }
        /**
         * Constructor, builds a new "have/request/piece" message
@@ -45,42 +44,32 @@ public class MessageTask extends Task {
         * @param index
         */
        public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index) {
-               this.type = type;
-               this.issuerHostname = issuerHostname;
-               this.mailbox = mailbox;
-               this.peerId = peerId;
-               this.index = index;
+               this(type,issuerHostname,mailbox,peerId,index,false,-1,-1);
        }
        /**
         * Constructor, builds a new bitfield message
-        * @param type
-        * @param issuerHostname
-        * @param mailbox
-        * @param peerId
-        * @param bitfield
         */
        public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, char bitfield[]) {
-               this.type = type;
-               this.issuerHostname = issuerHostname;
-               this.mailbox = mailbox;
-               this.peerId = peerId;
+               this(type,issuerHostname,mailbox,peerId,-1,false,-1,-1);
                this.bitfield = bitfield;
        }
+       /**
+        * Constructor, build a new "request"  message
+        */
+       public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index, int blockIndex, int blockLength) {
+               this(type,issuerHostname,mailbox,peerId,index,false,blockIndex,blockLength);
+       }
        /**
         * Constructor, build a new "piece" message
-        * @param type
-        * @param issuerHostname
-        * @param mailbox
-        * @param peerId
-        * @param index
-        * @param stalled
         */
-       public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index, boolean stalled) {
+       public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index, boolean stalled, int blockIndex, int blockLength) {
                this.type = type;
                this.issuerHostname = issuerHostname;
                this.mailbox = mailbox;
                this.peerId = peerId;
                this.index = index;
                this.stalled = stalled;
+               this.blockIndex = blockIndex;
+               this.blockLength = blockLength;
        }       
 }
index 08728b4..68c694e 100644 (file)
@@ -29,7 +29,7 @@ public class Peer extends Process {
        protected String hostname;
        protected int pieces = 0;
        protected char[] bitfield = new char[Common.FILE_PIECES];
-       protected char[][] bitfield_blocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
+       protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
        
        protected short[] piecesCount = new short[Common.FILE_PIECES];
        
@@ -180,11 +180,17 @@ public class Peer extends Process {
                if (seed) {
                        for (int i = 0; i < bitfield.length; i++) {
                                bitfield[i] = '1';
+                               for (int j = 0; j < bitfieldBlocks[i].length; j++) {
+                                       bitfieldBlocks[i][j] = '1';
+                               }
                        }
                }
                else {
                        for (int i = 0; i < bitfield.length; i++) {
                                bitfield[i] = '0';
+                               for (int j = 0; j < bitfieldBlocks[i].length; j++) {
+                                       bitfieldBlocks[i][j] = '0'      ;
+                               }
                        }                       
                }
                this.hostname = host.getName();
@@ -298,7 +304,10 @@ public class Peer extends Process {
                                }
                                
                                if (currentPieces.contains(message.index)) {
-                                       sendRequest(message.mailbox,message.index);
+                                       int blockIndex = getFirstBlock(message.index);                  
+                                       int blockLength = Common.PIECES_BLOCKS - blockIndex ;
+                                       blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;            
+                                       sendRequest(message.mailbox,message.index,blockIndex,blockLength);
                                }
                        break;
                        case REQUEST:
@@ -306,7 +315,7 @@ public class Peer extends Process {
                                if (!remotePeer.chokedUpload) {
                                        Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
                                        if (bitfield[message.index] == '1') {
-                                               sendPiece(message.mailbox,message.index,false); 
+                                               sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);  
                                        }
                                        else {
                                                Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
@@ -315,23 +324,26 @@ public class Peer extends Process {
                        break;
                        case PIECE:
                                if (message.stalled) {
-                                       Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
+                                       Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ") is stalled");
                                }
                                else {
                                        Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
                                        if (bitfield[message.index] == '0') {
-                                               piecesRequested--;
-                                               //Removing the piece from our piece list.
-                                               if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
+                                               updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
+                                               if (pieceComplete(message.index)) {
+                                                       piecesRequested--;
+                                                       //Removing the piece from our piece list.
+                                                       if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
+                                                       }
+                                                       //Setting the fact that we have the piece
+                                                       bitfield[message.index] = '1';
+                                                       pieces++;
+                                                       Msg.debug("My status is now " + getStatus());
+                                                       //Sending the information to all the peers we are connected to
+                                                       sendHave(message.index);
+                                                       //sending UNINTERESTED to peers that doesn't have what we want.
+                                                       updateInterestedAfterReceive();
                                                }
-                                               //Setting the fact that we have the piece
-                                               bitfield[message.index] = '1';
-                                               pieces++;
-                                               Msg.debug("My status is now " + getStatus());
-                                               //Sending the information to all the peers we are connected to
-                                               sendHave(message.index);
-                                               //sending UNINTERESTED to peers that doesn't have what we want.
-                                               updateInterestedAfterReceive();
                                        }
                                        else {
                                                Msg.debug("However, we already have it.");
@@ -477,14 +489,52 @@ Common.MAXIMUM_PEERS);
                        }
                }
        }
+       private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
+               for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
+                       bitfieldBlocks[index][i] = '1';
+               }
+       }
+       /**
+        * Returns if a piece is complete in the peer's bitfield.
+        * @param index the index of the piece.
+        */
+       private boolean pieceComplete(int index) {
+               for (int i = 0; i < bitfieldBlocks[index].length; i++) {
+                       if (bitfieldBlocks[index][i] == '0') {
+                               return false;
+                       }
+               }
+               return true;
+       }
+       /**
+        * Returns the first block of a piece that we don't have. 
+        */
+       private int getFirstBlock(int piece) {
+               int blockIndex = -1;
+               for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
+                       if (bitfieldBlocks[piece][i] == '0') {
+                               blockIndex = i;
+                               break;
+                       }
+               }       
+               return blockIndex;
+       }
        /**
         * Send request messages to a peer that have unchoked us
         * @param remotePeer peer data to the peer we want to send the request
         */
        private void sendRequestsToPeer(Connection remotePeer) {
+               if (remotePeer.bitfield == null) {
+                       return;
+               }
                for (Integer piece : currentPieces) {
-                       if (remotePeer.bitfield != null && remotePeer.bitfield[piece] == '1') {
-                               sendRequest(remotePeer.mailbox, piece);
+                       //Getting the block to send.    
+                       int blockIndex = -1, blockLength = 0;
+                       blockIndex = getFirstBlock(piece);                      
+                       blockLength = Common.PIECES_BLOCKS - blockIndex ;
+                       blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;            
+                       if (remotePeer.bitfield[piece] == '1') {
+                               sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
                        }                       
                }
        }       
@@ -579,17 +629,17 @@ Common.MAXIMUM_PEERS);
        /**
         * Send a "request" message to a pair, containing a request for a piece
         */
-       private void sendRequest(String mailbox, int piece) {
-               Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece);
-               MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece);
+       private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
+               Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + "," + (blockIndex + blockLength));
+               MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, blockLength);
                task.dsend(mailbox);
        }
        /**
         * Send a "piece" message to a pair, containing a piece of the file
         */
-       private void sendPiece(String mailbox, int piece, boolean stalled) {
+       private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
                Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
-               MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled);
+               MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled, blockIndex, blockLength);
                task.dsend(mailbox);
        }