1 /* Copyright (c) 2006-2014, 2016-2017. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 package app.bittorrent;
9 import java.util.ArrayList;
10 import java.util.HashMap;
11 import java.util.Iterator;
12 import java.util.Map.Entry;
14 import org.simgrid.msg.Msg;
15 import org.simgrid.msg.Comm;
16 import org.simgrid.msg.Host;
17 import org.simgrid.msg.Task;
18 import org.simgrid.msg.Process;
19 import org.simgrid.msg.RngStream;
20 import org.simgrid.msg.MsgException;
22 public class Peer extends Process {
23 protected int round = 0;
24 protected double beginReceiveTime;
25 protected double deadline;
26 protected static RngStream stream = new RngStream();
28 protected String mailbox;
29 protected String mailboxTracker;
30 protected String hostname;
31 protected int pieces = 0;
32 protected char[] bitfield = new char[Common.FILE_PIECES];
33 protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
34 protected short[] piecesCount = new short[Common.FILE_PIECES];
35 protected int piecesRequested = 0;
36 protected ArrayList<Integer> currentPieces = new ArrayList<>();
37 protected int currentPiece = -1;
38 protected HashMap<Integer, Connection> activePeers = new HashMap<>();
39 protected HashMap<Integer, Connection> peers = new HashMap<>();
40 protected Comm commReceived = null;
42 public Peer(Host host, String name, String[]args) {
43 super(host,name,args);
47 public void main(String[] args) throws MsgException {
49 if (args.length != 3 && args.length != 2) {
50 Msg.info("Wrong number of arguments");
52 if (args.length == 3) {
53 init(Integer.parseInt(args[0]),true);
55 init(Integer.parseInt(args[0]),false);
57 //Retrieve the deadline
58 deadline = Double.parseDouble(args[1]);
60 Msg.info("Wrong deadline supplied");
63 Msg.info("Hi, I'm joining the network with id " + id);
64 //Getting peer data from the tracker
66 Msg.debug("Got " + peers.size() + " peers from the tracker");
67 Msg.debug("Here is my current status: " + getStatus());
68 beginReceiveTime = Msg.getClock();
70 pieces = Common.FILE_PIECES;
78 Msg.info("Couldn't contact the tracker.");
80 Msg.info("Here is my current status: " + getStatus());
83 private void leechLoop() {
84 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
85 Msg.debug("Start downloading.");
86 // Send a "handshake" message to all the peers it got(it couldn't have gotten more than 50 peers anyway)
88 //Wait for at least one "bitfield" message.
90 Msg.debug("Starting main leech loop");
91 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
92 if (commReceived == null) {
93 commReceived = Task.irecv(mailbox);
96 if (commReceived.test()) {
97 handleMessage(commReceived.getTask());
100 //If the user has a pending interesting
101 if (currentPiece != -1) {
102 sendInterestedToPeers();
104 if (currentPieces.size() < Common.MAX_PIECES) {
105 updateCurrentPiece();
108 //We don't execute the choke algorithm if we don't already have a piece
109 if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
111 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
117 catch (MsgException e) {
124 private void seedLoop() {
125 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
126 Msg.debug("Start seeding.");
127 //start the main seed loop
128 while (Msg.getClock() < deadline) {
129 if (commReceived == null) {
130 commReceived = Task.irecv(mailbox);
133 if (commReceived.test()) {
134 handleMessage(commReceived.getTask());
137 if (Msg.getClock() >= nextChokedUpdate) {
139 //TODO: Change the choked peer algorithm when seeding
140 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
146 catch (MsgException e) {
153 * @brief Initialize the various peer data
154 * @param id id of the peer to take in the network
155 * @param seed indicates if the peer is a seed
157 private void init(int id, boolean seed) {
159 this.mailbox = Integer.toString(id);
160 this.mailboxTracker = "tracker_" + Integer.toString(id);
162 for (int i = 0; i < bitfield.length; i++) {
164 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
165 bitfieldBlocks[i][j] = '1';
169 for (int i = 0; i < bitfield.length; i++) {
171 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
172 bitfieldBlocks[i][j] = '0' ;
176 this.hostname = getHost().getName();
179 private boolean getPeersData() {
180 boolean success = false;
181 boolean sendSuccess = false;
182 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
183 //Build the task to send to the tracker
184 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
186 while (!sendSuccess && Msg.getClock() < timeout) {
188 Msg.debug("Sending a peer request to the tracker.");
189 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
192 catch (MsgException e) {
196 while (!success && Msg.getClock() < timeout) {
197 commReceived = Task.irecv(this.mailboxTracker);
199 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
200 if (commReceived.getTask() instanceof TrackerTask) {
201 TrackerTask task = (TrackerTask)commReceived.getTask();
202 for (Integer peerId: task.peers) {
203 if (peerId != this.id) {
204 peers.put(peerId, new Connection(peerId));
210 catch (MsgException e) {
219 private void handleMessage(Task task) {
220 MessageTask message = (MessageTask)task;
221 Connection remotePeer = peers.get(message.peerId);
222 switch (message.type) {
224 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
225 //Check if the peer is in our connection list
226 if (remotePeer == null) {
227 peers.put(message.peerId, new Connection(message.peerId));
228 sendHandshake(message.mailbox);
230 //Send our bitfield to the pair
231 sendBitfield(message.mailbox);
234 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
235 //update the pieces list
236 updatePiecesCountFromBitfield(message.bitfield);
237 //Update the current piece
238 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
239 updateCurrentPiece();
241 remotePeer.bitfield = message.bitfield.clone();
244 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
245 assert remotePeer != null;
246 remotePeer.interested = true;
249 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
250 assert remotePeer != null;
251 remotePeer.interested = false;
254 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
255 assert remotePeer != null;
256 remotePeer.chokedDownload = false;
257 activePeers.put(remotePeer.id,remotePeer);
258 sendRequestsToPeer(remotePeer);
261 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
262 assert remotePeer != null;
263 remotePeer.chokedDownload = true;
264 activePeers.remove(remotePeer.id);
267 if (remotePeer.bitfield == null) {
270 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
271 assert message.index >= 0 && message.index < Common.FILE_PIECES;
272 assert remotePeer.bitfield != null;
273 remotePeer.bitfield[message.index] = '1';
274 piecesCount[message.index]++;
275 //Send interested message to the peer if he has what we want
276 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
277 remotePeer.amInterested = true;
278 sendInterested(remotePeer.mailbox);
281 if (currentPieces.contains(message.index)) {
282 int blockIndex = getFirstBlock(message.index);
283 int blockLength = Common.PIECES_BLOCKS - blockIndex ;
284 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
285 sendRequest(message.mailbox,message.index,blockIndex,blockLength);
289 assert message.index >= 0 && message.index < Common.FILE_PIECES;
290 if (!remotePeer.chokedUpload) {
291 Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for "
293 if (bitfield[message.index] == '1') {
294 sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);
296 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname
297 + ") but he is choked" );
302 if (message.stalled) {
303 Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname
306 Msg.debug("Received piece " + message.index + " from " + message.peerId + " ("
307 + message.issuerHostname + ")");
308 if (bitfield[message.index] == '0') {
309 updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
310 if (pieceComplete(message.index)) {
312 //Removing the piece from our piece list.
313 currentPieces.remove((Object)Integer.valueOf(message.index));
314 //Setting the fact that we have the piece
315 bitfield[message.index] = '1';
317 Msg.debug("My status is now " + getStatus());
318 //Sending the information to all the peers we are connected to
319 sendHave(message.index);
320 //sending UNINTERESTED to peers that doesn't have what we want.
321 updateInterestedAfterReceive();
324 Msg.debug("However, we already have it.");
329 Msg.error("Unexpected message type: " + message.type);
332 if (remotePeer != null) {
333 remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
335 beginReceiveTime = Msg.getClock();
338 private void waitForPieces() {
339 boolean finished = false;
340 while (Msg.getClock() < deadline && !finished) {
341 if (commReceived == null) {
342 commReceived = Task.irecv(mailbox);
345 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
346 handleMessage(commReceived.getTask());
347 if (currentPiece != -1) {
352 catch (MsgException e) {
358 private boolean hasFinished() {
359 for (int i = 0; i < bitfield.length; i++) {
360 if (bitfield[i] == '1') {
368 * @brief Updates the list of who has a piece from a bitfield
369 * @param bitfield bitfield
371 private void updatePiecesCountFromBitfield(char[] bitfield) {
372 for (int i = 0; i < Common.FILE_PIECES; i++) {
373 if (bitfield[i] == '1') {
380 * Update the piece the peer is currently interested in.
381 * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
382 * If the peer has less than 3 pieces, he chooses a piece at random.
383 * If the peer has more than pieces, he downloads the pieces that are the less
386 private void updateCurrentPiece() {
387 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
391 //TODO: trivial min algorithm when pieces >= 3
393 currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
394 } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
396 currentPieces.add(currentPiece);
397 Msg.debug("New interested piece: " + currentPiece);
398 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
401 // Update the list of current choked and unchoked peers, using the choke algorithm
402 private void updateChokedPeers() {
403 round = (round + 1) % 3;
404 if (peers.size() == 0) {
407 //remove a peer from the list
408 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
410 Entry<Integer,Connection> e = it.next();
411 Connection peerChoked = e.getValue();
412 peerChoked.chokedUpload = true;
413 sendChoked(peerChoked.mailbox);
414 activePeers.remove(e.getKey());
416 Connection peerChoosed = null;
417 //Separate the case from when the peer is seeding.
418 if (pieces == Common.FILE_PIECES) {
419 //Find the last unchoked peer.
420 double unchokeTime = deadline + 1;
421 for (Connection connection : peers.values()) {
422 if (connection.lastUnchoke < unchokeTime && connection.interested) {
423 peerChoosed = connection;
424 unchokeTime = connection.lastUnchoke;
428 //Random optimistic unchoking
433 int idChosen = stream.randInt(0,peers.size() - 1);
434 for (Connection connection : peers.values()) {
436 peerChoosed = connection;
440 } //TODO: Not really the best way ever
441 if (peerChoosed != null && !peerChoosed.interested) {
445 } while (peerChoosed == null && j < Common.MAXIMUM_PEERS);
447 Connection fastest = null;
448 double fastestSpeed = 0;
449 for (Connection c : peers.values()) {
450 if (c.peerSpeed > fastestSpeed && c.interested && c.chokedUpload) {
452 fastestSpeed = c.peerSpeed;
455 peerChoosed = fastest;
458 if (peerChoosed != null) {
459 activePeers.put(peerChoosed.id,peerChoosed);
460 peerChoosed.chokedUpload = false;
461 peerChoosed.lastUnchoke = Msg.getClock();
462 sendUnchoked(peerChoosed.mailbox);
466 // Updates our "interested" state about peers: send "not interested" to peers that don't have any more pieces we want.
467 private void updateInterestedAfterReceive() {
469 for (Connection connection : peers.values()) {
471 if (connection.amInterested) {
472 for (Integer piece : currentPieces) {
473 if (connection.bitfield[piece] == '1') {
479 connection.amInterested = false;
480 sendNotInterested(connection.mailbox);
486 private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
487 for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
488 bitfieldBlocks[index][i] = '1';
492 // Returns if a piece is complete in the peer's bitfield.
493 private boolean pieceComplete(int index) {
494 for (int i = 0; i < bitfieldBlocks[index].length; i++) {
495 if (bitfieldBlocks[index][i] == '0') {
502 // Returns the first block of a piece that we don't have.
503 private int getFirstBlock(int piece) {
505 for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
506 if (bitfieldBlocks[piece][i] == '0') {
515 * @brief Send request messages to a peer that have unchoked us
516 * @param remotePeer peer data to the peer we want to send the request
518 private void sendRequestsToPeer(Connection remotePeer) {
519 if (remotePeer.bitfield == null) {
522 for (Integer piece : currentPieces) {
523 //Getting the block to send.
524 int blockIndex = getFirstBlock(piece);
525 int blockLength = Common.PIECES_BLOCKS - blockIndex ;
526 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
527 if (remotePeer.bitfield[piece] == '1') {
528 sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
533 // Find the peers that have the current interested piece and send them the "interested" message
534 private void sendInterestedToPeers() {
535 if (currentPiece == -1) {
538 for (Connection connection : peers.values()) {
539 if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
540 connection.amInterested = true;
541 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
542 task.dsend(connection.mailbox);
549 // Send a "interested" message to a peer.
550 private void sendInterested(String mailbox) {
551 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
556 * @brief Send a "not interested" message to a peer
557 * @param mailbox mailbox destination mailbox
559 private void sendNotInterested(String mailbox) {
560 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
564 // Send a handshake message to all the peers the peer has.
565 private void sendHandshakeAll() {
566 for (Connection remotePeer : peers.values()) {
567 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox, id);
568 task.dsend(remotePeer.mailbox);
573 * @brief Send a "handshake" message to an user
574 * @param mailbox mailbox where to we send the message
576 private void sendHandshake(String mailbox) {
577 Msg.debug("Sending a HANDSHAKE to " + mailbox);
578 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
582 // Send a "choked" message to a peer
583 private void sendChoked(String mailbox) {
584 Msg.debug("Sending a CHOKE to " + mailbox);
585 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
589 // Send a "unchoked" message to a peer
590 private void sendUnchoked(String mailbox) {
591 Msg.debug("Sending a UNCHOKE to " + mailbox);
592 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
596 // Send a "HAVE" message to all peers we are connected to
597 private void sendHave(int piece) {
598 Msg.debug("Sending HAVE message to all my peers");
599 for (Connection remotePeer : peers.values()) {
600 MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
601 task.dsend(remotePeer.mailbox);
604 // Send a bitfield message to all the peers the peer has.
605 private void sendBitfield(String mailbox) {
606 Msg.debug("Sending a BITFIELD to " + mailbox);
607 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
610 // Send a "request" message to a peer, containing a request for a piece
611 private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
612 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + ","
613 + (blockIndex + blockLength));
614 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex,
619 // Send a "piece" message to a peer, containing a piece of the file
620 private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
621 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
622 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled,
623 blockIndex, blockLength);
627 private String getStatus() {
628 StringBuilder s = new StringBuilder("");
629 for (int i = 0; i < Common.FILE_PIECES; i++)
630 s.append(bitfield[i]);