+/*
+ * Copyright (c) 2007-2008 Fabrizio Frioli, Michele Pedrolli
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * --
+ *
+ * Please send your questions/suggestions to:
+ * {fabrizio.frioli, michele.pedrolli} at studenti dot unitn dot it
+ *
+ */
+
+package example.bittorrent;
+
+import peersim.core.*;
+import peersim.config.*;
+import peersim.edsim.*;
+import peersim.transport.*;
+
+/**
+ * This is the class that implements the BitTorrent module for Peersim
+ */
+public class BitTorrent implements EDProtocol {
+ /**
+ * The size in Megabytes of the file being shared.
+ * @config
+ */
+ private static final String PAR_SIZE="file_size";
+ /**
+ * The Transport used by the the protocol.
+ * @config
+ */
+ private static final String PAR_TRANSPORT="transport";
+ /**
+ * The maximum number of neighbor that a node can have.
+ * @config
+ */
+ private static final String PAR_SWARM="max_swarm_size";
+ /**
+ * The maximum number of peers returned by the tracker when a new
+ * set of peers is requested through a <tt>TRACKER</tt> message.
+ * @config
+ */
+ private static final String PAR_PEERSET_SIZE="peerset_size";
+ /**
+ * Defines how much the network can grow with respect to the <tt>network.size</tt>
+ * when {@link NetworkDynamics} is used.
+ * @config
+ */
+ private static final String PAR_MAX_GROWTH="max_growth";
+ /**
+ * Is the number of requests of the same block sent to different peers.
+ * @config
+ */
+ private static final String PAR_DUP_REQ = "duplicated_requests";
+
+ /**
+ * KEEP_ALIVE message.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int KEEP_ALIVE = 1;
+
+ /**
+ * CHOKE message.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int CHOKE = 2;
+
+ /**
+ * UNCHOKE message.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int UNCHOKE = 3;
+
+ /**
+ * INTERESTED message.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int INTERESTED = 4;
+
+ /**
+ * NOT_INTERESTED message.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int NOT_INTERESTED = 5;
+
+ /**
+ * HAVE message.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int HAVE = 6;
+
+ /**
+ * BITFIELD message.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int BITFIELD = 7;
+
+ /**
+ * REQUEST message.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int REQUEST = 8;
+
+ /**
+ * PIECE message.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int PIECE = 9;
+
+ /**
+ * CANCEL message.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int CANCEL = 10;
+
+ /**
+ * TRACKER message.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int TRACKER = 11;
+
+ /**
+ * PEERSET message.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int PEERSET = 12;
+
+ /**
+ * CHOKE_TIME event.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int CHOKE_TIME = 13;
+
+ /**
+ * OPTUNCHK_TIME event.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int OPTUNCHK_TIME = 14;
+
+ /**
+ * ANTISNUB_TIME event.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int ANTISNUB_TIME = 15;
+
+ /**
+ * CHECKALIVE_TIME event.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int CHECKALIVE_TIME = 16;
+
+ /**
+ * TRACKERALIVE_TIME event.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int TRACKERALIVE_TIME = 17;
+
+ /**
+ * DOWNLOAD_COMPLETED event.
+ * @see SimpleEvent#type "Event types"
+ */
+ private static final int DOWNLOAD_COMPLETED = 18;
+
+ /**
+ * The maxium connection speed of the local node.
+ */
+ int maxBandwidth;
+
+ /**
+ * Stores the neighbors ordered by ID.
+ * @see Element
+ */
+ private example.bittorrent.Element byPeer[];
+
+ /**
+ * Contains the neighbors ordered by bandwidth as needed by the unchocking
+ * algorithm.
+ */
+ private example.bittorrent.Element byBandwidth[];
+
+ /**
+ * The Neighbors list.
+ */
+ private Neighbor cache[];
+
+ /**
+ * Reference to the neighbors that unchocked the local node.
+ */
+ private boolean unchokedBy[];
+
+ /**
+ * Number of neighbors in the cache. When it decreases under 20, a new peerset
+ * is requested to the tracker.
+ */
+ private int nNodes = 0;
+
+ /**
+ * Maximum number of nodes in the network.
+ */
+ private int nMaxNodes;
+
+ /**
+ * The status of the local peer. 0 means that the current peer is a leecher, 1 a seeder.
+ */
+ private int peerStatus;
+
+ /**
+ * Defines how much the network can grow with respect to the <tt>network.size</tt>
+ * when {@link NetworkDynamics} is used.
+ */
+ public int maxGrowth;
+
+ /**
+ * File status of the local node. Contains the blocks owned by the local node.
+ */
+ private int status[];
+
+ /**
+ * Current number of Bitfield request sent. It must be taken into account
+ * before sending another one.
+ */
+ private int nBitfieldSent = 0;
+
+ /**
+ * Current number of pieces in upload from the local peer.
+ */
+ public int nPiecesUp = 0;
+ /**
+ * Current number of pieces in download to the local peer.
+ */
+ public int nPiecesDown = 0;
+
+ /**
+ * Current number of piece completed.
+ */
+ private int nPieceCompleted = 0;
+
+ /**
+ * Current downloading piece ID, the previous lastInterested piece.
+ */
+ int currentPiece = -1;
+
+ /**
+ * Used to compute the average download rates in choking algorithm. Stores the
+ * number of <tt>CHOKE</tt> events.
+ */
+ int n_choke_time = 0;
+
+ /**
+ * Used to send the <tt>TRACKER</tt> message when the local node has 20 neighbors
+ * for the first time.
+ */
+ boolean lock = false;
+
+ /**
+ * Number of peers interested to my pieces.
+ */
+ int numInterestedPeers = 0;
+
+ /**
+ * Last piece for which the local node sent an <tt>INTERESTED</tt> message.
+ */
+ int lastInterested = -1;
+
+ /**
+ * The status of the current piece in download. Length 16, every time the local node
+ * receives a PIECE message, it updates the corrisponding block's cell. The cell
+ * contains the ID for that block of that piece. If an already owned
+ * block is received this is discarded.
+ */
+ private int pieceStatus[];
+
+ /**
+ * Length of the file. Stored as number of pieces (256KB each one).
+ */
+ int nPieces;
+
+ /**
+ * Contains the neighbors's status of the file. Every row represents a
+ * node and every a cell has value O if the neighbor doesn't
+ * have the piece, 1 otherwise. It has {@link #swarmSize} rows and {@link #nPieces}
+ * columns.
+ */
+ int [][]swarm;
+
+ /**
+ * The summation of the swarm's rows. Calculated every time a {@link #BITFIELD} message
+ * is received and updated every time HAVE message is received.
+ */
+ int rarestPieceSet[];
+
+ /**
+ * The five pending block requests.
+ */
+ int pendingRequest[];
+
+ /**
+ * The maximum swarm size (default is 80)
+ */
+ int swarmSize;
+
+ /**
+ * The size of the peerset. This is the number of "friends" nodes
+ * sent from the tracker to each new node (default: 50)
+ */
+ int peersetSize;
+
+ /**
+ * The ID of the current node
+ */
+ private long thisNodeID;
+
+ /**
+ * Number of duplicated requests as specified in the configuration file.
+ * @see BitTorrent#PAR_DUP_REQ
+ */
+ private int numberOfDuplicatedRequests;
+
+ /**
+ * The queue where the requests to serve are stored.
+ * The default dimension of the queue is 20.
+ */
+ Queue requestToServe = null;
+
+ /**
+ * The queue where the out of sequence incoming pieces are stored
+ * waiting for the right moment to be processed.
+ * The default dimension of the queue is 100.
+ */
+ Queue incomingPieces = null;
+
+ /**
+ * The Transport ID.
+ * @see BitTorrent#PAR_TRANSPORT
+ */
+ int tid;
+
+ /**
+ * The reference to the tracker node. If equals to <tt>null</tt>, the local
+ * node is the tracker.
+ */
+ private Node tracker = null;
+
+ /**
+ * The default constructor. Reads the configuration file and initializes the
+ * configuration parameters.
+ * @param prefix the component prefix declared in the configuration file
+ */
+ public BitTorrent(String prefix){ // Used for the tracker's protocol
+ tid = Configuration.getPid(prefix+"."+PAR_TRANSPORT);
+ nPieces = (int)((Configuration.getInt(prefix+"."+PAR_SIZE))*1000000/256000);
+ swarmSize = (int)Configuration.getInt(prefix+"."+PAR_SWARM);
+ peersetSize = (int)Configuration.getInt(prefix+"."+PAR_PEERSET_SIZE);
+ numberOfDuplicatedRequests = (int)Configuration.getInt(prefix+"."+PAR_DUP_REQ);
+ maxGrowth = (int)Configuration.getInt(prefix+"."+PAR_MAX_GROWTH);
+ nMaxNodes = Network.getCapacity()-1;
+ }
+
+ /**
+ * Gets the reference to the tracker node.
+ * @return the reference to the tracker
+ */
+ public Node getTracker(){
+ return tracker;
+ }
+
+ /**
+ * Gets the number of neighbors currently stored in the cache of the local node.
+ * @return the number of neighbors in the cache
+ */
+ public int getNNodes(){
+ return this.nNodes;
+ }
+
+ /**
+ * Sets the reference to the tracker node.
+ * @param t the tracker node
+ */
+ public void setTracker(Node t){
+ tracker = t;
+ }
+
+ /**
+ * Sets the ID of the local node.
+ * @param id the ID of the node
+ */
+ public void setThisNodeID(long id) {
+ this.thisNodeID = id;
+ }
+
+ /**
+ * Gets the ID of the local node.
+ * @return the ID of the local node
+ */
+ public long getThisNodeID(){
+ return this.thisNodeID;
+ }
+
+ /**
+ * Gets the file status of the local node.
+ * @return the file status of the local node
+ */
+ public int[] getFileStatus(){
+ return this.status;
+ }
+
+ /**
+ * Initializes the tracker node. This method
+ * only performs the initialization of the tracker's cache.
+ */
+ public void initializeTracker() {
+ cache = new Neighbor[nMaxNodes+maxGrowth];
+ for(int i=0; i<nMaxNodes+maxGrowth; i++){
+ cache[i]= new Neighbor();
+ }
+ }
+
+ /**
+ * <p>Checks the number of neighbors and if it is equal to 20
+ * sends a TRACKER messages to the tracker, asking for a new
+ * peer set.</p>
+ *
+ * <p>This method *must* be called after every call of {@link #removeNeighbor}
+ * in {@link #processEvent}.
+ * </p>
+ */
+ private void processNeighborListSize(Node node, int pid) {
+ if (nNodes==20) {
+ Object ev;
+ long latency;
+ ev = new SimpleMsg(TRACKER, node);
+ Node tracker = ((BitTorrent)node.getProtocol(pid)).tracker;
+ if(tracker != null){
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node, tracker);
+// EDSimulator.add(latency,ev,tracker,pid);
+ ((Transport) node.getProtocol(tid)).send(node, tracker, ev, pid);
+ }
+ }
+ }
+
+ /**
+ * The standard method that processes incoming events.
+ * @param node reference to the local node for which the event is going to be processed
+ * @param pid BitTorrent's protocol id
+ * @param event the event to process
+ */
+ public void processEvent(Node node, int pid, Object event){
+
+ Object ev;
+ long latency;
+ switch(((SimpleEvent)event).getType()){
+
+ case KEEP_ALIVE: // 1
+ {
+ Node sender = ((IntMsg)event).getSender();
+ int isResponse = ((IntMsg)event).getInt();
+ //System.out.println("process, keep_alive: sender is "+sender.getID()+", local is "+node.getID());
+ Element e = search(sender.getID());
+ if(e!= null){ //if I know the sender
+ cache[e.peer].isAlive();
+ if(isResponse==0 && alive(sender)){
+ Object msg = new IntMsg(KEEP_ALIVE,node,1,0);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node, sender);
+// EDSimulator.add(latency,msg,sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node, sender, msg, pid);
+ cache[e.peer].justSent();
+ }
+ }
+ else{
+ System.err.println("despite it should never happen, it happened");
+ ev = new BitfieldMsg(BITFIELD, true, false, node, status, nPieces);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender);
+// EDSimulator.add(latency,ev,sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid);
+ nBitfieldSent++;
+ }
+
+ };break;
+
+ case CHOKE: // 2, CHOKE message.
+ {
+ Node sender = ((SimpleMsg)event).getSender();
+ //System.out.println("process, choke: sender is "+sender.getID()+", local is "+node.getID());
+ Element e = search(sender.getID());
+ if(e!= null){ //if I know the sender
+ cache[e.peer].isAlive();
+ unchokedBy[e.peer]= false; // I'm choked by it
+ }
+ else{
+ System.err.println("despite it should never happen, it happened");
+ ev = new BitfieldMsg(BITFIELD, true, false, node, status, nPieces);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender);
+// EDSimulator.add(latency,ev,sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid);
+ nBitfieldSent++;
+ }
+ };break;
+
+ case UNCHOKE: // 3, UNCHOKE message.
+ {
+ Node sender = ((SimpleMsg)event).getSender();
+ //System.out.println("process, unchoke: sender is "+sender.getID()+", local is "+node.getID());
+ Element e = search(sender.getID());
+ if(e != null){ // If I know the sender
+ int senderIndex = e.peer;
+ cache[senderIndex].isAlive();
+ /* I send to it some of the pending requests not yet satisfied. */
+ int t = numberOfDuplicatedRequests;
+ for(int i=4;i>=0 && t>0;i--){
+ if(pendingRequest[i]==-1)
+ break;
+ if(alive(cache[senderIndex].node) && swarm[senderIndex][decode(pendingRequest[i],0)]==1){ //If the sender has that piece
+ ev = new IntMsg(REQUEST, node,pendingRequest[i],0);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender);
+// EDSimulator.add(latency,ev, sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid);
+ cache[senderIndex].justSent();
+ }
+ if(!alive(cache[senderIndex].node)){
+ System.out.println("unchoke1 rm neigh "+ cache[i].node.getID() );
+ removeNeighbor(cache[senderIndex].node);
+ processNeighborListSize(node,pid);
+ return;
+ }
+ t--;
+ }
+ // I request missing blocks to fill the queue
+ int block = getBlock();
+ int piece;
+ while(block != -2){ //while still available request to send
+ if(block < 0){ // No more block to request for the current piece
+ piece = getPiece();
+ if(piece == -1){ // no more piece to request
+ break;
+ }
+ for(int j=0; j<swarmSize; j++){// send the interested message to those
+ // nodes which have that piece
+ lastInterested = piece;
+ if(alive(cache[j].node) && swarm[j][piece]==1){
+ ev = new IntMsg(INTERESTED, node, lastInterested,0);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,cache[j].node);
+// EDSimulator.add(latency,ev,cache[j].node,pid);
+ ((Transport) node.getProtocol(tid)).send(node, cache[j].node, ev, pid);
+ cache[j].justSent();
+ }
+
+ if(!alive(cache[j].node)){
+ //System.out.println("unchoke2 rm neigh "+ cache[j].node.getID() );
+ removeNeighbor(cache[j].node);
+ processNeighborListSize(node,pid);
+ }
+ }
+ block = getBlock();
+ }
+ else{ // block value referred to a real block
+ if(alive(cache[senderIndex].node) && swarm[senderIndex][decode(block,0)]==1 && addRequest(block)){ // The sender has that block
+ ev = new IntMsg(REQUEST, node, block,0);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender);
+// EDSimulator.add(latency,ev,sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid);
+
+ cache[senderIndex].justSent();
+ }
+ else{
+ if(!alive(cache[senderIndex].node)){
+ System.out.println("unchoke3 rm neigh "+ cache[senderIndex].node.getID() );
+ removeNeighbor(cache[senderIndex].node);
+ processNeighborListSize(node,pid);
+ }
+ return;
+ }
+ block = getBlock();
+ }
+ }
+ unchokedBy[senderIndex] = true; // I add the sender to the list
+ }
+ else // It should never happen.
+ {
+ System.err.println("despite it should never happen, it happened");
+ for(int i=0; i<swarmSize; i++)
+ if(cache[i].node !=null)
+ System.err.println(cache[i].node.getID());
+ ev = new BitfieldMsg(BITFIELD, true, false, node, status, nPieces);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender);
+// EDSimulator.add(latency,ev,sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid);
+ nBitfieldSent++;
+ }
+ };break;
+
+ case INTERESTED: // 4, INTERESTED message.
+ {
+ numInterestedPeers++;
+ Node sender = ((IntMsg)event).getSender();
+ //System.out.println("process, interested: sender is "+sender.getID()+", local is "+node.getID());
+ int value = ((IntMsg)event).getInt();
+ Element e = search(sender.getID());
+ if(e!=null){
+ cache[e.peer].isAlive();
+ cache[e.peer].interested = value;
+ }
+ else{
+ System.err.println("despite it should never happen, it happened");
+ ev = new BitfieldMsg(BITFIELD, true, false, node, status, nPieces);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender);
+// EDSimulator.add(latency,ev,sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid);
+ nBitfieldSent++;
+ }
+
+ }; break;
+
+ case NOT_INTERESTED: // 5, NOT_INTERESTED message.
+ {
+ numInterestedPeers--;
+ Node sender = ((IntMsg)event).getSender();
+ //System.out.println("process, not_interested: sender is "+sender.getID()+", local is "+node.getID());
+ int value = ((IntMsg)event).getInt();
+ Element e = search(sender.getID());
+ if(e!=null){
+ cache[e.peer].isAlive();
+ if(cache[e.peer].interested == value)
+ cache[e.peer].interested = -1; // not interested
+ }
+ }; break;
+
+ case HAVE: // 6, HAVE message.
+ {
+ Node sender = ((IntMsg)event).getSender();
+ //System.out.println("process, have: sender is "+sender.getID()+", local is "+node.getID());
+ int piece = ((IntMsg)event).getInt();
+ Element e = search(sender.getID());
+ if(e!=null){
+ cache[e.peer].isAlive();
+ swarm[e.peer][piece]=1;
+ rarestPieceSet[piece]++;
+ boolean isSeeder = true;
+ for(int i=0; i<nPieces; i++){
+ isSeeder = isSeeder && (swarm[e.peer][i]==1);
+ }
+ e.isSeeder = isSeeder;
+ }
+ else{
+ System.err.println("despite it should never happen, it happened");
+ ev = new BitfieldMsg(BITFIELD, true, false, node, status, nPieces);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender);
+// EDSimulator.add(latency,ev,sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid);
+ nBitfieldSent++;
+ }
+ }; break;
+
+ case BITFIELD: // 7, BITFIELD message
+ {
+ Node sender = ((BitfieldMsg)event).getSender();
+ int []fileStatus = ((BitfieldMsg)event).getArray();
+ /*Response with NACK*/
+ if(!((BitfieldMsg)event).isRequest && !((BitfieldMsg)event).ack){
+ Element e = search(sender.getID());
+ if(e == null) // if is a response with nack that follows a request
+ nBitfieldSent--;
+ // otherwise is a response with ack that follows a duplicate
+ // insertion attempt
+ //System.out.println("process, bitfield_resp_nack: sender is "+sender.getID()+", local is "+node.getID());
+ return;
+ }
+ /*Request with NACK*/
+ if(((BitfieldMsg)event).isRequest && !((BitfieldMsg)event).ack){
+ //System.out.println("process, bitfield_req_nack: sender is "+sender.getID()+", local is "+node.getID());
+ if(alive(sender)){
+ Element e = search(sender.getID());
+ ev = new BitfieldMsg(BITFIELD, false, true, node, status, nPieces); //response with ack
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender);
+// EDSimulator.add(latency,ev,sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid);
+ cache[e.peer].justSent();
+ }
+ }
+ /*Response with ACK*/
+ if(!((BitfieldMsg)event).isRequest && ((BitfieldMsg)event).ack){
+ nBitfieldSent--;
+ //System.out.println("process, bitfield_resp_ack: sender is "+sender.getID()+", local is "+node.getID());
+ if(alive(sender)){
+ if(addNeighbor(sender)){
+ Element e = search(sender.getID());
+ cache[e.peer].isAlive();
+ swarm[e.peer] = fileStatus;
+ boolean isSeeder = true;
+ for(int i=0; i<nPieces; i++){
+ rarestPieceSet[i]+= fileStatus[i];
+ isSeeder = isSeeder && (fileStatus[i]==1);
+ }
+ e.isSeeder = isSeeder;
+
+ if(nNodes==10 && !lock){ // I begin to request pieces
+ lock = true;
+ int piece = getPiece();
+ if(piece == -1)
+ return;
+ lastInterested = piece;
+ currentPiece = lastInterested;
+ ev = new IntMsg(INTERESTED, node, lastInterested,0);
+ for(int i=0; i<swarmSize; i++){// send the interested message to those
+ // nodes which have that piece
+ if(alive(cache[i].node) && swarm[i][piece]==1){
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,cache[i].node);
+// EDSimulator.add(latency,ev,cache[i].node,pid);
+ ((Transport) node.getProtocol(tid)).send(node, cache[i].node, ev, pid);
+ cache[i].justSent();
+ }
+ }
+
+ }
+
+ }
+ }
+ else
+ System.out.println("Sender "+sender.getID()+" not alive");
+ }
+ /*Request with ACK*/
+ if(((BitfieldMsg)event).isRequest && ((BitfieldMsg)event).ack){
+ //System.out.println("process, bitfield_req_ack: sender is "+sender.getID()+", local is "+node.getID());
+ if(alive(sender)){
+ if(addNeighbor(sender)){
+ Element e = search(sender.getID());
+ cache[e.peer].isAlive();
+ swarm[e.peer] = fileStatus;
+ boolean isSeeder = true;
+ for(int i=0; i<nPieces; i++){
+ rarestPieceSet[i]+= fileStatus[i]; // I update the rarestPieceSet with the pieces of the new node
+ isSeeder = isSeeder && (fileStatus[i]==1); // I check if the new node is a seeder
+ }
+ e.isSeeder = isSeeder;
+ ev = new BitfieldMsg(BITFIELD, false, true, node, status, nPieces); //response with ack
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender);
+// EDSimulator.add(latency,ev,sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid);
+ cache[e.peer].justSent();
+ if(nNodes==10 && !lock){ // I begin to request pieces
+ int piece = getPiece();
+ if(piece == -1)
+ return;
+ lastInterested = piece;
+ currentPiece = lastInterested;
+ ev = new IntMsg(INTERESTED, node, lastInterested,0);
+ for(int i=0; i<swarmSize; i++){// send the interested message to those
+ // nodes which have that piece
+ if(alive(cache[i].node) && swarm[i][piece]==1){
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,cache[i].node);
+// EDSimulator.add(latency,ev,cache[i].node,pid);
+ ((Transport) node.getProtocol(tid)).send(node, cache[i].node, ev, pid);
+ cache[i].justSent();
+ }
+ }
+
+ }
+ }
+ else {
+ Element e;
+ if((e = search(sender.getID()))!=null){ // The sender was already in the cache
+ cache[e.peer].isAlive();
+ ev = new BitfieldMsg(BITFIELD, false, true, node, status, nPieces); //response with ack
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender);
+// EDSimulator.add(latency,ev,sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid);
+ cache[e.peer].justSent();
+ }
+ else{ // Was not be possible add the sender (nBitfield+nNodes > swarmSize)
+ ev = new BitfieldMsg(BITFIELD, false, false, node, status, nPieces); //response with nack
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender);
+// EDSimulator.add(latency,ev,sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid);
+ }
+ }
+
+ }
+ else
+ System.out.println("Sender "+sender.getID()+" not alive");
+ }
+ };break;
+
+ case REQUEST: // 8, REQUEST message.
+ {
+ Object evnt;
+ Node sender = ((IntMsg)event).getSender();
+ int value = ((IntMsg)event).getInt();
+ Element e;
+ BitTorrent senderP;
+ int remoteRate;
+ int localRate;
+ int bandwidth;
+ int downloadTime;
+
+ e = search(sender.getID());
+ if (e==null)
+ return;
+ cache[e.peer].isAlive();
+
+ requestToServe.enqueue(value, sender);
+
+ /*I serve the enqueued requests until 10 uploding pieces or an empty queue*/
+ while(!requestToServe.empty() && nPiecesUp <10){
+ Request req = requestToServe.dequeue();
+ e = search(req.sender.getID());
+ if(e!=null && alive(req.sender)){
+// ev = new IntMsg(PIECE, node, req.id);
+ nPiecesUp++;
+ e.valueUP++;
+ senderP = ((BitTorrent)req.sender.getProtocol(pid));
+ senderP.nPiecesDown++;
+ remoteRate = senderP.maxBandwidth/(senderP.nPiecesUp + senderP.nPiecesDown);
+ localRate = maxBandwidth/(nPiecesUp + nPiecesDown);
+ bandwidth = Math.min(remoteRate, localRate);
+ downloadTime = ((16*8)/(bandwidth))*1000; // in milliseconds
+
+ ev = new IntMsg(PIECE, node, req.id, 16*8 * 1024);
+ ((Transport) node.getProtocol(tid)).send(node, req.sender, ev, pid);
+
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,req.sender);
+// EDSimulator.add(latency+downloadTime,ev,req.sender,pid);
+ cache[e.peer].justSent();
+
+ /*I send to me an event to indicate that the download is completed.
+ This prevent that, when the receiver death occurres, my value nPiecesUp
+ doesn't decrease.*/
+ evnt = new SimpleMsg(DOWNLOAD_COMPLETED, req.sender);
+// EDSimulator.add(latency+downloadTime,evnt,node,pid);
+ ((Transport) node.getProtocol(tid)).send(node, node, evnt, pid);
+ }
+ }
+ }; break;
+
+ case PIECE: // 9, PIECE message.
+ {
+ Node sender = ((IntMsg)event).getSender();
+ /* Set the correct value for the local uploading and remote
+ downloading number of pieces */
+ nPiecesDown--;
+
+ if(peerStatus == 1)// To save CPU cycles
+ return;
+ //System.out.println("process, piece: sender is "+sender.getID()+", local is "+node.getID());
+ Element e = search(sender.getID());
+
+ if(e==null){ //I can't accept a piece not wait
+ return;
+ }
+ e.valueDOWN++;
+
+ cache[e.peer].isAlive();
+
+ int value = ((IntMsg)event).getInt();
+ int piece = decode(value,0);
+ int block = decode(value,1);
+ /* If the block has not been already downloaded and it belongs to
+ the current downloading piece.*/
+ if(piece == currentPiece && decode(pieceStatus[block],0)!= piece){
+ pieceStatus[block] = value;
+ status[piece]++;
+ removeRequest(value);
+ requestNextBlocks(node, pid, e.peer);
+
+ }else{ // Either a future piece or an owned piece
+ if(piece!=currentPiece && status[piece]!=16){ // Piece not owned, will be considered later
+ incomingPieces.enqueue(value, sender);
+ }
+
+ }
+ ev = new IntMsg(CANCEL, node, value,0);
+ /* I send a CANCEL to all nodes to which I previously requested the block*/
+ for(int i=0; i<swarmSize; i++){
+ if(alive(cache[i].node) && unchokedBy[i]==true && swarm[i][decode(block,0)]==1 && cache[i].node != sender){
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,cache[i].node);
+// EDSimulator.add(latency,ev,cache[i].node,pid);
+ ((Transport) node.getProtocol(tid)).send(node, cache[i].node, ev, pid);
+ cache[i].justSent();
+ }
+ }
+
+ if(status[currentPiece]==16){ // if piece completed, I change the currentPiece to the next wanted
+ nPieceCompleted++;
+ ev = new IntMsg(HAVE, node, currentPiece,0);
+ for(int i=0; i<swarmSize; i++){ // I send the HAVE for the piece
+ if(alive(cache[i].node)){
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,cache[i].node);
+// EDSimulator.add(latency,ev,cache[i].node,pid);
+ ((Transport) node.getProtocol(tid)).send(node, cache[i].node, ev, pid);
+ cache[i].justSent();
+ }
+ if(!alive(cache[i].node)){
+ //System.out.println("piece3 rm neigh "+ cache[i].node.getID() );
+ removeNeighbor(cache[i].node);
+ processNeighborListSize(node,pid);
+ }
+ }
+ ev = new IntMsg(NOT_INTERESTED, node, currentPiece,0);
+ for(int i=0; i<swarmSize; i++){ // I send the NOT_INTERESTED to which peer I sent an INTERESTED
+ if(swarm[i][piece]==1 && alive(cache[i].node)){
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,cache[i].node);
+// EDSimulator.add(latency,ev,cache[i].node,pid);
+ ((Transport) node.getProtocol(tid)).send(node, cache[i].node, ev, pid);
+ cache[i].justSent();
+ }
+ if(!alive(cache[i].node)){
+ //System.out.println("piece4 rm neigh "+ cache[i].node.getID() );
+ removeNeighbor(cache[i].node);
+ processNeighborListSize(node,pid);
+ }
+ }
+ if(nPieceCompleted == nPieces){
+ System.out.println("FILE COMPLETED for peer "+node.getID());
+ this.peerStatus = 1;
+ }
+
+ /* I set the currentPiece to the lastInterested. Then I extract
+ the queued received blocks
+ */
+
+ currentPiece = lastInterested;
+ int m = incomingPieces.dim;
+ while(m > 0){ // I process the queue
+ m--;
+ Request temp = incomingPieces.dequeue();
+ int p = decode(temp.id,0); // piece id
+ int b = decode(temp.id,1); // block id
+ Element s = search(temp.sender.getID());
+ if(s==null) // if the node that sent the block in the queue is dead
+ continue;
+ if(p==currentPiece && decode(pieceStatus[b],0)!= p){
+ pieceStatus[b] = temp.id;
+ status[p]++;
+ removeRequest(temp.id);
+ requestNextBlocks(node, pid, s.peer);
+ }
+ else{ // The piece not currently desired will be moved to the tail
+ if(p!= currentPiece) // If not a duplicate block but belongs to another piece
+ incomingPieces.enqueue(temp.id,temp.sender);
+ else // duplicate block
+ requestNextBlocks(node, pid, s.peer);
+ }
+ }
+ }
+ }; break;
+
+ case CANCEL:
+ {
+ Node sender = ((IntMsg)event).getSender();
+ int value = ((IntMsg)event).getInt();
+ requestToServe.remove(sender, value);
+ };break;
+
+ case PEERSET: // PEERSET message
+ {
+ Node sender = ((PeerSetMsg)event).getSender();
+ //System.out.println("process, peerset: sender is "+sender.getID()+", local is "+node.getID());
+ Neighbor n[] = ((PeerSetMsg)event).getPeerSet();
+
+ for(int i=0; i<peersetSize; i++){
+ if( n[i]!=null && alive(n[i].node) && search(n[i].node.getID())==null && nNodes+nBitfieldSent <swarmSize-2) {
+ ev = new BitfieldMsg(BITFIELD, true, true, node, status, nPieces);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node,n[i].node);
+// EDSimulator.add(latency,ev,n[i].node,pid);
+ ((Transport) node.getProtocol(tid)).send(node, n[i].node, ev, pid);
+
+ nBitfieldSent++;
+ // Here I should call the Neighbor.justSent(), but here
+ // the node is not yet in the cache.
+ }
+ }
+ }; break;
+
+ case TRACKER: // TRACKER message
+ {
+
+ int j=0;
+ Node sender = ((SimpleMsg)event).getSender();
+ //System.out.println("process, tracker: sender is "+sender.getID()+", local is "+node.getID());
+ if(!alive(sender))
+ return;
+ Neighbor tmp[] = new Neighbor[peersetSize];
+ int k=0;
+ if(nNodes <= peersetSize){
+ for(int i=0; i< nMaxNodes+maxGrowth; i++){
+ if(cache[i].node != null && cache[i].node.getID()!= sender.getID()){
+ tmp[k]=cache[i];
+ k++;
+ }
+ }
+ ev = new PeerSetMsg(PEERSET, tmp, node);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node, sender);
+// EDSimulator.add(latency,ev,sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node,sender, ev, pid);
+ return;
+ }
+
+ while(j < peersetSize){
+ int i = CommonState.r.nextInt(nMaxNodes+maxGrowth);
+ for (int z=0; z<j; z++){
+ if(cache[i].node==null || tmp[z].node.getID() == cache[i].node.getID() || cache[i].node.getID() == sender.getID()){
+ z=0;
+ i= CommonState.r.nextInt(nMaxNodes+maxGrowth);
+ }
+ }
+ if(cache[i].node != null){
+ tmp[j] = cache[i];
+ j++;
+ }
+ }
+ ev = new PeerSetMsg(PEERSET, tmp, node);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node, sender);
+// EDSimulator.add(latency,ev,sender,pid);
+ ((Transport) node.getProtocol(tid)).send(node,sender, ev, pid);
+ }; break;
+
+ case CHOKE_TIME: //Every 10 secs.
+ {
+ n_choke_time++;
+
+ ev = new SimpleEvent(CHOKE_TIME);
+ EDSimulator.add(10000,ev,node,pid);
+ int j=0;
+ /*I copy the interested nodes in the byBandwidth array*/
+ for(int i=0;i< swarmSize && byPeer[i].peer != -1; i++){
+ if(cache[byPeer[i].peer].interested > 0){
+ byBandwidth[j]=byPeer[i]; //shallow copy
+ j++;
+ }
+ }
+
+ /*It ensures that in the next 20sec, if there are less nodes interested
+ than now, those in surplus will not be ordered. */
+ for(;j<swarmSize;j++){
+ byBandwidth[j]=null;
+ }
+ sortByBandwidth();
+ int optimistic = 3;
+ int luckies[] = new int[3];
+ try{ // It takes the first three neighbors
+ luckies[0] = byBandwidth[0].peer;
+ optimistic--;
+ luckies[1] = byBandwidth[1].peer;
+ optimistic--;
+ luckies[2] = byBandwidth[2].peer;
+ }
+ catch(NullPointerException e){ // If not enough peer in byBandwidth it chooses the other romdomly
+ for(int z = optimistic; z>0;z--){
+ int lucky = CommonState.r.nextInt(nNodes);
+ while(cache[byPeer[lucky].peer].status ==1 && alive(cache[byPeer[lucky].peer].node) &&
+ cache[byPeer[lucky].peer].interested == 0)// until the lucky peer is already unchoked or not interested
+ lucky = CommonState.r.nextInt(nNodes);
+ luckies[3-z]= byPeer[lucky].peer;
+ }
+ }
+ for(int i=0; i<swarmSize; i++){ // I perform the chokes and the unchokes
+ if((i==luckies[0] || i==luckies[1] || i==luckies[2]) && alive(cache[i].node) && cache[i].status != 2){ //the unchokes
+ cache[i].status = 1;
+ ev = new SimpleMsg(UNCHOKE, node);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node, cache[i].node);
+// EDSimulator.add(latency,ev,cache[i].node,pid);
+ ((Transport) node.getProtocol(tid)).send(node,cache[i].node, ev, pid);
+ cache[i].justSent();
+ //System.out.println("average time, unchoked: "+cache[i].node.getID());
+ }
+ else{ // the chokes
+ if(alive(cache[i].node) && (cache[i].status == 1 || cache[i].status == 2)){
+ cache[i].status = 0;
+ ev = new SimpleMsg(CHOKE, node);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node, cache[i].node);
+// EDSimulator.add(latency,ev,cache[i].node,pid);
+ ((Transport) node.getProtocol(tid)).send(node,cache[i].node, ev, pid);
+ cache[i].justSent();
+ }
+ }
+ }
+
+ if(n_choke_time%2==0){ //every 20 secs. Used in computing the average download rates
+ for(int i=0; i<nNodes; i++){
+ if(this.peerStatus == 0){ // I'm a leeacher
+ byPeer[i].head20 = byPeer[i].valueDOWN;
+ }
+ else{
+ byPeer[i].head20 = byPeer[i].valueUP;
+ }
+ }
+ }
+ }; break;
+
+ case OPTUNCHK_TIME:
+ {
+
+ //System.out.println("process, optunchk_time");
+
+ ev = new SimpleEvent(OPTUNCHK_TIME);
+ EDSimulator.add(30000,ev,node,pid);
+ int lucky = CommonState.r.nextInt(nNodes);
+ while(cache[byPeer[lucky].peer].status ==1)// until the lucky peer is already unchoked
+ lucky = CommonState.r.nextInt(nNodes);
+ if(!alive(cache[byPeer[lucky].peer].node))
+ return;
+ cache[byPeer[lucky].peer].status = 1;
+ Object msg = new SimpleMsg(UNCHOKE,node);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node, cache[byPeer[lucky].peer].node);
+// EDSimulator.add(latency,msg,cache[byPeer[lucky].peer].node,pid);
+ ((Transport) node.getProtocol(tid)).send(node,cache[byPeer[lucky].peer].node, msg, pid);
+ cache[byPeer[lucky].peer].justSent();
+ }; break;
+
+ case ANTISNUB_TIME:
+ {
+ if(this.peerStatus == 1) // I'm a seeder, I don't update the event
+ return;
+ //System.out.println("process, antisnub_time");
+ for(int i=0; i<nNodes; i++){
+ if(byPeer[i].valueDOWN >0 && (byPeer[i].valueDOWN - byPeer[i].head60)==0){// No blocks downloaded in 1 min
+ cache[byPeer[i].peer].status = 2; // I'm snubbed by it
+ }
+ byPeer[i].head60 = byPeer[i].valueDOWN;
+ }
+ ev = new SimpleEvent(ANTISNUB_TIME);
+ EDSimulator.add(60000,ev,node,pid);
+ long time = CommonState.getTime();
+ }; break;
+
+ case CHECKALIVE_TIME:
+ {
+
+ //System.out.println("process, checkalive_time");
+
+ long now = CommonState.getTime();
+ for(int i=0; i<swarmSize; i++){
+ /*If are at least 2 minutes (plus 1 sec of tolerance) that
+ I don't send anything to it.*/
+ if(alive(cache[i].node) && (cache[i].lastSent < (now-121000))){
+ Object msg = new IntMsg(KEEP_ALIVE,node,0,0);
+// latency = ((Transport)node.getProtocol(tid)).getLatency(node, cache[i].node);
+// EDSimulator.add(latency,msg,cache[i].node,pid);
+ ((Transport) node.getProtocol(tid)).send(node,cache[i].node, msg, pid);
+ cache[i].justSent();
+ }
+ /*If are at least 2 minutes (plus 1 sec of tolerance) that I don't
+ receive anything from it though I sent a keepalive 2 minutes ago*/
+ else{
+ if(cache[i].lastSeen <(now-121000) && cache[i].node != null && cache[i].lastSent < (now-121000)){
+ System.out.println("process, checkalive_time, rm neigh " + cache[i].node.getID());
+ if(cache[i].node.getIndex() != -1){
+ System.out.println("This should never happen: I remove a node that is not effectively died");
+ }
+ removeNeighbor(cache[i].node);
+ processNeighborListSize(node,pid);
+ }
+ }
+ }
+ ev = new SimpleEvent(CHECKALIVE_TIME);
+ EDSimulator.add(120000,ev,node,pid);
+ }; break;
+
+ case TRACKERALIVE_TIME:
+ {
+ //System.out.println("process, trackeralive_time");
+ if(alive(tracker)){
+ ev = new SimpleEvent(TRACKERALIVE_TIME);
+ EDSimulator.add(1800000,ev,node,pid);
+ }
+ else
+ tracker=null;
+
+ }; break;
+
+ case DOWNLOAD_COMPLETED:
+ {
+ nPiecesUp--;
+ }; break;
+
+ }
+ }
+
+ /**
+ * Given a piece index and a block index it encodes them in an unique integer value.
+ * @param piece the index of the piece to encode.
+ * @param block the index of the block to encode.
+ * @return the encoding of the piece and the block indexes.
+ */
+ private int encode(int piece, int block){
+ return (piece*100)+block;
+
+ }
+ /**
+ * Returns either the piece or the block that contained in the <tt>value</tt> depending
+ * on <tt>part</tt>: 0 means the piece value, 1 the block value.
+ * @param value the ID of the block to decode.
+ * @param part the information to extract from <tt>value</tt>. 0 means the piece index, 1 the block index.
+ * @return the piece or the block index depending about the value of <tt>part</tt>
+ */
+ private int decode(int value, int part){
+ if (value==-1) // Not a true value to decode
+ return -1;
+ if(part == 0) // I'm interested to the piece
+ return value/100;
+ else // I'm interested to the block
+ return value%100;
+ }
+
+ /**
+ * Used by {@link NodeInitializer#choosePieces(int, BitTorrent) NodeInitializer} to set
+ * the number of piece completed from the beginning in according with
+ * the distribution in the configuration file.
+ * @param number the number of piece completed
+ */
+ public void setCompleted(int number){
+ this.nPieceCompleted = number;
+ }
+
+ /**
+ * Sets the status (the set of blocks) of the file for the current node.
+ * Note that a piece is considered <i>completed</i> if the number
+ * of downloaded blocks is 16.
+ * @param index The index of the piece
+ * @param value Number of blocks downloaded for the piece index.
+ */
+ public void setStatus(int index, int value){
+ status[index]=value;
+ }
+
+ /**
+ * Sets the status of the local node.
+ * @param status The status of the node: 1 means seeder, 0 leecher
+ */
+ public void setPeerStatus(int status){
+ this.peerStatus = status;
+ }
+
+ /**
+ * Gets the status of the local node.
+ * @return The status of the local node: 1 means seeder, 0 leecher
+ */
+ public int getPeerStatus(){
+ return peerStatus;
+ }
+
+ /**
+ * Gets the number of blocks for a given piece owned by the local node.
+ * @param index The index of the piece
+ * @return Number of blocks downloaded for the piece index
+ */
+ public int getStatus(int index){
+ return status[index];
+ }
+
+ /**
+ * Sets the maximum bandwdith for the local node.
+ * @param value The value of bandwidth in Kbps
+ */
+ public void setBandwidth(int value){
+ maxBandwidth = value;
+ }
+
+ /**
+ * Checks if a node is still alive in the simulated network.
+ * @param node The node to check
+ * @return true if the node <tt>node</tt> is up, false otherwise
+ * @see peersim.core.GeneralNode#isUp
+ */
+ public boolean alive(Node node){
+ if(node == null)
+ return false;
+ else
+ return node.isUp();
+ }
+
+ /**
+ * Adds a neighbor to the cache of the local node.
+ * The new neighbor is put in the first null position.
+ * @param neighbor The neighbor node to add
+ * @return <tt>false</tt> if the neighbor is already present in the cache (this can happen when the peer requests a
+ * new peer set to the tracker an there is still this neighbor within) or no place is available.
+ * Otherwise, returns true if the node is correctly added to the cache.
+ */
+ public boolean addNeighbor(Node neighbor){
+ if(search(neighbor.getID()) !=null){// if already exists
+ // System.err.println("Node "+neighbor.getID() + " not added, already exist.");
+ return false;
+ }
+ if(this.tracker == null){ // I'm in the tracker's BitTorrent protocol
+ for(int i=0; i< nMaxNodes+maxGrowth; i++){
+ if(cache[i].node == null){
+ cache[i].node = neighbor;
+ cache[i].status = 0; //chocked
+ cache[i].interested = -1; //not interested
+ this.nNodes++;
+
+ //System.err.println("i: " + i +" nMaxNodes: " + nMaxNodes);
+ return true;
+ }
+ }
+ }
+ else{
+ if((nNodes+nBitfieldSent) < swarmSize){
+ //System.out.println("I'm the node " + this.thisNodeID + ", trying to add node "+neighbor.getID());
+ for(int i=0; i<swarmSize; i++){
+ if(cache[i].node == null){
+ cache[i].node = neighbor;
+ cache[i].status = 0; //choked
+ cache[i].interested = -1; // not interested
+ byPeer[nNodes].peer = i;
+ byPeer[nNodes].ID = neighbor.getID();
+ sortByPeer();
+ this.nNodes++;
+ //System.out.println(neighbor.getID()+" added!");
+ return true;
+ }
+ }
+ System.out.println("Node not added, no places available");
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Removes a neighbor from the cache of the local node.
+ * @param neighbor The node to remove
+ * @return true if the node is correctly removed, false otherwise.
+ */
+ public boolean removeNeighbor(Node neighbor) {
+
+ if (neighbor == null)
+ return true;
+
+ // this is the tracker's bittorrent protocol
+ if (this.tracker == null) {
+ for (int i=0; i< (nMaxNodes+maxGrowth); i++) {
+
+ // check the feasibility of the removal
+ if ( (cache[i] != null) && (cache[i].node != null) &&
+ (cache[i].node.getID() == neighbor.getID()) ) {
+ cache[i].node = null;
+ this.nNodes--;
+ return true;
+ }
+ }
+ return false;
+ }
+ // this is the bittorrent protocol of a peer
+ else {
+
+ Element e = search(neighbor.getID());
+
+ if (e != null) {
+ for (int i=0; i<nPieces; i++) {
+ rarestPieceSet[i] -= swarm[e.peer][i];
+ swarm[e.peer][i] = 0;
+ }
+
+ cache[e.peer].node = null;
+ cache[e.peer].status = 0;
+ cache[e.peer].interested = -1;
+ unchokedBy[e.peer] = false;
+ this.nNodes--;
+ e.peer = -1;
+ e.ID = Integer.MAX_VALUE;
+ e.valueUP = 0;
+ e.valueDOWN = 0;
+ e.head20 = 0;
+ e.head60 = 0;
+ sortByPeer();
+
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Adds a request to the pendingRequest queue.
+ * @param block The requested block
+ * @return true if the request has been successfully added to the queue, false otherwise
+ */
+ private boolean addRequest(int block){
+ int i=4;
+ while(i>=0 && pendingRequest[i]!=-1){
+ i--;
+ }
+ if(i>=0){
+ pendingRequest[i] = block;
+ return true;
+ }
+ else { // It should never happen
+ //System.err.println("pendingRequest queue full");
+ return false;
+ }
+ }
+
+ /**
+ * Removes the block with the given <tt>id</tt> from the {@link #pendingRequest} queue
+ * and sorts the queue leaving the empty cell at the left.
+ * @param id the id of the requested block
+ */
+ private void removeRequest(int id){
+ int i = 4;
+ for(; i>=0; i--){
+ if(pendingRequest[i]==id)
+ break;
+ }
+ for(; i>=0; i--){
+ if(i==0)
+ pendingRequest[i] = -1;
+ else
+ pendingRequest[i] = pendingRequest[i-1];
+ }
+ }
+
+ /**
+ * Requests new block until the {@link #pendingRequest} is full to the sender of the just received piece.
+ * It calls {@link #getNewBlock(Node, int)} to implement the <i>strict priority</i> strategy.
+ * @param node the local node
+ * @param pid the BitTorrent protocol id
+ * @param sender the sender of the just received received piece.
+ */
+ private void requestNextBlocks(Node node, int pid, int sender){
+ int block = getNewBlock(node, pid);
+ while(block != -2){
+ if(unchokedBy[sender]==true && alive(cache[sender].node) && addRequest(block)){
+ Object ev = new IntMsg(REQUEST, node, block,0);
+
+// long latency = ((Transport)node.getProtocol(tid)).getLatency(node,cache[sender].node);
+// EDSimulator.add(latency,ev,cache[sender].node,pid);
+
+ ((Transport) node.getProtocol(tid)).send(node,cache[sender].node, ev, pid);
+ cache[sender].justSent();
+ }
+ else{ // I cannot send request
+ if(!alive(cache[sender].node) && cache[sender].node!=null){
+ System.out.println("piece2 rm neigh "+ cache[sender].node.getID() );
+ removeNeighbor(cache[sender].node);
+ processNeighborListSize(node,pid);
+ }
+ return;
+ }
+ block = getNewBlock(node, pid);
+ }
+ }
+
+ /**
+ * It returns the id of the next block to request. Sends <tt>INTERESTED</tt> if the new
+ * block belongs to a new piece.
+ * It uses {@link #getBlock()} to get the next block of a piece and calls {@link #getPiece()}
+ * when all the blocks for the {@link #currentPiece} have been requested.
+ * @param node the local node
+ * @param pid the BitTorrent protocol id
+ * @return -2 if no more places available in the <tt>pendingRequest</tt> queue;<br/>
+ * the value of the next block to request otherwise</p>
+ */
+ private int getNewBlock(Node node, int pid){
+ int block = getBlock();
+ if(block < 0){ // No more block to request for the current piece
+
+ if(block ==-2) // Pending request queue full
+ return -2;
+
+ int newPiece = getPiece();
+ if(newPiece == -1){ // no more piece to request
+ return -2;
+ }
+
+ lastInterested = newPiece;
+ Object ev = new IntMsg(INTERESTED, node, lastInterested,0);
+
+ for(int j=0; j<swarmSize; j++){// send the interested message to those
+ // nodes which have that piece
+ if(alive(cache[j].node) && swarm[j][newPiece]==1){
+// long latency = ((Transport)node.getProtocol(tid)).getLatency(node,cache[j].node);
+// EDSimulator.add(latency,ev,cache[j].node,pid);
+ ((Transport) node.getProtocol(tid)).send(node,cache[j].node, ev, pid);
+ cache[j].justSent();
+ }
+ if(!alive(cache[j].node)){
+ //System.out.println("piece1 rm neigh "+ cache[j].node.getID() );
+
+ removeNeighbor(cache[j].node);
+ processNeighborListSize(node,pid);
+ }
+ }
+ block = getBlock();
+ return block;
+ }
+ else{
+ // block value referred to a real block
+ return block;
+ }
+ }
+
+ /**
+ * Returns the next block to request for the {@link #currentPiece}.
+ * @return an index of a block of the <tt>currentPiece</tt> if there are still
+ * available places in the {@link #pendingRequest} queue;<br/>
+ * -2 if the <tt>pendingRequest</tt> queue is full;<br/>
+ * -1 if no more blocks to request for the current piece.
+ */
+ private int getBlock(){
+ int i=4;
+ while(i>=0 && pendingRequest[i]!=-1){ // i is the first empty position from the head
+ i--;
+ }
+ if(i==-1){// No places in the pendingRequest available
+ //System.out.println("Pendig request queue full!");
+ return -2;
+ }
+ int j;
+ //The queue is not empty & last requested block belongs to lastInterested piece
+ if(i!=4 && decode(pendingRequest[i+1],0)==lastInterested)
+ j=decode(pendingRequest[i+1],1)+1; // the block following the last requested
+ else // I don't know which is the next block, so I search it.
+ j=0;
+ /* I search another block until the current has been already received.
+ * If in pieceStatus at position j there is a block that belongs to
+ * lastInterested piece, means that the block j has been already
+ * received, otherwise I can request it.
+ */
+ while(j<16 && decode(pieceStatus[j],0)==lastInterested){
+ j++;
+ }
+ if(j==16) // No more block to request for lastInterested piece
+ return -1;
+ return encode(lastInterested,j);
+ }
+
+ /**
+ * Returns the next correct piece to download. It choose the piece by using the
+ * <i>random first</i> and <i>rarest first</i> policy. For the beginning 4 pieces
+ * of a file the first one is used then the pieces are chosen using <i>rarest first</i>.
+ * @see "Documentation about the BitTorrent module"
+ * @return the next piece to download. If the whole file has been requested
+ * -1 is returned.
+ */
+ private int getPiece(){
+ int piece = -1;
+ if(nPieceCompleted < 4){ //Uses random first piece
+ piece = CommonState.r.nextInt(nPieces);
+ while(status[piece]==16 || piece == currentPiece) // until the piece is owned
+ piece = CommonState.r.nextInt(nPieces);
+ return piece;
+ }
+ else{ //Uses rarest piece first
+ int j=0;
+ for(; j<nPieces; j++){ // I find the first not owned piece
+ if(status[j]==0){
+ piece = j;
+ if(piece != lastInterested) // teoretically it works because
+ // there should be only one interested
+ // piece not yet downloaded
+ break;
+ }
+ }
+ if(piece==-1){ // Never entered in the previous 'if' statement; for all
+ // pieces an has been sent
+ return -1;
+ }
+
+ int rarestPieces[] = new int[nPieces-j]; // the pieces with the less number of occurences\
+ rarestPieces[0] = j;
+ int nValues = 1; // number of pieces less distributed in the network
+ for(int i=j+1; i<nPieces; i++){ // Finds the rarest piece not owned
+ if(rarestPieceSet[i]< rarestPieceSet[rarestPieces[0]] && status[i]==0){ // if strictly less than the current one
+ rarestPieces[0] = i;
+ nValues = 1;
+ }
+ if(rarestPieceSet[i]==rarestPieceSet[rarestPieces[0]] && status[i]==0){ // if equal
+ rarestPieces[nValues] = i;
+ nValues++;
+ }
+ }
+
+ piece = CommonState.r.nextInt(nValues); // one of the less owned pieces
+ return rarestPieces[piece];
+ }
+ }
+
+ /**
+ * Returns the file's size as number of pieces of 256KB.
+ * @return number of pieces that compose the file.
+ */
+ public int getNPieces(){
+ return nPieces;
+ }
+ /**
+ * Clone method of the class. Returns a deep copy of the BitTorrent class. Used
+ * by the simulation to initialize the {@link peersim.core.Network}
+ * @return the deep copy of the BitTorrent class.
+ */
+ public Object clone(){
+ Object prot = null;
+ try{
+ prot = (BitTorrent)super.clone();
+ }
+ catch(CloneNotSupportedException e){};
+
+ ((BitTorrent)prot).cache = new Neighbor[swarmSize];
+ for(int i=0; i<swarmSize; i++){
+ ((BitTorrent)prot).cache[i] = new Neighbor();
+ }
+
+ ((BitTorrent)prot).byPeer = new Element[swarmSize];
+ for(int i=0; i<swarmSize; i++){
+ ((BitTorrent)prot).byPeer[i] = new Element();
+ }
+
+ ((BitTorrent)prot).unchokedBy = new boolean[swarmSize];
+
+ ((BitTorrent)prot).byBandwidth = new Element[swarmSize];
+ ((BitTorrent)prot).status = new int[nPieces];
+ ((BitTorrent)prot).pieceStatus = new int[16];
+ for(int i=0; i<16;i++)
+ ((BitTorrent)prot).pieceStatus[i] = -1;
+ ((BitTorrent)prot).pendingRequest = new int[5];
+ for(int i=0; i<5;i++)
+ ((BitTorrent)prot).pendingRequest[i] = -1;
+ ((BitTorrent)prot).rarestPieceSet = new int[nPieces];
+ for(int i=0; i<nPieces;i++)
+ ((BitTorrent)prot).rarestPieceSet[i] = 0;
+ ((BitTorrent)prot).swarm = new int[swarmSize][nPieces];
+ ((BitTorrent)prot).requestToServe = new Queue(20);
+ ((BitTorrent)prot).incomingPieces = new Queue(100);
+ return prot;
+ }
+
+ /**
+ * Sorts {@link #byPeer} array by peer's ID. It implements the <i>InsertionSort</i>
+ * algorithm.
+ */
+ public void sortByPeer(){
+ int i;
+
+ for(int j=1; j<swarmSize; j++) // out is dividing line
+ {
+ Element key = new Element();
+ byPeer[j].copyTo(key) ; // remove marked item
+ i = j-1; // start shifts at out
+ while(i>=0 && (byPeer[i].ID > key.ID)) // until one is smaller,
+ {
+ byPeer[i].copyTo(byPeer[i+1]); // shift item right,
+ i--; // go left one position
+ }
+ key.copyTo(byPeer[i+1]); // insert marked item
+ }
+
+ }
+
+ /**
+ * Sorts the array {@link #byBandwidth} using <i>QuickSort</i> algorithm.
+ * <tt>null</tt> elements and seeders are moved to the end of the array.
+ */
+ public void sortByBandwidth() {
+ quicksort(0, swarmSize-1);
+ }
+
+ /**
+ * Used by {@link #sortByBandwidth()}. It's the implementation of the
+ * <i>QuickSort</i> algorithm.
+ * @param left the leftmost index of the array to sort.
+ * @param right the rightmost index of the array to sort.
+ */
+ private void quicksort(int left, int right) {
+ if (right <= left) return;
+ int i = partition(left, right);
+ quicksort(left, i-1);
+ quicksort(i+1, right);
+ }
+
+ /**
+ * Used by {@link #quicksort(int, int)}, partitions the subarray to sort returning
+ * the splitting point as stated by the <i>QuickSort</i> algorithm.
+ * @see "The <i>QuickSort</i> algorithm".
+ */
+ private int partition(int left, int right) {
+ int i = left - 1;
+ int j = right;
+ while (true) {
+ while (greater(byBandwidth[++i], byBandwidth[right])) // find item on left to swap
+ ; // a[right] acts as sentinel
+ while (greater(byBandwidth[right], byBandwidth[--j])) { // find item on right to swap
+ if (j == left) break; // don't go out-of-bounds
+ }
+ if (i >= j) break; // check if pointers cross
+ swap(i, j); // swap two elements into place
+ }
+ swap(i, right); // swap with partition element
+ return i;
+ }
+
+ /**
+ * Aswers to the question "is x > y?". Compares the {@link Element}s given as
+ * parameters. <tt>Element x</tt> is greater than <tt>y</tt> if isn't <tt>null</tt>
+ * and in the last 20 seconds the local node has downloaded ("uploaded" if the local node is a
+ * seeder) more blocks than from <tt>y</tt>.
+ * @param x the first <tt>Element</tt> to compare.
+ * @param y the second <tt>Element</tt> to compare
+ * @return <tt>true</tt> if x > y;<br/>
+ * <tt>false</tt> otherwise.
+ */
+ private boolean greater(Element x, Element y) {
+ /*
+ * Null elements and seeders are shifted at the end
+ * of the array
+ */
+ if (x==null) return false;
+ if (y==null) return true;
+ if (x.isSeeder) return false;
+ if (y.isSeeder) return true;
+
+ // if the local node is a leecher
+ if (peerStatus==0) {
+ if ((x.valueDOWN - x.head20) >
+ (y.valueDOWN -y.head20))
+ return true;
+ else return false;
+ }
+
+ // if peerStatus==1 (the local node is a seeder)
+ else {
+ if ((x.valueUP - x.head20) >
+ (y.valueUP -y.head20))
+ return true;
+ else return false;
+ }
+ }
+
+ /**
+ * Swaps {@link Element} <tt>i</tt> with <tt>j</tt> in the {@link #byBandwidth}.<br/>
+ * Used by {@link #partition(int, int)}
+ * @param i index of the first element to swap
+ * @param j index of the second element to swap
+ */
+ private void swap(int i, int j) {
+ Element swap = byBandwidth[i];
+ byBandwidth[i] = byBandwidth[j];
+ byBandwidth[j] = swap;
+ }
+
+ /** Searches the node with the given ID. It does a dychotomic
+ * search.
+ * @param ID ID of the node to search.
+ * @return the {@link Element} in {@link #byPeer} which represents the node with the
+ * given ID.
+ */
+ public Element search(long ID){
+ int low = 0;
+ int high = swarmSize-1;
+ int p = low+((high-low)/2); //Initial probe position
+ while ( low <= high) {
+ if ( byPeer[p] == null || byPeer[p].ID > ID)
+ high = p - 1;
+ else {
+ if( byPeer[p].ID < ID ) //Wasteful second comparison forced by syntax limitations.
+ low = p + 1;
+ else
+ return byPeer[p];
+ }
+ p = low+((high-low)/2); //Next probe position.
+ }
+ return null;
+ }
+}
+
+/**
+ * This class is used to store the main informations about a neighbors regarding
+ * the calculation of the Downloading/Uploading rates. Is the class of items in
+ * {@link example.bittorrent.BitTorrent#byPeer} and {@link example.bittorrent.BitTorrent#byBandwidth}.
+ */
+class Element{
+ /**
+ * ID of the represented node.
+ */
+ public long ID = Integer.MAX_VALUE;
+ /**
+ * Index position of the node in the {@link example.bittorrent.BitTorrent#cache} array.
+ */
+ public int peer = -1;
+ /**
+ * Number of blocks uploaded to anyone since the beginning.
+ */
+ public int valueUP = 0;
+ /**
+ * Number of blocks downloaded from anyone since the beginning.
+ */
+ public int valueDOWN = 0;
+ /**
+ * Value of either {@link #valueUP} or {@link #valueDOWN} (depending by
+ * {@link example.bittorrent.BitTorrent#peerStatus}) 20 seconds before.
+ */
+ public int head20 = 0;
+ /**
+ * Value of either {@link #valueUP} or {@link #valueDOWN} (depending by
+ * {@link example.bittorrent.BitTorrent#peerStatus}) 60 seconds before.
+ */
+ public int head60 = 0;
+ /**
+ * <tt>true</tt> if the node is a seeder, <tt>false</tt> otherwise.
+ */
+ public boolean isSeeder = false;
+ /**
+ * Makes a deep copy of the Element to <tt>destination</tt>
+ * @param destination Element instance where to make the copy
+ */
+ public void copyTo(Element destination){
+ destination.ID = this.ID;
+ destination.peer = this.peer;
+ destination.valueUP = this.valueUP;
+ destination.valueDOWN = this.valueDOWN;
+ destination.head20 = this.head20;
+ destination.head60 = this.head60;
+ }
+}
+
+/**
+ * This class stores information about the neighbors regarding their status. It is
+ * the type of the items in the {@link example.bittorrent.BitTorrent#cache}.
+ */
+class Neighbor{
+ /**
+ * Reference to the node in the {@link peersim.core.Network}.
+ */
+ public Node node = null;
+ /**
+ * -1 means not interested<br/>
+ * Other values means the last piece number for which the node is interested.
+ */
+ public int interested;
+ /**
+ * 0 means CHOKED<br/>
+ * 1 means UNCHOKED<br/>
+ * 2 means SNUBBED_BY. If this value is set and the node is to be unchocked,
+ * value 2 has the priority.
+ */
+ public int status;
+ /**
+ * Last time a message from the node represented has been received.
+ */
+ public long lastSeen = 0;
+ /**
+ * Last time a message to the node represented has been sent.
+ */
+ public long lastSent = 0;
+
+ /**
+ * Sets the last time the neighbor was seen.
+ */
+ public void isAlive(){
+ long now = CommonState.getTime();
+ this.lastSeen = now;
+ }
+
+ /*
+ * Sets the last time the local peer sent something to the neighbor.
+ */
+ public void justSent(){
+ long now = CommonState.getTime();
+ this.lastSent = now;
+ }
+
+}
+
+/**
+ * Class type of the queues's items in {@link example.bittorrent.BitTorrent#incomingPieces}
+ * and {@link example.bittorrent.BitTorrent#requestToServe}.
+ */
+class Queue{
+ int maxSize;
+ int head = 0;
+ int tail = 0;
+ int dim = 0;
+ Request queue[];
+
+ /**
+ * Public constructor. Creates a queue of size <tt>size</tt>.
+ */
+ public Queue(int size){
+ maxSize = size;
+ queue = new Request[size];
+ for(int i=0; i< size; i++)
+ queue[i]= new Request();
+ }
+
+ /**
+ * Enqueues the request of the block <tt>id</tt> and its <tt>sender</tt>
+ * @param id the id of the block in the request
+ * @param sender a reference to the sender of the request
+ * @return <tt>true</tt> if the request has been correctly added, <tt>false</tt>
+ * otherwise.
+ */
+ public boolean enqueue(int id, Node sender){
+ if(dim < maxSize){
+ queue[tail%maxSize].id = id;
+ queue[tail%maxSize].sender = sender;
+ tail++;
+ dim++;
+ return true;
+ }
+ else return false;
+ }
+
+ /**
+ * Returns the {@link Request} in the head of the queue.
+ * @return the element in the head.<br/>
+ * <tt>null</tt> if the queue is empty.
+ */
+ public Request dequeue(){
+ Request value;
+ if(dim > 0){
+ value = queue[head%maxSize];
+ head++;
+ dim--;
+ return value;
+ }
+ else return null; //empty queue
+ }
+
+ /**
+ * Returns the status of the queue.
+ * @return <tt>true</tt> if the queue is empty, <tt>false</tt>
+ * otherwise.
+ */
+ public boolean empty(){
+ return (dim == 0);
+ }
+
+ /**
+ * Returns <tt>true</tt> if block given as parameter is in.
+ * @param value the id of the block to search.
+ * @return <tt>true</tt> if the block <tt>value</tt> is in the queue, <tt>false</tt>
+ * otherwise.
+ */
+ public boolean contains(int value){
+ if(empty())
+ return false;
+ for(int i=head; i<head+dim; i++){
+ if(queue[i%maxSize].id == value)
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Removes a request from the queue.
+ * @param sender the sender of the request.
+ * @param value the id of the block requested.
+ * @return <tt>true</tt> if the request has been correctly removed, <tt>false</tt>
+ * otherwise.
+ */
+ public boolean remove(Node sender, int value){
+ if(empty())
+ return false;
+ for(int i=head; i<head+dim; i++){
+ if(queue[i%maxSize].id == value && queue[i%maxSize].sender == sender){
+ for(int j=i; j>head; j--){ // Shifts the elements for the removal
+ queue[j%maxSize]= queue[(j-1)%maxSize];
+ }
+ head++;
+ dim--;
+ return true;
+ }
+ }
+ return false;
+ }
+}
+
+/**
+ * This class represent an enqueued request of a block.
+ */
+class Request{
+ /**
+ * The id of the block.
+ */
+ public int id;
+ /**
+ * The sender of the request.
+ */
+ public Node sender;
+}
\ No newline at end of file