3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.HashSet;
6 import java.util.Iterator;
7 import java.util.Map.Entry;
9 import org.simgrid.msg.Comm;
10 import org.simgrid.msg.Host;
11 import org.simgrid.msg.Msg;
12 import org.simgrid.msg.MsgException;
13 import org.simgrid.msg.RngStream;
14 import org.simgrid.msg.Process;
15 import org.simgrid.msg.Task;
17 import bittorrent.Connection;
20 * Main class for peers execution
22 public class Peer extends Process {
23 protected int round = 0;
25 protected double beginReceiveTime;
26 protected double deadline;
28 protected static RngStream stream = new RngStream();
31 protected String mailbox;
32 protected String mailboxTracker;
33 protected String hostname;
34 protected int pieces = 0;
35 protected char[] bitfield = new char[Common.FILE_PIECES];
36 protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
38 protected short[] piecesCount = new short[Common.FILE_PIECES];
40 protected int piecesRequested = 0;
42 protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
43 protected int currentPiece = -1;
45 protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();
46 protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
48 protected Comm commReceived = null;
50 public Peer(Host host, String name, String[]args) {
51 super(host,name,args);
55 public void main(String[] args) throws MsgException {
57 if (args.length != 3 && args.length != 2) {
58 Msg.info("Wrong number of arguments");
60 if (args.length == 3) {
61 init(Integer.valueOf(args[0]),true);
64 init(Integer.valueOf(args[0]),false);
66 //Retrieve the deadline
67 deadline = Double.valueOf(args[1]);
69 Msg.info("Wrong deadline supplied");
72 Msg.info("Hi, I'm joining the network with id " + id);
73 //Getting peer data from the tracker
75 Msg.debug("Got " + peers.size() + " peers from the tracker");
76 Msg.debug("Here is my current status: " + getStatus());
77 beginReceiveTime = Msg.getClock();
79 pieces = Common.FILE_PIECES;
89 Msg.info("Couldn't contact the tracker.");
91 Msg.info("Here is my current status: " + getStatus());
94 * Peer main loop when it is leeching.
96 private void leechLoop() {
97 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
98 Msg.debug("Start downloading.");
100 * Send a "handshake" message to all the peers it got
101 * (it couldn't have gotten more than 50 peers anyway)
104 //Wait for at least one "bitfield" message.
106 Msg.debug("Starting main leech loop");
107 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
108 if (commReceived == null) {
109 commReceived = Task.irecv(mailbox);
112 if (commReceived.test()) {
113 handleMessage(commReceived.getTask());
117 //If the user has a pending interesting
118 if (currentPiece != -1) {
119 sendInterestedToPeers();
122 if (currentPieces.size() < Common.MAX_PIECES) {
123 updateCurrentPiece();
126 //We don't execute the choke algorithm if we don't already have a piece
127 if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
129 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
136 catch (MsgException e) {
143 * Peer main loop when it is seeding
145 private void seedLoop() {
146 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
147 Msg.debug("Start seeding.");
148 //start the main seed loop
149 while (Msg.getClock() < deadline) {
150 if (commReceived == null) {
151 commReceived = Task.irecv(mailbox);
154 if (commReceived.test()) {
155 handleMessage(commReceived.getTask());
159 if (Msg.getClock() >= nextChokedUpdate) {
161 //TODO: Change the choked peer algorithm when seeding
162 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
169 catch (MsgException e) {
177 * Initialize the various peer data
178 * @param id id of the peer to take in the network
179 * @param seed indicates if the peer is a seed
181 private void init(int id, boolean seed) {
183 this.mailbox = Integer.toString(id);
184 this.mailboxTracker = "tracker_" + Integer.toString(id);
186 for (int i = 0; i < bitfield.length; i++) {
188 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
189 bitfieldBlocks[i][j] = '1';
194 for (int i = 0; i < bitfield.length; i++) {
196 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
197 bitfieldBlocks[i][j] = '0' ;
201 this.hostname = host.getName();
204 * Retrieves the peer list from the tracker
206 private boolean getPeersData() {
208 boolean success = false, sendSuccess = false;
209 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
210 //Build the task to send to the tracker
211 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
213 while (!sendSuccess && Msg.getClock() < timeout) {
215 Msg.debug("Sending a peer request to the tracker.");
216 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
219 catch (MsgException e) {
223 while (!success && Msg.getClock() < timeout) {
224 commReceived = Task.irecv(this.mailboxTracker);
226 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
227 if (commReceived.getTask() instanceof TrackerTask) {
228 TrackerTask task = (TrackerTask)commReceived.getTask();
229 for (Integer peerId: task.peers) {
230 if (peerId != this.id) {
231 peers.put(peerId, new Connection(peerId));
237 catch (MsgException e) {
246 * Handle a received message sent by another peer
247 * @param task task received.
249 void handleMessage(Task task) {
250 MessageTask message = (MessageTask)task;
251 Connection remotePeer = peers.get(message.peerId);
252 switch (message.type) {
254 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
255 //Check if the peer is in our connection list
256 if (remotePeer == null) {
257 peers.put(message.peerId, new Connection(message.peerId));
258 sendHandshake(message.mailbox);
260 //Send our bitfield to the pair
261 sendBitfield(message.mailbox);
264 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
265 //update the pieces list
266 updatePiecesCountFromBitfield(message.bitfield);
267 //Update the current piece
268 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
269 updateCurrentPiece();
271 remotePeer.bitfield = message.bitfield.clone();
274 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
275 assert remotePeer != null;
276 remotePeer.interested = true;
279 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
280 assert remotePeer != null;
281 remotePeer.interested = false;
284 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
285 assert remotePeer != null;
286 remotePeer.chokedDownload = false;
287 activePeers.put(remotePeer.id,remotePeer);
288 sendRequestsToPeer(remotePeer);
291 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
292 assert remotePeer != null;
293 remotePeer.chokedDownload = true;
294 activePeers.remove(remotePeer.id);
297 if (remotePeer.bitfield == null) {
300 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
301 assert message.index >= 0 && message.index < Common.FILE_PIECES;
302 assert remotePeer.bitfield != null;
303 remotePeer.bitfield[message.index] = '1';
304 piecesCount[message.index]++;
305 //Send interested message to the peer if he has what we want
306 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
307 remotePeer.amInterested = true;
308 sendInterested(remotePeer.mailbox);
311 if (currentPieces.contains(message.index)) {
312 int blockIndex = getFirstBlock(message.index);
313 int blockLength = Common.PIECES_BLOCKS - blockIndex ;
314 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
315 sendRequest(message.mailbox,message.index,blockIndex,blockLength);
319 assert message.index >= 0 && message.index < Common.FILE_PIECES;
320 if (!remotePeer.chokedUpload) {
321 Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
322 if (bitfield[message.index] == '1') {
323 sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);
326 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
331 if (message.stalled) {
332 Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ") is stalled");
335 Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
336 if (bitfield[message.index] == '0') {
337 updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
338 if (pieceComplete(message.index)) {
340 //Removing the piece from our piece list.
341 if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
343 //Setting the fact that we have the piece
344 bitfield[message.index] = '1';
346 Msg.debug("My status is now " + getStatus());
347 //Sending the information to all the peers we are connected to
348 sendHave(message.index);
349 //sending UNINTERESTED to peers that doesn't have what we want.
350 updateInterestedAfterReceive();
354 Msg.debug("However, we already have it.");
359 if (remotePeer != null) {
360 remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
362 beginReceiveTime = Msg.getClock();
365 * Wait for the node to receive interesting bitfield messages (ie: non empty)
368 void waitForPieces() {
369 boolean finished = false;
370 while (Msg.getClock() < deadline && !finished) {
371 if (commReceived == null) {
372 commReceived = Task.irecv(mailbox);
375 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
376 handleMessage(commReceived.getTask());
377 if (currentPiece != -1) {
382 catch (MsgException e) {
388 private boolean hasFinished() {
389 for (int i = 0; i < bitfield.length; i++) {
390 if (bitfield[i] == '1') {
397 * Updates the list of who has a piece from a bitfield
398 * @param bitfield bitfield
400 private void updatePiecesCountFromBitfield(char bitfield[]) {
401 for (int i = 0; i < Common.FILE_PIECES; i++) {
402 if (bitfield[i] == '1') {
408 * Update the piece the peer is currently interested in.
409 * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
410 * If the peer has less than 3 pieces, he chooses a piece at random.
411 * If the peer has more than pieces, he downloads the pieces that are the less
414 void updateCurrentPiece() {
415 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
418 if (true || pieces < 3) {
419 int i = 0, peerPiece;
421 currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
423 } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
426 //trivial min algorithm.
429 currentPieces.add(currentPiece);
430 Msg.debug("New interested piece: " + currentPiece);
431 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
434 * Update the list of current choked and unchoked peers, using the
437 private void updateChokedPeers() {
438 round = (round + 1) % 3;
439 if (peers.size() == 0) {
442 //remove a peer from the list
443 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
445 Entry<Integer,Connection> e = it.next();
446 Connection peerChoked = e.getValue();
447 sendChoked(peerChoked.mailbox);
448 peerChoked.chokedUpload = true;
449 activePeers.remove(e.getKey());
451 Connection peerChoosed = null;
452 //Separate the case from when the peer is seeding.
453 if (pieces == Common.FILE_PIECES) {
454 //Find the last unchoked peer.
455 double unchokeTime = deadline + 1;
456 for (Connection connection : peers.values()) {
457 if (connection.lastUnchoke < unchokeTime && connection.chokedUpload && connection.interested) {
458 peerChoosed = connection;
459 unchokeTime = connection.lastUnchoke;
464 //Random optimistic unchoking
469 int idChosen = stream.randInt(0,peers.size() - 1);
470 for (Connection connection : peers.values()) {
472 peerChoosed = connection;
476 } //TODO: Not really the best way ever
477 if (!peerChoosed.interested) {
481 } while (peerChoosed == null && j <
482 Common.MAXIMUM_PEERS);
485 Connection fastest = null;
486 double fastestSpeed = 0;
487 for (Connection c : peers.values()) {
488 if (c.peerSpeed > fastestSpeed && c.chokedUpload && c.interested) {
490 fastestSpeed = c.peerSpeed;
493 peerChoosed = fastest;
496 if (peerChoosed != null) {
497 activePeers.put(peerChoosed.id,peerChoosed);
498 peerChoosed.chokedUpload = false;
499 peerChoosed.lastUnchoke = Msg.getClock();
500 sendUnchoked(peerChoosed.mailbox);
504 * Updates our "interested" state about peers: send "not interested" to peers
505 * that don't have any more pieces we want.
507 private void updateInterestedAfterReceive() {
509 for (Connection connection : peers.values()) {
511 if (connection.amInterested) {
512 for (Integer piece : currentPieces) {
513 if (connection.bitfield[piece] == '1') {
519 connection.amInterested = false;
520 sendNotInterested(connection.mailbox);
525 private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
526 for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
527 bitfieldBlocks[index][i] = '1';
531 * Returns if a piece is complete in the peer's bitfield.
532 * @param index the index of the piece.
534 private boolean pieceComplete(int index) {
535 for (int i = 0; i < bitfieldBlocks[index].length; i++) {
536 if (bitfieldBlocks[index][i] == '0') {
543 * Returns the first block of a piece that we don't have.
545 private int getFirstBlock(int piece) {
547 for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
548 if (bitfieldBlocks[piece][i] == '0') {
556 * Send request messages to a peer that have unchoked us
557 * @param remotePeer peer data to the peer we want to send the request
559 private void sendRequestsToPeer(Connection remotePeer) {
560 if (remotePeer.bitfield == null) {
563 for (Integer piece : currentPieces) {
564 //Getting the block to send.
565 int blockIndex = -1, blockLength = 0;
566 blockIndex = getFirstBlock(piece);
567 blockLength = Common.PIECES_BLOCKS - blockIndex ;
568 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
569 if (remotePeer.bitfield[piece] == '1') {
570 sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
575 * Find the peers that have the current interested piece and send them
576 * the "interested" message
578 private void sendInterestedToPeers() {
579 if (currentPiece == -1) {
582 for (Connection connection : peers.values()) {
583 if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
584 connection.amInterested = true;
585 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
586 task.dsend(connection.mailbox);
593 * Send a "interested" message to a peer.
595 private void sendInterested(String mailbox) {
596 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
600 * Send a "not interested" message to a peer
601 * @param mailbox mailbox destination mailbox
603 private void sendNotInterested(String mailbox) {
604 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
608 * Send a handshake message to all the peers the peer has.
609 * @param peer peer data
611 private void sendHandshakeAll() {
612 for (Connection remotePeer : peers.values()) {
613 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
615 task.dsend(remotePeer.mailbox);
619 * Send a "handshake" message to an user
620 * @param mailbox mailbox where to we send the message
622 private void sendHandshake(String mailbox) {
623 Msg.debug("Sending a HANDSHAKE to " + mailbox);
624 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
628 * Send a "choked" message to a peer
630 private void sendChoked(String mailbox) {
631 Msg.debug("Sending a CHOKE to " + mailbox);
632 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
636 * Send a "unchoked" message to a peer
638 private void sendUnchoked(String mailbox) {
639 Msg.debug("Sending a UNCHOKE to " + mailbox);
640 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
644 * Send a "HAVE" message to all peers we are connected to
646 private void sendHave(int piece) {
647 Msg.debug("Sending HAVE message to all my peers");
648 for (Connection remotePeer : peers.values()) {
649 MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
650 task.dsend(remotePeer.mailbox);
654 * Send a bitfield message to all the peers the peer has.
655 * @param peer peer data
657 private void sendBitfield(String mailbox) {
658 Msg.debug("Sending a BITFIELD to " + mailbox);
659 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
663 * Send a "request" message to a pair, containing a request for a piece
665 private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
666 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + "," + (blockIndex + blockLength));
667 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, blockLength);
671 * Send a "piece" message to a pair, containing a piece of the file
673 private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
674 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
675 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled, blockIndex, blockLength);
679 private String getStatus() {
681 for (int i = 0; i < Common.FILE_PIECES; i++) {