X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/ff6cb26262ba25fefdf1265628265a75d790ebd6..200986a368bbbbb5df459d43cbc7f5ef3d7678db:/contrib/psg/src/example/bittorrent/BitTorrent.java diff --git a/contrib/psg/src/example/bittorrent/BitTorrent.java b/contrib/psg/src/example/bittorrent/BitTorrent.java new file mode 100644 index 0000000000..1604a4d9f7 --- /dev/null +++ b/contrib/psg/src/example/bittorrent/BitTorrent.java @@ -0,0 +1,1989 @@ +/* + * Copyright (c) 2007-2008 Fabrizio Frioli, Michele Pedrolli + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + * -- + * + * Please send your questions/suggestions to: + * {fabrizio.frioli, michele.pedrolli} at studenti dot unitn dot it + * + */ + +package example.bittorrent; + +import peersim.core.*; +import peersim.config.*; +import peersim.edsim.*; +import peersim.transport.*; + +/** + * This is the class that implements the BitTorrent module for Peersim + */ +public class BitTorrent implements EDProtocol { + /** + * The size in Megabytes of the file being shared. + * @config + */ + private static final String PAR_SIZE="file_size"; + /** + * The Transport used by the the protocol. + * @config + */ + private static final String PAR_TRANSPORT="transport"; + /** + * The maximum number of neighbor that a node can have. + * @config + */ + private static final String PAR_SWARM="max_swarm_size"; + /** + * The maximum number of peers returned by the tracker when a new + * set of peers is requested through a TRACKER message. + * @config + */ + private static final String PAR_PEERSET_SIZE="peerset_size"; + /** + * Defines how much the network can grow with respect to the network.size + * when {@link NetworkDynamics} is used. + * @config + */ + private static final String PAR_MAX_GROWTH="max_growth"; + /** + * Is the number of requests of the same block sent to different peers. + * @config + */ + private static final String PAR_DUP_REQ = "duplicated_requests"; + + /** + * KEEP_ALIVE message. + * @see SimpleEvent#type "Event types" + */ + private static final int KEEP_ALIVE = 1; + + /** + * CHOKE message. + * @see SimpleEvent#type "Event types" + */ + private static final int CHOKE = 2; + + /** + * UNCHOKE message. + * @see SimpleEvent#type "Event types" + */ + private static final int UNCHOKE = 3; + + /** + * INTERESTED message. + * @see SimpleEvent#type "Event types" + */ + private static final int INTERESTED = 4; + + /** + * NOT_INTERESTED message. + * @see SimpleEvent#type "Event types" + */ + private static final int NOT_INTERESTED = 5; + + /** + * HAVE message. + * @see SimpleEvent#type "Event types" + */ + private static final int HAVE = 6; + + /** + * BITFIELD message. + * @see SimpleEvent#type "Event types" + */ + private static final int BITFIELD = 7; + + /** + * REQUEST message. + * @see SimpleEvent#type "Event types" + */ + private static final int REQUEST = 8; + + /** + * PIECE message. + * @see SimpleEvent#type "Event types" + */ + private static final int PIECE = 9; + + /** + * CANCEL message. + * @see SimpleEvent#type "Event types" + */ + private static final int CANCEL = 10; + + /** + * TRACKER message. + * @see SimpleEvent#type "Event types" + */ + private static final int TRACKER = 11; + + /** + * PEERSET message. + * @see SimpleEvent#type "Event types" + */ + private static final int PEERSET = 12; + + /** + * CHOKE_TIME event. + * @see SimpleEvent#type "Event types" + */ + private static final int CHOKE_TIME = 13; + + /** + * OPTUNCHK_TIME event. + * @see SimpleEvent#type "Event types" + */ + private static final int OPTUNCHK_TIME = 14; + + /** + * ANTISNUB_TIME event. + * @see SimpleEvent#type "Event types" + */ + private static final int ANTISNUB_TIME = 15; + + /** + * CHECKALIVE_TIME event. + * @see SimpleEvent#type "Event types" + */ + private static final int CHECKALIVE_TIME = 16; + + /** + * TRACKERALIVE_TIME event. + * @see SimpleEvent#type "Event types" + */ + private static final int TRACKERALIVE_TIME = 17; + + /** + * DOWNLOAD_COMPLETED event. + * @see SimpleEvent#type "Event types" + */ + private static final int DOWNLOAD_COMPLETED = 18; + + /** + * The maxium connection speed of the local node. + */ + int maxBandwidth; + + /** + * Stores the neighbors ordered by ID. + * @see Element + */ + private example.bittorrent.Element byPeer[]; + + /** + * Contains the neighbors ordered by bandwidth as needed by the unchocking + * algorithm. + */ + private example.bittorrent.Element byBandwidth[]; + + /** + * The Neighbors list. + */ + private Neighbor cache[]; + + /** + * Reference to the neighbors that unchocked the local node. + */ + private boolean unchokedBy[]; + + /** + * Number of neighbors in the cache. When it decreases under 20, a new peerset + * is requested to the tracker. + */ + private int nNodes = 0; + + /** + * Maximum number of nodes in the network. + */ + private int nMaxNodes; + + /** + * The status of the local peer. 0 means that the current peer is a leecher, 1 a seeder. + */ + private int peerStatus; + + /** + * Defines how much the network can grow with respect to the network.size + * when {@link NetworkDynamics} is used. + */ + public int maxGrowth; + + /** + * File status of the local node. Contains the blocks owned by the local node. + */ + private int status[]; + + /** + * Current number of Bitfield request sent. It must be taken into account + * before sending another one. + */ + private int nBitfieldSent = 0; + + /** + * Current number of pieces in upload from the local peer. + */ + public int nPiecesUp = 0; + /** + * Current number of pieces in download to the local peer. + */ + public int nPiecesDown = 0; + + /** + * Current number of piece completed. + */ + private int nPieceCompleted = 0; + + /** + * Current downloading piece ID, the previous lastInterested piece. + */ + int currentPiece = -1; + + /** + * Used to compute the average download rates in choking algorithm. Stores the + * number of CHOKE events. + */ + int n_choke_time = 0; + + /** + * Used to send the TRACKER message when the local node has 20 neighbors + * for the first time. + */ + boolean lock = false; + + /** + * Number of peers interested to my pieces. + */ + int numInterestedPeers = 0; + + /** + * Last piece for which the local node sent an INTERESTED message. + */ + int lastInterested = -1; + + /** + * The status of the current piece in download. Length 16, every time the local node + * receives a PIECE message, it updates the corrisponding block's cell. The cell + * contains the ID for that block of that piece. If an already owned + * block is received this is discarded. + */ + private int pieceStatus[]; + + /** + * Length of the file. Stored as number of pieces (256KB each one). + */ + int nPieces; + + /** + * Contains the neighbors's status of the file. Every row represents a + * node and every a cell has value O if the neighbor doesn't + * have the piece, 1 otherwise. It has {@link #swarmSize} rows and {@link #nPieces} + * columns. + */ + int [][]swarm; + + /** + * The summation of the swarm's rows. Calculated every time a {@link #BITFIELD} message + * is received and updated every time HAVE message is received. + */ + int rarestPieceSet[]; + + /** + * The five pending block requests. + */ + int pendingRequest[]; + + /** + * The maximum swarm size (default is 80) + */ + int swarmSize; + + /** + * The size of the peerset. This is the number of "friends" nodes + * sent from the tracker to each new node (default: 50) + */ + int peersetSize; + + /** + * The ID of the current node + */ + private long thisNodeID; + + /** + * Number of duplicated requests as specified in the configuration file. + * @see BitTorrent#PAR_DUP_REQ + */ + private int numberOfDuplicatedRequests; + + /** + * The queue where the requests to serve are stored. + * The default dimension of the queue is 20. + */ + Queue requestToServe = null; + + /** + * The queue where the out of sequence incoming pieces are stored + * waiting for the right moment to be processed. + * The default dimension of the queue is 100. + */ + Queue incomingPieces = null; + + /** + * The Transport ID. + * @see BitTorrent#PAR_TRANSPORT + */ + int tid; + + /** + * The reference to the tracker node. If equals to null, the local + * node is the tracker. + */ + private Node tracker = null; + + /** + * The default constructor. Reads the configuration file and initializes the + * configuration parameters. + * @param prefix the component prefix declared in the configuration file + */ + public BitTorrent(String prefix){ // Used for the tracker's protocol + tid = Configuration.getPid(prefix+"."+PAR_TRANSPORT); + nPieces = (int)((Configuration.getInt(prefix+"."+PAR_SIZE))*1000000/256000); + swarmSize = (int)Configuration.getInt(prefix+"."+PAR_SWARM); + peersetSize = (int)Configuration.getInt(prefix+"."+PAR_PEERSET_SIZE); + numberOfDuplicatedRequests = (int)Configuration.getInt(prefix+"."+PAR_DUP_REQ); + maxGrowth = (int)Configuration.getInt(prefix+"."+PAR_MAX_GROWTH); + nMaxNodes = Network.getCapacity()-1; + } + + /** + * Gets the reference to the tracker node. + * @return the reference to the tracker + */ + public Node getTracker(){ + return tracker; + } + + /** + * Gets the number of neighbors currently stored in the cache of the local node. + * @return the number of neighbors in the cache + */ + public int getNNodes(){ + return this.nNodes; + } + + /** + * Sets the reference to the tracker node. + * @param t the tracker node + */ + public void setTracker(Node t){ + tracker = t; + } + + /** + * Sets the ID of the local node. + * @param id the ID of the node + */ + public void setThisNodeID(long id) { + this.thisNodeID = id; + } + + /** + * Gets the ID of the local node. + * @return the ID of the local node + */ + public long getThisNodeID(){ + return this.thisNodeID; + } + + /** + * Gets the file status of the local node. + * @return the file status of the local node + */ + public int[] getFileStatus(){ + return this.status; + } + + /** + * Initializes the tracker node. This method + * only performs the initialization of the tracker's cache. + */ + public void initializeTracker() { + cache = new Neighbor[nMaxNodes+maxGrowth]; + for(int i=0; iChecks the number of neighbors and if it is equal to 20 + * sends a TRACKER messages to the tracker, asking for a new + * peer set.

+ * + *

This method *must* be called after every call of {@link #removeNeighbor} + * in {@link #processEvent}. + *

+ */ + private void processNeighborListSize(Node node, int pid) { + if (nNodes==20) { + Object ev; + long latency; + ev = new SimpleMsg(TRACKER, node); + Node tracker = ((BitTorrent)node.getProtocol(pid)).tracker; + if(tracker != null){ +// latency = ((Transport)node.getProtocol(tid)).getLatency(node, tracker); +// EDSimulator.add(latency,ev,tracker,pid); + ((Transport) node.getProtocol(tid)).send(node, tracker, ev, pid); + } + } + } + + /** + * The standard method that processes incoming events. + * @param node reference to the local node for which the event is going to be processed + * @param pid BitTorrent's protocol id + * @param event the event to process + */ + public void processEvent(Node node, int pid, Object event){ + + Object ev; + long latency; + switch(((SimpleEvent)event).getType()){ + + case KEEP_ALIVE: // 1 + { + Node sender = ((IntMsg)event).getSender(); + int isResponse = ((IntMsg)event).getInt(); + //System.out.println("process, keep_alive: sender is "+sender.getID()+", local is "+node.getID()); + Element e = search(sender.getID()); + if(e!= null){ //if I know the sender + cache[e.peer].isAlive(); + if(isResponse==0 && alive(sender)){ + Object msg = new IntMsg(KEEP_ALIVE,node,1,0); +// latency = ((Transport)node.getProtocol(tid)).getLatency(node, sender); +// EDSimulator.add(latency,msg,sender,pid); + ((Transport) node.getProtocol(tid)).send(node, sender, msg, pid); + cache[e.peer].justSent(); + } + } + else{ + System.err.println("despite it should never happen, it happened"); + ev = new BitfieldMsg(BITFIELD, true, false, node, status, nPieces); +// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender); +// EDSimulator.add(latency,ev,sender,pid); + ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid); + nBitfieldSent++; + } + + };break; + + case CHOKE: // 2, CHOKE message. + { + Node sender = ((SimpleMsg)event).getSender(); + //System.out.println("process, choke: sender is "+sender.getID()+", local is "+node.getID()); + Element e = search(sender.getID()); + if(e!= null){ //if I know the sender + cache[e.peer].isAlive(); + unchokedBy[e.peer]= false; // I'm choked by it + } + else{ + System.err.println("despite it should never happen, it happened"); + ev = new BitfieldMsg(BITFIELD, true, false, node, status, nPieces); +// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender); +// EDSimulator.add(latency,ev,sender,pid); + ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid); + nBitfieldSent++; + } + };break; + + case UNCHOKE: // 3, UNCHOKE message. + { + Node sender = ((SimpleMsg)event).getSender(); + //System.out.println("process, unchoke: sender is "+sender.getID()+", local is "+node.getID()); + Element e = search(sender.getID()); + if(e != null){ // If I know the sender + int senderIndex = e.peer; + cache[senderIndex].isAlive(); + /* I send to it some of the pending requests not yet satisfied. */ + int t = numberOfDuplicatedRequests; + for(int i=4;i>=0 && t>0;i--){ + if(pendingRequest[i]==-1) + break; + if(alive(cache[senderIndex].node) && swarm[senderIndex][decode(pendingRequest[i],0)]==1){ //If the sender has that piece + ev = new IntMsg(REQUEST, node,pendingRequest[i],0); +// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender); +// EDSimulator.add(latency,ev, sender,pid); + ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid); + cache[senderIndex].justSent(); + } + if(!alive(cache[senderIndex].node)){ + System.out.println("unchoke1 rm neigh "+ cache[i].node.getID() ); + removeNeighbor(cache[senderIndex].node); + processNeighborListSize(node,pid); + return; + } + t--; + } + // I request missing blocks to fill the queue + int block = getBlock(); + int piece; + while(block != -2){ //while still available request to send + if(block < 0){ // No more block to request for the current piece + piece = getPiece(); + if(piece == -1){ // no more piece to request + break; + } + for(int j=0; j swarmSize) + ev = new BitfieldMsg(BITFIELD, false, false, node, status, nPieces); //response with nack +// latency = ((Transport)node.getProtocol(tid)).getLatency(node,sender); +// EDSimulator.add(latency,ev,sender,pid); + ((Transport) node.getProtocol(tid)).send(node, sender, ev, pid); + } + } + + } + else + System.out.println("Sender "+sender.getID()+" not alive"); + } + };break; + + case REQUEST: // 8, REQUEST message. + { + Object evnt; + Node sender = ((IntMsg)event).getSender(); + int value = ((IntMsg)event).getInt(); + Element e; + BitTorrent senderP; + int remoteRate; + int localRate; + int bandwidth; + int downloadTime; + + e = search(sender.getID()); + if (e==null) + return; + cache[e.peer].isAlive(); + + requestToServe.enqueue(value, sender); + + /*I serve the enqueued requests until 10 uploding pieces or an empty queue*/ + while(!requestToServe.empty() && nPiecesUp <10){ + Request req = requestToServe.dequeue(); + e = search(req.sender.getID()); + if(e!=null && alive(req.sender)){ +// ev = new IntMsg(PIECE, node, req.id); + nPiecesUp++; + e.valueUP++; + senderP = ((BitTorrent)req.sender.getProtocol(pid)); + senderP.nPiecesDown++; + remoteRate = senderP.maxBandwidth/(senderP.nPiecesUp + senderP.nPiecesDown); + localRate = maxBandwidth/(nPiecesUp + nPiecesDown); + bandwidth = Math.min(remoteRate, localRate); + downloadTime = ((16*8)/(bandwidth))*1000; // in milliseconds + + ev = new IntMsg(PIECE, node, req.id, 16*8 * 1024); + ((Transport) node.getProtocol(tid)).send(node, req.sender, ev, pid); + +// latency = ((Transport)node.getProtocol(tid)).getLatency(node,req.sender); +// EDSimulator.add(latency+downloadTime,ev,req.sender,pid); + cache[e.peer].justSent(); + + /*I send to me an event to indicate that the download is completed. + This prevent that, when the receiver death occurres, my value nPiecesUp + doesn't decrease.*/ + evnt = new SimpleMsg(DOWNLOAD_COMPLETED, req.sender); +// EDSimulator.add(latency+downloadTime,evnt,node,pid); + ((Transport) node.getProtocol(tid)).send(node, node, evnt, pid); + } + } + }; break; + + case PIECE: // 9, PIECE message. + { + Node sender = ((IntMsg)event).getSender(); + /* Set the correct value for the local uploading and remote + downloading number of pieces */ + nPiecesDown--; + + if(peerStatus == 1)// To save CPU cycles + return; + //System.out.println("process, piece: sender is "+sender.getID()+", local is "+node.getID()); + Element e = search(sender.getID()); + + if(e==null){ //I can't accept a piece not wait + return; + } + e.valueDOWN++; + + cache[e.peer].isAlive(); + + int value = ((IntMsg)event).getInt(); + int piece = decode(value,0); + int block = decode(value,1); + /* If the block has not been already downloaded and it belongs to + the current downloading piece.*/ + if(piece == currentPiece && decode(pieceStatus[block],0)!= piece){ + pieceStatus[block] = value; + status[piece]++; + removeRequest(value); + requestNextBlocks(node, pid, e.peer); + + }else{ // Either a future piece or an owned piece + if(piece!=currentPiece && status[piece]!=16){ // Piece not owned, will be considered later + incomingPieces.enqueue(value, sender); + } + + } + ev = new IntMsg(CANCEL, node, value,0); + /* I send a CANCEL to all nodes to which I previously requested the block*/ + for(int i=0; i 0){ // I process the queue + m--; + Request temp = incomingPieces.dequeue(); + int p = decode(temp.id,0); // piece id + int b = decode(temp.id,1); // block id + Element s = search(temp.sender.getID()); + if(s==null) // if the node that sent the block in the queue is dead + continue; + if(p==currentPiece && decode(pieceStatus[b],0)!= p){ + pieceStatus[b] = temp.id; + status[p]++; + removeRequest(temp.id); + requestNextBlocks(node, pid, s.peer); + } + else{ // The piece not currently desired will be moved to the tail + if(p!= currentPiece) // If not a duplicate block but belongs to another piece + incomingPieces.enqueue(temp.id,temp.sender); + else // duplicate block + requestNextBlocks(node, pid, s.peer); + } + } + } + }; break; + + case CANCEL: + { + Node sender = ((IntMsg)event).getSender(); + int value = ((IntMsg)event).getInt(); + requestToServe.remove(sender, value); + };break; + + case PEERSET: // PEERSET message + { + Node sender = ((PeerSetMsg)event).getSender(); + //System.out.println("process, peerset: sender is "+sender.getID()+", local is "+node.getID()); + Neighbor n[] = ((PeerSetMsg)event).getPeerSet(); + + for(int i=0; i 0){ + byBandwidth[j]=byPeer[i]; //shallow copy + j++; + } + } + + /*It ensures that in the next 20sec, if there are less nodes interested + than now, those in surplus will not be ordered. */ + for(;j0;z--){ + int lucky = CommonState.r.nextInt(nNodes); + while(cache[byPeer[lucky].peer].status ==1 && alive(cache[byPeer[lucky].peer].node) && + cache[byPeer[lucky].peer].interested == 0)// until the lucky peer is already unchoked or not interested + lucky = CommonState.r.nextInt(nNodes); + luckies[3-z]= byPeer[lucky].peer; + } + } + for(int i=0; i0 && (byPeer[i].valueDOWN - byPeer[i].head60)==0){// No blocks downloaded in 1 min + cache[byPeer[i].peer].status = 2; // I'm snubbed by it + } + byPeer[i].head60 = byPeer[i].valueDOWN; + } + ev = new SimpleEvent(ANTISNUB_TIME); + EDSimulator.add(60000,ev,node,pid); + long time = CommonState.getTime(); + }; break; + + case CHECKALIVE_TIME: + { + + //System.out.println("process, checkalive_time"); + + long now = CommonState.getTime(); + for(int i=0; ivalue depending + * on part: 0 means the piece value, 1 the block value. + * @param value the ID of the block to decode. + * @param part the information to extract from value. 0 means the piece index, 1 the block index. + * @return the piece or the block index depending about the value of part + */ + private int decode(int value, int part){ + if (value==-1) // Not a true value to decode + return -1; + if(part == 0) // I'm interested to the piece + return value/100; + else // I'm interested to the block + return value%100; + } + + /** + * Used by {@link NodeInitializer#choosePieces(int, BitTorrent) NodeInitializer} to set + * the number of piece completed from the beginning in according with + * the distribution in the configuration file. + * @param number the number of piece completed + */ + public void setCompleted(int number){ + this.nPieceCompleted = number; + } + + /** + * Sets the status (the set of blocks) of the file for the current node. + * Note that a piece is considered completed if the number + * of downloaded blocks is 16. + * @param index The index of the piece + * @param value Number of blocks downloaded for the piece index. + */ + public void setStatus(int index, int value){ + status[index]=value; + } + + /** + * Sets the status of the local node. + * @param status The status of the node: 1 means seeder, 0 leecher + */ + public void setPeerStatus(int status){ + this.peerStatus = status; + } + + /** + * Gets the status of the local node. + * @return The status of the local node: 1 means seeder, 0 leecher + */ + public int getPeerStatus(){ + return peerStatus; + } + + /** + * Gets the number of blocks for a given piece owned by the local node. + * @param index The index of the piece + * @return Number of blocks downloaded for the piece index + */ + public int getStatus(int index){ + return status[index]; + } + + /** + * Sets the maximum bandwdith for the local node. + * @param value The value of bandwidth in Kbps + */ + public void setBandwidth(int value){ + maxBandwidth = value; + } + + /** + * Checks if a node is still alive in the simulated network. + * @param node The node to check + * @return true if the node node is up, false otherwise + * @see peersim.core.GeneralNode#isUp + */ + public boolean alive(Node node){ + if(node == null) + return false; + else + return node.isUp(); + } + + /** + * Adds a neighbor to the cache of the local node. + * The new neighbor is put in the first null position. + * @param neighbor The neighbor node to add + * @return false if the neighbor is already present in the cache (this can happen when the peer requests a + * new peer set to the tracker an there is still this neighbor within) or no place is available. + * Otherwise, returns true if the node is correctly added to the cache. + */ + public boolean addNeighbor(Node neighbor){ + if(search(neighbor.getID()) !=null){// if already exists + // System.err.println("Node "+neighbor.getID() + " not added, already exist."); + return false; + } + if(this.tracker == null){ // I'm in the tracker's BitTorrent protocol + for(int i=0; i< nMaxNodes+maxGrowth; i++){ + if(cache[i].node == null){ + cache[i].node = neighbor; + cache[i].status = 0; //chocked + cache[i].interested = -1; //not interested + this.nNodes++; + + //System.err.println("i: " + i +" nMaxNodes: " + nMaxNodes); + return true; + } + } + } + else{ + if((nNodes+nBitfieldSent) < swarmSize){ + //System.out.println("I'm the node " + this.thisNodeID + ", trying to add node "+neighbor.getID()); + for(int i=0; i=0 && pendingRequest[i]!=-1){ + i--; + } + if(i>=0){ + pendingRequest[i] = block; + return true; + } + else { // It should never happen + //System.err.println("pendingRequest queue full"); + return false; + } + } + + /** + * Removes the block with the given id from the {@link #pendingRequest} queue + * and sorts the queue leaving the empty cell at the left. + * @param id the id of the requested block + */ + private void removeRequest(int id){ + int i = 4; + for(; i>=0; i--){ + if(pendingRequest[i]==id) + break; + } + for(; i>=0; i--){ + if(i==0) + pendingRequest[i] = -1; + else + pendingRequest[i] = pendingRequest[i-1]; + } + } + + /** + * Requests new block until the {@link #pendingRequest} is full to the sender of the just received piece. + * It calls {@link #getNewBlock(Node, int)} to implement the strict priority strategy. + * @param node the local node + * @param pid the BitTorrent protocol id + * @param sender the sender of the just received received piece. + */ + private void requestNextBlocks(Node node, int pid, int sender){ + int block = getNewBlock(node, pid); + while(block != -2){ + if(unchokedBy[sender]==true && alive(cache[sender].node) && addRequest(block)){ + Object ev = new IntMsg(REQUEST, node, block,0); + +// long latency = ((Transport)node.getProtocol(tid)).getLatency(node,cache[sender].node); +// EDSimulator.add(latency,ev,cache[sender].node,pid); + + ((Transport) node.getProtocol(tid)).send(node,cache[sender].node, ev, pid); + cache[sender].justSent(); + } + else{ // I cannot send request + if(!alive(cache[sender].node) && cache[sender].node!=null){ + System.out.println("piece2 rm neigh "+ cache[sender].node.getID() ); + removeNeighbor(cache[sender].node); + processNeighborListSize(node,pid); + } + return; + } + block = getNewBlock(node, pid); + } + } + + /** + * It returns the id of the next block to request. Sends INTERESTED if the new + * block belongs to a new piece. + * It uses {@link #getBlock()} to get the next block of a piece and calls {@link #getPiece()} + * when all the blocks for the {@link #currentPiece} have been requested. + * @param node the local node + * @param pid the BitTorrent protocol id + * @return -2 if no more places available in the pendingRequest queue;
+ * the value of the next block to request otherwise

+ */ + private int getNewBlock(Node node, int pid){ + int block = getBlock(); + if(block < 0){ // No more block to request for the current piece + + if(block ==-2) // Pending request queue full + return -2; + + int newPiece = getPiece(); + if(newPiece == -1){ // no more piece to request + return -2; + } + + lastInterested = newPiece; + Object ev = new IntMsg(INTERESTED, node, lastInterested,0); + + for(int j=0; jcurrentPiece if there are still + * available places in the {@link #pendingRequest} queue;
+ * -2 if the pendingRequest queue is full;
+ * -1 if no more blocks to request for the current piece. + */ + private int getBlock(){ + int i=4; + while(i>=0 && pendingRequest[i]!=-1){ // i is the first empty position from the head + i--; + } + if(i==-1){// No places in the pendingRequest available + //System.out.println("Pendig request queue full!"); + return -2; + } + int j; + //The queue is not empty & last requested block belongs to lastInterested piece + if(i!=4 && decode(pendingRequest[i+1],0)==lastInterested) + j=decode(pendingRequest[i+1],1)+1; // the block following the last requested + else // I don't know which is the next block, so I search it. + j=0; + /* I search another block until the current has been already received. + * If in pieceStatus at position j there is a block that belongs to + * lastInterested piece, means that the block j has been already + * received, otherwise I can request it. + */ + while(j<16 && decode(pieceStatus[j],0)==lastInterested){ + j++; + } + if(j==16) // No more block to request for lastInterested piece + return -1; + return encode(lastInterested,j); + } + + /** + * Returns the next correct piece to download. It choose the piece by using the + * random first and rarest first policy. For the beginning 4 pieces + * of a file the first one is used then the pieces are chosen using rarest first. + * @see "Documentation about the BitTorrent module" + * @return the next piece to download. If the whole file has been requested + * -1 is returned. + */ + private int getPiece(){ + int piece = -1; + if(nPieceCompleted < 4){ //Uses random first piece + piece = CommonState.r.nextInt(nPieces); + while(status[piece]==16 || piece == currentPiece) // until the piece is owned + piece = CommonState.r.nextInt(nPieces); + return piece; + } + else{ //Uses rarest piece first + int j=0; + for(; jInsertionSort + * algorithm. + */ + public void sortByPeer(){ + int i; + + for(int j=1; j=0 && (byPeer[i].ID > key.ID)) // until one is smaller, + { + byPeer[i].copyTo(byPeer[i+1]); // shift item right, + i--; // go left one position + } + key.copyTo(byPeer[i+1]); // insert marked item + } + + } + + /** + * Sorts the array {@link #byBandwidth} using QuickSort algorithm. + * null elements and seeders are moved to the end of the array. + */ + public void sortByBandwidth() { + quicksort(0, swarmSize-1); + } + + /** + * Used by {@link #sortByBandwidth()}. It's the implementation of the + * QuickSort algorithm. + * @param left the leftmost index of the array to sort. + * @param right the rightmost index of the array to sort. + */ + private void quicksort(int left, int right) { + if (right <= left) return; + int i = partition(left, right); + quicksort(left, i-1); + quicksort(i+1, right); + } + + /** + * Used by {@link #quicksort(int, int)}, partitions the subarray to sort returning + * the splitting point as stated by the QuickSort algorithm. + * @see "The QuickSort algorithm". + */ + private int partition(int left, int right) { + int i = left - 1; + int j = right; + while (true) { + while (greater(byBandwidth[++i], byBandwidth[right])) // find item on left to swap + ; // a[right] acts as sentinel + while (greater(byBandwidth[right], byBandwidth[--j])) { // find item on right to swap + if (j == left) break; // don't go out-of-bounds + } + if (i >= j) break; // check if pointers cross + swap(i, j); // swap two elements into place + } + swap(i, right); // swap with partition element + return i; + } + + /** + * Aswers to the question "is x > y?". Compares the {@link Element}s given as + * parameters. Element x is greater than y if isn't null + * and in the last 20 seconds the local node has downloaded ("uploaded" if the local node is a + * seeder) more blocks than from y. + * @param x the first Element to compare. + * @param y the second Element to compare + * @return true if x > y;
+ * false otherwise. + */ + private boolean greater(Element x, Element y) { + /* + * Null elements and seeders are shifted at the end + * of the array + */ + if (x==null) return false; + if (y==null) return true; + if (x.isSeeder) return false; + if (y.isSeeder) return true; + + // if the local node is a leecher + if (peerStatus==0) { + if ((x.valueDOWN - x.head20) > + (y.valueDOWN -y.head20)) + return true; + else return false; + } + + // if peerStatus==1 (the local node is a seeder) + else { + if ((x.valueUP - x.head20) > + (y.valueUP -y.head20)) + return true; + else return false; + } + } + + /** + * Swaps {@link Element} i with j in the {@link #byBandwidth}.
+ * Used by {@link #partition(int, int)} + * @param i index of the first element to swap + * @param j index of the second element to swap + */ + private void swap(int i, int j) { + Element swap = byBandwidth[i]; + byBandwidth[i] = byBandwidth[j]; + byBandwidth[j] = swap; + } + + /** Searches the node with the given ID. It does a dychotomic + * search. + * @param ID ID of the node to search. + * @return the {@link Element} in {@link #byPeer} which represents the node with the + * given ID. + */ + public Element search(long ID){ + int low = 0; + int high = swarmSize-1; + int p = low+((high-low)/2); //Initial probe position + while ( low <= high) { + if ( byPeer[p] == null || byPeer[p].ID > ID) + high = p - 1; + else { + if( byPeer[p].ID < ID ) //Wasteful second comparison forced by syntax limitations. + low = p + 1; + else + return byPeer[p]; + } + p = low+((high-low)/2); //Next probe position. + } + return null; + } +} + +/** + * This class is used to store the main informations about a neighbors regarding + * the calculation of the Downloading/Uploading rates. Is the class of items in + * {@link example.bittorrent.BitTorrent#byPeer} and {@link example.bittorrent.BitTorrent#byBandwidth}. + */ +class Element{ + /** + * ID of the represented node. + */ + public long ID = Integer.MAX_VALUE; + /** + * Index position of the node in the {@link example.bittorrent.BitTorrent#cache} array. + */ + public int peer = -1; + /** + * Number of blocks uploaded to anyone since the beginning. + */ + public int valueUP = 0; + /** + * Number of blocks downloaded from anyone since the beginning. + */ + public int valueDOWN = 0; + /** + * Value of either {@link #valueUP} or {@link #valueDOWN} (depending by + * {@link example.bittorrent.BitTorrent#peerStatus}) 20 seconds before. + */ + public int head20 = 0; + /** + * Value of either {@link #valueUP} or {@link #valueDOWN} (depending by + * {@link example.bittorrent.BitTorrent#peerStatus}) 60 seconds before. + */ + public int head60 = 0; + /** + * true if the node is a seeder, false otherwise. + */ + public boolean isSeeder = false; + /** + * Makes a deep copy of the Element to destination + * @param destination Element instance where to make the copy + */ + public void copyTo(Element destination){ + destination.ID = this.ID; + destination.peer = this.peer; + destination.valueUP = this.valueUP; + destination.valueDOWN = this.valueDOWN; + destination.head20 = this.head20; + destination.head60 = this.head60; + } +} + +/** + * This class stores information about the neighbors regarding their status. It is + * the type of the items in the {@link example.bittorrent.BitTorrent#cache}. + */ +class Neighbor{ + /** + * Reference to the node in the {@link peersim.core.Network}. + */ + public Node node = null; + /** + * -1 means not interested
+ * Other values means the last piece number for which the node is interested. + */ + public int interested; + /** + * 0 means CHOKED
+ * 1 means UNCHOKED
+ * 2 means SNUBBED_BY. If this value is set and the node is to be unchocked, + * value 2 has the priority. + */ + public int status; + /** + * Last time a message from the node represented has been received. + */ + public long lastSeen = 0; + /** + * Last time a message to the node represented has been sent. + */ + public long lastSent = 0; + + /** + * Sets the last time the neighbor was seen. + */ + public void isAlive(){ + long now = CommonState.getTime(); + this.lastSeen = now; + } + + /* + * Sets the last time the local peer sent something to the neighbor. + */ + public void justSent(){ + long now = CommonState.getTime(); + this.lastSent = now; + } + +} + +/** + * Class type of the queues's items in {@link example.bittorrent.BitTorrent#incomingPieces} + * and {@link example.bittorrent.BitTorrent#requestToServe}. + */ +class Queue{ + int maxSize; + int head = 0; + int tail = 0; + int dim = 0; + Request queue[]; + + /** + * Public constructor. Creates a queue of size size. + */ + public Queue(int size){ + maxSize = size; + queue = new Request[size]; + for(int i=0; i< size; i++) + queue[i]= new Request(); + } + + /** + * Enqueues the request of the block id and its sender + * @param id the id of the block in the request + * @param sender a reference to the sender of the request + * @return true if the request has been correctly added, false + * otherwise. + */ + public boolean enqueue(int id, Node sender){ + if(dim < maxSize){ + queue[tail%maxSize].id = id; + queue[tail%maxSize].sender = sender; + tail++; + dim++; + return true; + } + else return false; + } + + /** + * Returns the {@link Request} in the head of the queue. + * @return the element in the head.
+ * null if the queue is empty. + */ + public Request dequeue(){ + Request value; + if(dim > 0){ + value = queue[head%maxSize]; + head++; + dim--; + return value; + } + else return null; //empty queue + } + + /** + * Returns the status of the queue. + * @return true if the queue is empty, false + * otherwise. + */ + public boolean empty(){ + return (dim == 0); + } + + /** + * Returns true if block given as parameter is in. + * @param value the id of the block to search. + * @return true if the block value is in the queue, false + * otherwise. + */ + public boolean contains(int value){ + if(empty()) + return false; + for(int i=head; itrue if the request has been correctly removed, false + * otherwise. + */ + public boolean remove(Node sender, int value){ + if(empty()) + return false; + for(int i=head; ihead; j--){ // Shifts the elements for the removal + queue[j%maxSize]= queue[(j-1)%maxSize]; + } + head++; + dim--; + return true; + } + } + return false; + } +} + +/** + * This class represent an enqueued request of a block. + */ +class Request{ + /** + * The id of the block. + */ + public int id; + /** + * The sender of the request. + */ + public Node sender; +} \ No newline at end of file