Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge pull request #2 from mquinson/master
[simgrid.git] / examples / java / bittorrent / Peer.java
index 32c2d30..53fe246 100644 (file)
@@ -1,9 +1,9 @@
-/*
- * Copyright 2006-2012. The SimGrid Team. All rights reserved. 
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. 
- */
+/* Copyright (c) 2006-2014, 2016. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
 package bittorrent;
 
 import java.util.ArrayList;
@@ -11,680 +11,621 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
+import org.simgrid.msg.Msg;
 import org.simgrid.msg.Comm;
 import org.simgrid.msg.Host;
-import org.simgrid.msg.Msg;
-import org.simgrid.msg.MsgException;
+import org.simgrid.msg.Task;
 import org.simgrid.msg.Process;
 import org.simgrid.msg.RngStream;
-import org.simgrid.msg.Task;
+import org.simgrid.msg.MsgException;
 
-/**
- * Main class for peers execution
- */
 public class Peer extends Process {
-       protected int round = 0;
-       
-       protected double beginReceiveTime;
-       protected double deadline;
-       
-       protected static RngStream stream = new RngStream();
-       
-       protected int id;
-       protected String mailbox;
-       protected String mailboxTracker;
-       protected String hostname;
-       protected int pieces = 0;
-       protected char[] bitfield = new char[Common.FILE_PIECES];
-       protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
-       
-       protected short[] piecesCount = new short[Common.FILE_PIECES];
-       
-       protected int piecesRequested = 0;
-       
-       protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
-       protected int currentPiece = -1;
-
-       protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();        
-       protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
-       
-       protected Comm commReceived = null;
-
-       public Peer(Host host, String name, String[]args) {
-               super(host,name,args);
-       }       
-       
-       @Override
-       public void main(String[] args) throws MsgException {
-               //Check arguments
-               if (args.length != 3 && args.length != 2) {
-                       Msg.info("Wrong number of arguments");
-               }
-               if (args.length == 3) {
-                       init(Integer.valueOf(args[0]),true);
-               }
-               else {
-                       init(Integer.valueOf(args[0]),false);
-               }
-               //Retrieve the deadline
-               deadline = Double.valueOf(args[1]);
-               if (deadline < 0) {
-                       Msg.info("Wrong deadline supplied");
-                       return;
-               }
-               Msg.info("Hi, I'm joining the network with id " + id);
-               //Getting peer data from the tracker
-               if (getPeersData()) {
-                       Msg.debug("Got " + peers.size() + " peers from the tracker");
-                       Msg.debug("Here is my current status: " + getStatus());
-                       beginReceiveTime = Msg.getClock();                      
-                       if (hasFinished()) {
-                               pieces = Common.FILE_PIECES;
-                               sendHandshakeAll();
-                               seedLoop();
-                       }
-                       else {
-                               leechLoop();
-                               seedLoop();
-                       }
-               }
-               else {
-                       Msg.info("Couldn't contact the tracker.");
-               }
-               Msg.info("Here is my current status: " + getStatus());
-       }
-       /**
-        * Peer main loop when it is leeching.
-        */
-       private void leechLoop() {
-               double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
-               Msg.debug("Start downloading.");
-               /**
-                * Send a "handshake" message to all the peers it got
-                * (it couldn't have gotten more than 50 peers anyway)
-                */
-               sendHandshakeAll();
-               //Wait for at least one "bitfield" message.
-               waitForPieces();
-               Msg.debug("Starting main leech loop");
-               while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
-                       if (commReceived == null) {
-                               commReceived = Task.irecv(mailbox);
-                       }
-                       try {
-                               if (commReceived.test()) {
-                                       handleMessage(commReceived.getTask());
-                                       commReceived = null;
-                               }
-                               else {
-                                       //If the user has a pending interesting
-                                       if (currentPiece != -1) {
-                                               sendInterestedToPeers();
-                                       }
-                                       else {
-                                               if (currentPieces.size() < Common.MAX_PIECES) {
-                                                       updateCurrentPiece();
-                                               }
-                                       }
-                                       //We don't execute the choke algorithm if we don't already have a piece
-                                       if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
-                                               updateChokedPeers();
-                                               nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
-                                       }
-                                       else {
-                                               waitFor(1);
-                                       }
-                               }
-                       }
-                       catch (MsgException e) {
-                               commReceived = null;                            
-                       }
-               }
-       }
-       
-       /**
-        * Peer main loop when it is seeding
-        */
-       private void seedLoop() {
-               double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
-               Msg.debug("Start seeding.");
-               //start the main seed loop
-               while (Msg.getClock() < deadline) {
-                       if (commReceived == null) {
-                               commReceived = Task.irecv(mailbox);
-                       }
-                       try {
-                               if (commReceived.test()) {
-                                       handleMessage(commReceived.getTask());
-                                       commReceived = null;
-                               }
-                               else {
-                                       if (Msg.getClock() >= nextChokedUpdate) {
-                                               updateChokedPeers();
-                                               //TODO: Change the choked peer algorithm when seeding
-                                               nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
-                                       }
-                                       else {
-                                               waitFor(1);
-                                       }
-                               }
-                       }
-                       catch (MsgException e) {
-                               commReceived = null;                            
-                       }
-
-               }
-       }
-       
-       /**
-        * Initialize the various peer data
-        * @param id id of the peer to take in the network
-        * @param seed indicates if the peer is a seed
-        */
-       private void init(int id, boolean seed) {
-               this.id = id;
-               this.mailbox = Integer.toString(id);
-               this.mailboxTracker = "tracker_" + Integer.toString(id);
-               if (seed) {
-                       for (int i = 0; i < bitfield.length; i++) {
-                               bitfield[i] = '1';
-                               for (int j = 0; j < bitfieldBlocks[i].length; j++) {
-                                       bitfieldBlocks[i][j] = '1';
-                               }
-                       }
-               }
-               else {
-                       for (int i = 0; i < bitfield.length; i++) {
-                               bitfield[i] = '0';
-                               for (int j = 0; j < bitfieldBlocks[i].length; j++) {
-                                       bitfieldBlocks[i][j] = '0'      ;
-                               }
-                       }                       
-               }
-               this.hostname = host.getName();
-       }
-       /**
-        * Retrieves the peer list from the tracker
-        */
-       private boolean getPeersData() {
-               
-               boolean success = false, sendSuccess = false;
-               double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
-               //Build the task to send to the tracker
-               TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
-                       
-               while (!sendSuccess && Msg.getClock() < timeout) {
-                       try {
-                               Msg.debug("Sending a peer request to the tracker.");
-                               taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
-                               sendSuccess = true;
-                       }
-                       catch (MsgException e) {
-                               
-                       }
-               }
-               while (!success && Msg.getClock() < timeout) {
-                       commReceived = Task.irecv(this.mailboxTracker);
-                       try {
-                               commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
-                               if (commReceived.getTask() instanceof TrackerTask) {
-                                       TrackerTask task = (TrackerTask)commReceived.getTask();
-                                       for (Integer peerId: task.peers) {
-                                               if (peerId != this.id) {
-                                                       peers.put(peerId, new Connection(peerId));
-                                               }       
-                                       }
-                                       success = true;
-                               }
-                       }
-                       catch (MsgException e) {
-                               
-                       }
-                       commReceived = null;
-               }
-               commReceived = null;
-               return success;
-       }
-       /**
-        * Handle a received message sent by another peer
-        * @param task task received.
-        */
-       void handleMessage(Task task) {
-               MessageTask message = (MessageTask)task;
-               Connection remotePeer = peers.get(message.peerId);
-               switch (message.type) {
-                       case HANDSHAKE:
-                               Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
-                               //Check if the peer is in our connection list
-                               if (remotePeer == null) {
-                                       peers.put(message.peerId, new Connection(message.peerId));
-                                       sendHandshake(message.mailbox);
-                               }
-                               //Send our bitfield to the pair
-                               sendBitfield(message.mailbox);
-                       break;
-                       case BITFIELD:
-                               Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
-                               //update the pieces list
-                               updatePiecesCountFromBitfield(message.bitfield);
-                               //Update the current piece
-                               if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
-                                       updateCurrentPiece();
-                               }                               
-                               remotePeer.bitfield  = message.bitfield.clone();
-                       break;
-                       case INTERESTED:
-                               Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
-                               assert remotePeer != null;
-                               remotePeer.interested = true;
-                       break;
-                       case NOTINTERESTED:
-                               Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
-                               assert remotePeer != null;
-                               remotePeer.interested = false;
-                       break;
-                       case UNCHOKE:
-                               Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
-                               assert remotePeer != null;
-                               remotePeer.chokedDownload = false;
-                               activePeers.put(remotePeer.id,remotePeer);
-                               sendRequestsToPeer(remotePeer);
-                       break;
-                       case CHOKE:
-                               Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
-                               assert remotePeer != null;
-                               remotePeer.chokedDownload = true;
-                               activePeers.remove(remotePeer.id);
-                       break;
-                       case HAVE:
-                               if (remotePeer.bitfield == null) {
-                                       return;
-                               }
-                               Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
-                               assert message.index >= 0 && message.index < Common.FILE_PIECES;
-                               assert remotePeer.bitfield != null;
-                               remotePeer.bitfield[message.index] = '1';
-                               piecesCount[message.index]++; 
-                               //Send interested message to the peer if he has what we want
-                               if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
-                                       remotePeer.amInterested = true;
-                                       sendInterested(remotePeer.mailbox);
-                               }
-                               
-                               if (currentPieces.contains(message.index)) {
-                                       int blockIndex = getFirstBlock(message.index);                  
-                                       int blockLength = Common.PIECES_BLOCKS - blockIndex ;
-                                       blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;            
-                                       sendRequest(message.mailbox,message.index,blockIndex,blockLength);
-                               }
-                       break;
-                       case REQUEST:
-                               assert message.index >= 0 && message.index < Common.FILE_PIECES;
-                               if (!remotePeer.chokedUpload) {
-                                       Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
-                                       if (bitfield[message.index] == '1') {
-                                               sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);  
-                                       }
-                                       else {
-                                               Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
-                                       }
-                               }
-                       break;
-                       case PIECE:
-                               if (message.stalled) {
-                                       Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ") is stalled");
-                               }
-                               else {
-                                       Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
-                                       if (bitfield[message.index] == '0') {
-                                               updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
-                                               if (pieceComplete(message.index)) {
-                                                       piecesRequested--;
-                                                       //Removing the piece from our piece list.
-                                                       if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
-                                                       }
-                                                       //Setting the fact that we have the piece
-                                                       bitfield[message.index] = '1';
-                                                       pieces++;
-                                                       Msg.debug("My status is now " + getStatus());
-                                                       //Sending the information to all the peers we are connected to
-                                                       sendHave(message.index);
-                                                       //sending UNINTERESTED to peers that doesn't have what we want.
-                                                       updateInterestedAfterReceive();
-                                               }
-                                       }
-                                       else {
-                                               Msg.debug("However, we already have it.");
-                                       }
-                               }
-                       break;
-               }
-               if (remotePeer != null) {
-                       remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
-               }
-               beginReceiveTime = Msg.getClock();
-       }
-       /**
-        * Wait for the node to receive interesting bitfield messages (ie: non empty)
-        * to be received
-        */
-       void waitForPieces() {
-               boolean finished = false;
-               while (Msg.getClock() < deadline && !finished) {
-                       if (commReceived == null) {
-                               commReceived = Task.irecv(mailbox);
-                       }
-                       try {
-                               commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
-                               handleMessage(commReceived.getTask());
-                               if (currentPiece != -1) {
-                                       finished = true;
-                               }
-                               commReceived = null;
-                       }
-                       catch (MsgException e) {
-                               commReceived = null;
-                       }
-               }
-       }
-       
-       private boolean hasFinished() {
-               for (int i = 0; i < bitfield.length; i++) {
-                       if (bitfield[i] == '1') {
-                               return true;
-                       }
-               }
-               return false;
-       }
-       /**
-        * Updates the list of who has a piece from a bitfield
-        * @param bitfield bitfield
-        */
-       private void updatePiecesCountFromBitfield(char bitfield[]) {
-               for (int i = 0; i < Common.FILE_PIECES; i++) {
-                       if (bitfield[i] == '1') {
-                               piecesCount[i]++;
-                       }
-               }
-       }
-       /**
-        * Update the piece the peer is currently interested in.
-        * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
-        * If the peer has less than 3 pieces, he chooses a piece at random.
-        * If the peer has more than pieces, he downloads the pieces that are the less
-        * replicated
-        */
-       void updateCurrentPiece() {
-               if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
-                       return;
-               }
-               if (true || pieces < 3) {
-                       int i = 0, peerPiece;
-                       do {
-                               currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
-                               i++;
-                       } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
-               }
-               else {
-                       //trivial min algorithm.
-                       //TODO
-               }
-               currentPieces.add(currentPiece);
-               Msg.debug("New interested piece: " + currentPiece);
-               assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
-       }
-       /**
-        * Update the list of current choked and unchoked peers, using the
-        * choke algorithm
-        */
-       private void updateChokedPeers() {
-               round = (round + 1) % 3;
-               if (peers.size() == 0) {
-                       return;
-               }
-               //remove a peer from the list
-               Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
-               if (it.hasNext()) {
-                       Entry<Integer,Connection> e = it.next();
-                       Connection peerChoked = e.getValue();
-                       peerChoked.chokedUpload = true;
-                       sendChoked(peerChoked.mailbox);
-                       activePeers.remove(e.getKey());
-               }
-               Connection peerChoosed = null;
-               //Separate the case from when the peer is seeding.
-               if (pieces == Common.FILE_PIECES) {
-                       //Find the last unchoked peer.
-                       double unchokeTime = deadline + 1;
-                       for (Connection connection : peers.values()) {
-                               if (connection.lastUnchoke < unchokeTime && connection.interested) {
-                                       peerChoosed = connection;
-                                       unchokeTime = connection.lastUnchoke;
-                               }
-                       }
-               }
-               else {
-                       //Random optimistic unchoking
-                       if (round == 0) {
-                               int j = 0, i;
-                               do {
-                                       i = 0;
-                                       int idChosen = stream.randInt(0,peers.size() - 1);
-                                       for (Connection connection : peers.values()) {
-                                               if (i == idChosen) {
-                                                       peerChoosed = connection;
-                                                       break;
-                                               }
-                                               i++;
-                                       } //TODO: Not really the best way ever
-                                       if (!peerChoosed.interested) {
-                                               peerChoosed = null;
-                                       }
-                                       j++;
-                               } while (peerChoosed == null && j < 
-       Common.MAXIMUM_PEERS);
-                       }
-                       else {
-                               Connection fastest = null;
-                               double fastestSpeed = 0;
-                               for (Connection c : peers.values()) {
-                                       if (c.peerSpeed > fastestSpeed && c.interested && c.chokedUpload) {
-                                               fastest = c;
-                                               fastestSpeed = c.peerSpeed;
-                                       }
-                               }
-                               peerChoosed = fastest;
-                       }
-               }
-               if (peerChoosed != null) {
-                       activePeers.put(peerChoosed.id,peerChoosed);
-                       peerChoosed.chokedUpload = false;
-                       peerChoosed.lastUnchoke = Msg.getClock();
-                       sendUnchoked(peerChoosed.mailbox);
-               }
-       }
-       /**     
-        * Updates our "interested" state about peers: send "not interested" to peers
-        * that don't have any more pieces we want.
-        */
-       private void updateInterestedAfterReceive() {
-               boolean interested;
-               for (Connection connection : peers.values()) {
-                       interested = false;
-                       if (connection.amInterested) {
-                               for (Integer piece : currentPieces) {
-                                       if (connection.bitfield[piece] == '1') {
-                                               interested = true;
-                                               break;
-                                       }
-                               }       
-                               if (!interested) {
-                                       connection.amInterested = false;
-                                       sendNotInterested(connection.mailbox);
-                               }
-                       }
-               }
-       }
-       private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
-               for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
-                       bitfieldBlocks[index][i] = '1';
-               }
-       }
-       /**
-        * Returns if a piece is complete in the peer's bitfield.
-        * @param index the index of the piece.
-        */
-       private boolean pieceComplete(int index) {
-               for (int i = 0; i < bitfieldBlocks[index].length; i++) {
-                       if (bitfieldBlocks[index][i] == '0') {
-                               return false;
-                       }
-               }
-               return true;
-       }
-       /**
-        * Returns the first block of a piece that we don't have. 
-        */
-       private int getFirstBlock(int piece) {
-               int blockIndex = -1;
-               for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
-                       if (bitfieldBlocks[piece][i] == '0') {
-                               blockIndex = i;
-                               break;
-                       }
-               }       
-               return blockIndex;
-       }
-       /**
-        * Send request messages to a peer that have unchoked us
-        * @param remotePeer peer data to the peer we want to send the request
-        */
-       private void sendRequestsToPeer(Connection remotePeer) {
-               if (remotePeer.bitfield == null) {
-                       return;
-               }
-               for (Integer piece : currentPieces) {
-                       //Getting the block to send.    
-                       int blockIndex = -1, blockLength = 0;
-                       blockIndex = getFirstBlock(piece);                      
-                       blockLength = Common.PIECES_BLOCKS - blockIndex ;
-                       blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;            
-                       if (remotePeer.bitfield[piece] == '1') {
-                               sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
-                       }                       
-               }
-       }       
-       /**
-        * Find the peers that have the current interested piece and send them
-        * the "interested" message
-        */
-       private void sendInterestedToPeers() {
-               if (currentPiece == -1) {
-                       return;
-               }
-               for (Connection connection : peers.values()) {
-                       if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
-                               connection.amInterested = true;                         
-                               MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
-                               task.dsend(connection.mailbox);                         
-                       }
-               }
-               currentPiece = -1;
-               piecesRequested++;
-       }
-       /**
-        * Send a "interested" message to a peer.
-        */
-       private void sendInterested(String mailbox) {
-               MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
-               task.dsend(mailbox);                                            
-       }
-       /**
-        * Send a "not interested" message to a peer
-        * @param mailbox mailbox destination mailbox
-        */
-       private void sendNotInterested(String mailbox) {
-               MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
-               task.dsend(mailbox);                            
-       }
-       /**
-        * Send a handshake message to all the peers the peer has.
-        * @param peer peer data
-        */
-       private void sendHandshakeAll() {
-               for (Connection remotePeer : peers.values()) {
-                       MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
-                       id);
-                       task.dsend(remotePeer.mailbox);
-               }
-       }
-       /**
-        * Send a "handshake" message to an user
-        * @param mailbox mailbox where to we send the message
-        */
-       private void sendHandshake(String mailbox) {
-               Msg.debug("Sending a HANDSHAKE to " + mailbox);
-               MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
-               task.dsend(mailbox);            
-       }
-       /**
-        * Send a "choked" message to a peer
-        */
-       private void sendChoked(String mailbox) {
-               Msg.debug("Sending a CHOKE to " + mailbox);
-               MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
-               task.dsend(mailbox);
-       }
-       /**
-        * Send a "unchoked" message to a peer
-        */
-       private void sendUnchoked(String mailbox) {
-               Msg.debug("Sending a UNCHOKE to " + mailbox);
-               MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
-               task.dsend(mailbox);
-       }
-       /**
-        * Send a "HAVE" message to all peers we are connected to
-        */
-       private void sendHave(int piece) {
-               Msg.debug("Sending HAVE message to all my peers");
-               for (Connection remotePeer : peers.values()) {
-                       MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
-                       task.dsend(remotePeer.mailbox);
-               }
-       }
-       /**
-        * Send a bitfield message to all the peers the peer has.
-        * @param peer peer data
-        */
-       private void sendBitfield(String mailbox) {
-               Msg.debug("Sending a BITFIELD to " + mailbox);
-               MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
-               task.dsend(mailbox);
-       }
-       /**
-        * Send a "request" message to a pair, containing a request for a piece
-        */
-       private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
-               Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + "," + (blockIndex + blockLength));
-               MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, blockLength);
-               task.dsend(mailbox);
-       }
-       /**
-        * Send a "piece" message to a pair, containing a piece of the file
-        */
-       private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
-               Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
-               MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled, blockIndex, blockLength);
-               task.dsend(mailbox);
-       }
-       
-       private String getStatus() {
-               String s = "";
-               for (int i = 0; i < Common.FILE_PIECES; i++) {
-                       s = s + bitfield[i];
-               }
-               return s;
-       }
+  protected int round = 0;
+  protected double beginReceiveTime;
+  protected double deadline;
+  protected static RngStream stream = new RngStream();
+  protected int id;
+  protected String mailbox;
+  protected String mailboxTracker;
+  protected String hostname;
+  protected int pieces = 0;
+  protected char[] bitfield = new char[Common.FILE_PIECES];
+  protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
+  protected short[] piecesCount = new short[Common.FILE_PIECES];
+  protected int piecesRequested = 0;
+  protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
+  protected int currentPiece = -1;
+  protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();  
+  protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
+  protected Comm commReceived = null;
+
+  public Peer(Host host, String name, String[]args) {
+    super(host,name,args);
+  }
+
+  @Override
+  public void main(String[] args) throws MsgException {
+    //Check arguments
+    if (args.length != 3 && args.length != 2) {
+      Msg.info("Wrong number of arguments");
+    }
+    if (args.length == 3) {
+      init(Integer.valueOf(args[0]),true);
+    } else {
+      init(Integer.valueOf(args[0]),false);
+    }
+    //Retrieve the deadline
+    deadline = Double.valueOf(args[1]);
+    if (deadline < 0) {
+      Msg.info("Wrong deadline supplied");
+      return;
+    }
+    Msg.info("Hi, I'm joining the network with id " + id);
+    //Getting peer data from the tracker
+    if (getPeersData()) {
+      Msg.debug("Got " + peers.size() + " peers from the tracker");
+      Msg.debug("Here is my current status: " + getStatus());
+      beginReceiveTime = Msg.getClock();      
+      if (hasFinished()) {
+        pieces = Common.FILE_PIECES;
+        sendHandshakeAll();
+        seedLoop();
+      } else {
+        leechLoop();
+        seedLoop();
+      }
+    } else {
+      Msg.info("Couldn't contact the tracker.");
+    }
+    Msg.info("Here is my current status: " + getStatus());
+  }
+
+  private void leechLoop() {
+    double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
+    Msg.debug("Start downloading.");
+    // Send a "handshake" message to all the peers it got(it couldn't have gotten more than 50 peers anyway)
+    sendHandshakeAll();
+    //Wait for at least one "bitfield" message.
+    waitForPieces();
+    Msg.debug("Starting main leech loop");
+    while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
+      if (commReceived == null) {
+        commReceived = Task.irecv(mailbox);
+      }
+      try {
+        if (commReceived.test()) {
+          handleMessage(commReceived.getTask());
+          commReceived = null;
+        } else {
+          //If the user has a pending interesting
+          if (currentPiece != -1) {
+            sendInterestedToPeers();
+          } else {
+            if (currentPieces.size() < Common.MAX_PIECES) {
+              updateCurrentPiece();
+            }
+          }
+          //We don't execute the choke algorithm if we don't already have a piece
+          if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
+            updateChokedPeers();
+            nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
+          } else {
+            waitFor(1);
+          }
+        }
+      }
+      catch (MsgException e) {
+        commReceived = null;
+      }
+    }
+  }
+
+  private void seedLoop() {
+    double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
+    Msg.debug("Start seeding.");
+    //start the main seed loop
+    while (Msg.getClock() < deadline) {
+      if (commReceived == null) {
+        commReceived = Task.irecv(mailbox);
+      }
+      try {
+        if (commReceived.test()) {
+          handleMessage(commReceived.getTask());
+          commReceived = null;
+        } else {
+          if (Msg.getClock() >= nextChokedUpdate) {
+            updateChokedPeers();
+            //TODO: Change the choked peer algorithm when seeding
+            nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
+          } else {
+            waitFor(1);
+          }
+        }
+      }
+      catch (MsgException e) {
+        commReceived = null;
+      }
+    }
+  }
+
+  /**
+   * @brief Initialize the various peer data
+   * @param id id of the peer to take in the network
+   * @param seed indicates if the peer is a seed
+   */
+  private void init(int id, boolean seed) {
+    this.id = id;
+    this.mailbox = Integer.toString(id);
+    this.mailboxTracker = "tracker_" + Integer.toString(id);
+    if (seed) {
+      for (int i = 0; i < bitfield.length; i++) {
+        bitfield[i] = '1';
+        for (int j = 0; j < bitfieldBlocks[i].length; j++) {
+          bitfieldBlocks[i][j] = '1';
+        }
+      }
+    } else {
+      for (int i = 0; i < bitfield.length; i++) {
+        bitfield[i] = '0';
+        for (int j = 0; j < bitfieldBlocks[i].length; j++) {
+          bitfieldBlocks[i][j] = '0'  ;
+        }
+      }
+    }
+    this.hostname = getHost().getName();
+  }
+
+  private boolean getPeersData() {
+    boolean success = false, sendSuccess = false;
+    double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
+    //Build the task to send to the tracker
+    TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
+
+    while (!sendSuccess && Msg.getClock() < timeout) {
+      try {
+        Msg.debug("Sending a peer request to the tracker.");
+        taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
+        sendSuccess = true;
+      }
+      catch (MsgException e) {
+      }
+    }
+    while (!success && Msg.getClock() < timeout) {
+      commReceived = Task.irecv(this.mailboxTracker);
+      try {
+        commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
+        if (commReceived.getTask() instanceof TrackerTask) {
+          TrackerTask task = (TrackerTask)commReceived.getTask();
+          for (Integer peerId: task.peers) {
+            if (peerId != this.id) {
+              peers.put(peerId, new Connection(peerId));
+            }
+          }
+          success = true;
+        }
+      }
+      catch (MsgException e) {}
+      commReceived = null;
+    }
+    commReceived = null;
+    return success;
+  }
+
+  void handleMessage(Task task) {
+    MessageTask message = (MessageTask)task;
+    Connection remotePeer = peers.get(message.peerId);
+    switch (message.type) {
+      case HANDSHAKE:
+        Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
+        //Check if the peer is in our connection list
+        if (remotePeer == null) {
+          peers.put(message.peerId, new Connection(message.peerId));
+          sendHandshake(message.mailbox);
+        }
+        //Send our bitfield to the pair
+        sendBitfield(message.mailbox);
+      break;
+      case BITFIELD:
+        Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
+        //update the pieces list
+        updatePiecesCountFromBitfield(message.bitfield);
+        //Update the current piece
+        if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
+          updateCurrentPiece();
+        }
+        remotePeer.bitfield  = message.bitfield.clone();
+      break;
+      case INTERESTED:
+        Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
+        assert remotePeer != null;
+        remotePeer.interested = true;
+      break;
+      case NOTINTERESTED:
+        Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
+        assert remotePeer != null;
+        remotePeer.interested = false;
+      break;
+      case UNCHOKE:
+        Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
+        assert remotePeer != null;
+        remotePeer.chokedDownload = false;
+        activePeers.put(remotePeer.id,remotePeer);
+        sendRequestsToPeer(remotePeer);
+      break;
+      case CHOKE:
+        Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
+        assert remotePeer != null;
+        remotePeer.chokedDownload = true;
+        activePeers.remove(remotePeer.id);
+      break;
+      case HAVE:
+        if (remotePeer.bitfield == null) {
+          return;
+        }
+        Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
+        assert message.index >= 0 && message.index < Common.FILE_PIECES;
+        assert remotePeer.bitfield != null;
+        remotePeer.bitfield[message.index] = '1';
+        piecesCount[message.index]++; 
+        //Send interested message to the peer if he has what we want
+        if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
+          remotePeer.amInterested = true;
+          sendInterested(remotePeer.mailbox);
+        }
+        
+        if (currentPieces.contains(message.index)) {
+          int blockIndex = getFirstBlock(message.index);      
+          int blockLength = Common.PIECES_BLOCKS - blockIndex ;
+          blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;    
+          sendRequest(message.mailbox,message.index,blockIndex,blockLength);
+        }
+      break;
+      case REQUEST:
+        assert message.index >= 0 && message.index < Common.FILE_PIECES;
+        if (!remotePeer.chokedUpload) {
+          Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " 
+                    + message.peerId);
+          if (bitfield[message.index] == '1') {
+            sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);  
+          } else {
+            Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname 
+                      + ") but he is choked" );
+          }
+        }
+      break;
+      case PIECE:
+        if (message.stalled) {
+          Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname 
+                    + ") is stalled");
+        } else {
+          Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" 
+                    + message.issuerHostname + ")");
+          if (bitfield[message.index] == '0') {
+            updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
+            if (pieceComplete(message.index)) {
+              piecesRequested--;
+              //Removing the piece from our piece list.
+              currentPieces.remove((Object)Integer.valueOf(message.index));
+              //Setting the fact that we have the piece
+              bitfield[message.index] = '1';
+              pieces++;
+              Msg.debug("My status is now " + getStatus());
+              //Sending the information to all the peers we are connected to
+              sendHave(message.index);
+              //sending UNINTERESTED to peers that doesn't have what we want.
+              updateInterestedAfterReceive();
+            }
+          } else {
+            Msg.debug("However, we already have it.");
+          }
+        }
+      break;
+    }
+    if (remotePeer != null) {
+      remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
+    }
+    beginReceiveTime = Msg.getClock();
+  }
+
+  void waitForPieces() {
+    boolean finished = false;
+    while (Msg.getClock() < deadline && !finished) {
+      if (commReceived == null) {
+        commReceived = Task.irecv(mailbox);
+      }
+      try {
+        commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
+        handleMessage(commReceived.getTask());
+        if (currentPiece != -1) {
+          finished = true;
+        }
+        commReceived = null;
+      }
+      catch (MsgException e) {
+        commReceived = null;
+      }
+    }
+  }
+
+  private boolean hasFinished() {
+    for (int i = 0; i < bitfield.length; i++) {
+      if (bitfield[i] == '1') {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @brief Updates the list of who has a piece from a bitfield
+   * @param bitfield bitfield
+   */
+  private void updatePiecesCountFromBitfield(char bitfield[]) {
+    for (int i = 0; i < Common.FILE_PIECES; i++) {
+      if (bitfield[i] == '1') {
+        piecesCount[i]++;
+      }
+    }
+  }
+
+  /**
+   * Update the piece the peer is currently interested in.
+   * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
+   * If the peer has less than 3 pieces, he chooses a piece at random.
+   * If the peer has more than pieces, he downloads the pieces that are the less
+   * replicated
+   */
+  void updateCurrentPiece() {
+    if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
+      return;
+    }
+    if (true || pieces < 3) {
+      int peerPiece;
+      do {
+        currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
+      } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
+    }
+    else {
+      //trivial min algorithm.
+      //TODO
+    }
+    currentPieces.add(currentPiece);
+    Msg.debug("New interested piece: " + currentPiece);
+    assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
+  }
+
+  // Update the list of current choked and unchoked peers, using the choke algorithm
+  private void updateChokedPeers() {
+    round = (round + 1) % 3;
+    if (peers.size() == 0) {
+      return;
+    }
+    //remove a peer from the list
+    Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
+    if (it.hasNext()) {
+      Entry<Integer,Connection> e = it.next();
+      Connection peerChoked = e.getValue();
+      peerChoked.chokedUpload = true;
+      sendChoked(peerChoked.mailbox);
+      activePeers.remove(e.getKey());
+    }
+    Connection peerChoosed = null;
+    //Separate the case from when the peer is seeding.
+    if (pieces == Common.FILE_PIECES) {
+      //Find the last unchoked peer.
+      double unchokeTime = deadline + 1;
+      for (Connection connection : peers.values()) {
+        if (connection.lastUnchoke < unchokeTime && connection.interested) {
+          peerChoosed = connection;
+          unchokeTime = connection.lastUnchoke;
+        }
+      }
+    } else {
+      //Random optimistic unchoking
+      if (round == 0) {
+        int j = 0, i;
+        do {
+          i = 0;
+          int idChosen = stream.randInt(0,peers.size() - 1);
+          for (Connection connection : peers.values()) {
+            if (i == idChosen) {
+              peerChoosed = connection;
+              break;
+            }
+            i++;
+          } //TODO: Not really the best way ever
+          if (!peerChoosed.interested) {
+            peerChoosed = null;
+          }
+          j++;
+        } while (peerChoosed == null && j < Common.MAXIMUM_PEERS);
+      } else {
+        Connection fastest = null;
+        double fastestSpeed = 0;
+        for (Connection c : peers.values()) {
+          if (c.peerSpeed > fastestSpeed && c.interested && c.chokedUpload) {
+            fastest = c;
+            fastestSpeed = c.peerSpeed;
+          }
+        }
+        peerChoosed = fastest;
+      }
+    }
+    if (peerChoosed != null) {
+      activePeers.put(peerChoosed.id,peerChoosed);
+      peerChoosed.chokedUpload = false;
+      peerChoosed.lastUnchoke = Msg.getClock();
+      sendUnchoked(peerChoosed.mailbox);
+    }
+  }
+
+  // Updates our "interested" state about peers: send "not interested" to peers that don't have any more pieces we want.
+  private void updateInterestedAfterReceive() {
+    boolean interested;
+    for (Connection connection : peers.values()) {
+      interested = false;
+      if (connection.amInterested) {
+        for (Integer piece : currentPieces) {
+          if (connection.bitfield[piece] == '1') {
+            interested = true;
+            break;
+          }
+        }
+        if (!interested) {
+          connection.amInterested = false;
+          sendNotInterested(connection.mailbox);
+        }
+      }
+    }
+  }
+
+  private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
+    for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
+      bitfieldBlocks[index][i] = '1';
+    }
+  }
+
+  // Returns if a piece is complete in the peer's bitfield.
+  private boolean pieceComplete(int index) {
+    for (int i = 0; i < bitfieldBlocks[index].length; i++) {
+      if (bitfieldBlocks[index][i] == '0') {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // Returns the first block of a piece that we don't have. 
+  private int getFirstBlock(int piece) {
+    int blockIndex = -1;
+    for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
+      if (bitfieldBlocks[piece][i] == '0') {
+        blockIndex = i;
+        break;
+      }
+    }
+    return blockIndex;
+  }
+
+  /**
+   * @brief Send request messages to a peer that have unchoked us
+   * @param remotePeer peer data to the peer we want to send the request
+   */
+  private void sendRequestsToPeer(Connection remotePeer) {
+    if (remotePeer.bitfield == null) {
+      return;
+    }
+    for (Integer piece : currentPieces) {
+      //Getting the block to send.  
+      int blockIndex = -1, blockLength = 0;
+      blockIndex = getFirstBlock(piece);      
+      blockLength = Common.PIECES_BLOCKS - blockIndex ;
+      blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;    
+      if (remotePeer.bitfield[piece] == '1') {
+        sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
+      }
+    }
+  }
+
+  // Find the peers that have the current interested piece and send them the "interested" message
+  private void sendInterestedToPeers() {
+    if (currentPiece == -1) {
+      return;
+    }
+    for (Connection connection : peers.values()) {
+      if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
+        connection.amInterested = true;        
+        MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
+        task.dsend(connection.mailbox);        
+      }
+    }
+    currentPiece = -1;
+    piecesRequested++;
+  }
+
+  // Send a "interested" message to a peer.
+  private void sendInterested(String mailbox) {
+    MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
+    task.dsend(mailbox);
+  }
+
+  /**
+   * @brief Send a "not interested" message to a peer
+   * @param mailbox mailbox destination mailbox
+   */
+  private void sendNotInterested(String mailbox) {
+    MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
+    task.dsend(mailbox);
+  }
+
+  // Send a handshake message to all the peers the peer has.
+  private void sendHandshakeAll() {
+    for (Connection remotePeer : peers.values()) {
+      MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox, id);
+      task.dsend(remotePeer.mailbox);
+    }
+  }
+
+  /**
+   * @brief Send a "handshake" message to an user
+   * @param mailbox mailbox where to we send the message
+   */
+  private void sendHandshake(String mailbox) {
+    Msg.debug("Sending a HANDSHAKE to " + mailbox);
+    MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
+    task.dsend(mailbox);
+  }
+
+  // Send a "choked" message to a peer
+  private void sendChoked(String mailbox) {
+    Msg.debug("Sending a CHOKE to " + mailbox);
+    MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
+    task.dsend(mailbox);
+  }
+
+  // Send a "unchoked" message to a peer
+  private void sendUnchoked(String mailbox) {
+    Msg.debug("Sending a UNCHOKE to " + mailbox);
+    MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
+    task.dsend(mailbox);
+  }
+
+  // Send a "HAVE" message to all peers we are connected to
+  private void sendHave(int piece) {
+    Msg.debug("Sending HAVE message to all my peers");
+    for (Connection remotePeer : peers.values()) {
+      MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
+      task.dsend(remotePeer.mailbox);
+    }
+  }
+  // Send a bitfield message to all the peers the peer has.
+  private void sendBitfield(String mailbox) {
+    Msg.debug("Sending a BITFIELD to " + mailbox);
+    MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
+    task.dsend(mailbox);
+  }
+  // Send a "request" message to a peer, containing a request for a piece
+  private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
+    Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + ","
+              + (blockIndex + blockLength));
+    MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, 
+                                       blockLength);
+    task.dsend(mailbox);
+  }
+
+  // Send a "piece" message to a peer, containing a piece of the file
+  private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
+    Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
+    MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled,
+                                       blockIndex, blockLength);
+    task.dsend(mailbox);
+  }
+
+  private String getStatus() {
+    String s = "";
+    for (int i = 0; i < Common.FILE_PIECES; i++) {
+      s = s + bitfield[i];
+    }
+    return s;
+  }
 }
-