1 /* Copyright (c) 2006-2020. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 package app.bittorrent;
8 import java.util.ArrayList;
9 import java.util.HashMap;
10 import java.util.Iterator;
11 import java.util.Map.Entry;
12 import java.util.Random;
14 import org.simgrid.msg.Msg;
15 import org.simgrid.msg.Comm;
16 import org.simgrid.msg.Host;
17 import org.simgrid.msg.Task;
18 import org.simgrid.msg.Process;
19 import org.simgrid.msg.MsgException;
21 public class Peer extends Process {
22 Random rand = new Random();
23 protected int round = 0;
24 protected double beginReceiveTime;
25 protected double deadline;
27 protected String mailbox;
28 protected String mailboxTracker;
29 protected String hostname;
30 protected int pieces = 0;
31 protected char[] bitfield = new char[Common.FILE_PIECES];
32 protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
33 protected short[] piecesCount = new short[Common.FILE_PIECES];
34 protected int piecesRequested = 0;
35 protected ArrayList<Integer> currentPieces = new ArrayList<>();
36 protected int currentPiece = -1;
37 protected HashMap<Integer, Connection> activePeers = new HashMap<>();
38 protected HashMap<Integer, Connection> peers = new HashMap<>();
39 protected Comm commReceived = null;
41 public Peer(Host host, String name, String[]args) {
42 super(host,name,args);
46 public void main(String[] args) throws MsgException {
48 if (args.length != 3 && args.length != 2) {
49 Msg.info("Wrong number of arguments");
51 init(Integer.parseInt(args[0]), (args.length == 3));
53 //Retrieve the deadline
54 deadline = Double.parseDouble(args[1]);
56 Msg.info("Wrong deadline supplied");
59 Msg.info("Hi, I'm joining the network with id " + id);
60 //Getting peer data from the tracker
62 Msg.debug("Got " + peers.size() + " peers from the tracker");
63 Msg.debug("Here is my current status: " + getStatus());
64 beginReceiveTime = Msg.getClock();
66 pieces = Common.FILE_PIECES;
74 Msg.info("Couldn't contact the tracker.");
76 Msg.info("Here is my current status: " + getStatus());
79 private void leechLoop() {
80 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
81 Msg.debug("Start downloading.");
82 // Send a "handshake" message to all the peers it got(it couldn't have gotten more than 50 peers anyway)
84 //Wait for at least one "bitfield" message.
86 Msg.debug("Starting main leech loop");
87 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
88 if (commReceived == null) {
89 commReceived = Task.irecv(mailbox);
92 if (commReceived.test()) {
93 handleMessage(commReceived.getTask());
96 //If the user has a pending interesting
97 if (currentPiece != -1) {
98 sendInterestedToPeers();
100 if (currentPieces.size() < Common.MAX_PIECES) {
101 updateCurrentPiece();
104 //We don't execute the choke algorithm if we don't already have a piece
105 if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
107 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
113 catch (MsgException e) {
120 private void seedLoop() {
121 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
122 Msg.debug("Start seeding.");
123 //start the main seed loop
124 while (Msg.getClock() < deadline) {
125 if (commReceived == null) {
126 commReceived = Task.irecv(mailbox);
129 if (commReceived.test()) {
130 handleMessage(commReceived.getTask());
133 if (Msg.getClock() >= nextChokedUpdate) {
135 //TODO: Change the choked peer algorithm when seeding
136 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
142 catch (MsgException e) {
149 * @brief Initialize the various peer data
150 * @param id id of the peer to take in the network
151 * @param seed indicates if the peer is a seed
153 private void init(int id, boolean seed) {
155 this.mailbox = Integer.toString(id);
156 this.mailboxTracker = "tracker_" + Integer.toString(id);
158 for (int i = 0; i < bitfield.length; i++) {
160 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
161 bitfieldBlocks[i][j] = '1';
165 for (int i = 0; i < bitfield.length; i++) {
167 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
168 bitfieldBlocks[i][j] = '0' ;
172 this.hostname = getHost().getName();
175 private boolean getPeersData() {
176 boolean success = false;
177 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
178 //Build the task to send to the tracker
179 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
181 while (Msg.getClock() < timeout) {
183 Msg.debug("Sending a peer request to the tracker.");
184 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
187 catch (MsgException e) {
191 while (!success && Msg.getClock() < timeout) {
192 commReceived = Task.irecv(this.mailboxTracker);
194 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
195 if (commReceived.getTask() instanceof TrackerTask) {
196 TrackerTask task = (TrackerTask)commReceived.getTask();
197 for (Integer peerId: task.peers) {
198 if (peerId != this.id) {
199 peers.put(peerId, new Connection(peerId));
205 catch (MsgException e) {
213 private void handleMessage(Task task) {
214 MessageTask message = (MessageTask)task;
215 Connection remotePeer = peers.get(message.peerId);
216 switch (message.type) {
218 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
219 //Check if the peer is in our connection list
220 if (remotePeer == null) {
221 peers.put(message.peerId, new Connection(message.peerId));
222 sendHandshake(message.mailbox);
224 //Send our bitfield to the pair
225 sendBitfield(message.mailbox);
228 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
229 //update the pieces list
230 updatePiecesCountFromBitfield(message.bitfield);
231 //Update the current piece
232 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
233 updateCurrentPiece();
235 remotePeer.bitfield = message.bitfield.clone();
238 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
239 assert remotePeer != null;
240 remotePeer.interested = true;
243 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
244 assert remotePeer != null;
245 remotePeer.interested = false;
248 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
249 assert remotePeer != null;
250 remotePeer.chokedDownload = false;
251 activePeers.put(remotePeer.id,remotePeer);
252 sendRequestsToPeer(remotePeer);
255 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
256 assert remotePeer != null;
257 remotePeer.chokedDownload = true;
258 activePeers.remove(remotePeer.id);
261 if (remotePeer.bitfield == null) {
264 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
265 assert message.index >= 0 && message.index < Common.FILE_PIECES;
266 assert remotePeer.bitfield != null;
267 remotePeer.bitfield[message.index] = '1';
268 piecesCount[message.index]++;
269 //Send interested message to the peer if he has what we want
270 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
271 remotePeer.amInterested = true;
272 sendInterested(remotePeer.mailbox);
275 if (currentPieces.contains(message.index)) {
276 int blockIndex = getFirstBlock(message.index);
277 int blockLength = Common.PIECES_BLOCKS - blockIndex ;
278 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
279 sendRequest(message.mailbox,message.index,blockIndex,blockLength);
283 assert message.index >= 0 && message.index < Common.FILE_PIECES;
284 if (!remotePeer.chokedUpload) {
285 Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for "
287 if (bitfield[message.index] == '1') {
288 sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);
290 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname
291 + ") but he is choked" );
296 if (message.stalled) {
297 Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname
300 Msg.debug("Received piece " + message.index + " from " + message.peerId + " ("
301 + message.issuerHostname + ")");
302 if (bitfield[message.index] == '0') {
303 updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
304 if (pieceComplete(message.index)) {
306 //Removing the piece from our piece list.
307 currentPieces.remove((Object)Integer.valueOf(message.index));
308 //Setting the fact that we have the piece
309 bitfield[message.index] = '1';
311 Msg.debug("My status is now " + getStatus());
312 //Sending the information to all the peers we are connected to
313 sendHave(message.index);
314 //sending UNINTERESTED to peers that doesn't have what we want.
315 updateInterestedAfterReceive();
318 Msg.debug("However, we already have it.");
323 Msg.error("Unexpected message type: " + message.type);
326 if (remotePeer != null) {
327 remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
329 beginReceiveTime = Msg.getClock();
332 private void waitForPieces() {
333 boolean finished = false;
334 while (Msg.getClock() < deadline && !finished) {
335 if (commReceived == null) {
336 commReceived = Task.irecv(mailbox);
339 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
340 handleMessage(commReceived.getTask());
341 if (currentPiece != -1) {
346 catch (MsgException e) {
352 private boolean hasFinished() {
353 for (int i = 0; i < bitfield.length; i++) {
354 if (bitfield[i] == '1') {
362 * @brief Updates the list of who has a piece from a bitfield
363 * @param bitfield bitfield
365 private void updatePiecesCountFromBitfield(char[] bitfield) {
366 for (int i = 0; i < Common.FILE_PIECES; i++) {
367 if (bitfield[i] == '1') {
374 * Update the piece the peer is currently interested in.
375 * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
376 * If the peer has less than 3 pieces, he chooses a piece at random.
377 * If the peer has more than pieces, he downloads the pieces that are the less
380 private void updateCurrentPiece() {
381 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
385 //TODO: trivial min algorithm when pieces >= 3
387 currentPiece = rand.nextInt(Common.FILE_PIECES);
388 } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
390 currentPieces.add(currentPiece);
391 Msg.debug("New interested piece: " + currentPiece);
392 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
395 // Update the list of current choked and unchoked peers, using the choke algorithm
396 private void updateChokedPeers() {
397 round = (round + 1) % 3;
398 if (peers.size() == 0) {
401 //remove a peer from the list
402 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
404 Entry<Integer,Connection> e = it.next();
405 Connection peerChoked = e.getValue();
406 peerChoked.chokedUpload = true;
407 sendChoked(peerChoked.mailbox);
408 activePeers.remove(e.getKey());
410 Connection peerChosen = null;
411 //Separate the case from when the peer is seeding.
412 if (pieces == Common.FILE_PIECES) {
413 //Find the last unchoked peer.
414 double unchokeTime = deadline + 1;
415 for (Connection connection : peers.values()) {
416 if (connection.lastUnchoke < unchokeTime && connection.interested) {
417 peerChosen = connection;
418 unchokeTime = connection.lastUnchoke;
422 //Random optimistic unchoking
427 int idChosen = rand.nextInt(peers.size());
428 for (Connection connection : peers.values()) {
430 peerChosen = connection;
434 } //TODO: Not really the best way ever
435 if (peerChosen != null && !peerChosen.interested) {
439 } while (peerChosen == null && j < Common.MAXIMUM_PEERS);
441 Connection fastest = null;
442 double fastestSpeed = 0;
443 for (Connection c : peers.values()) {
444 if (c.peerSpeed > fastestSpeed && c.interested && c.chokedUpload) {
446 fastestSpeed = c.peerSpeed;
449 peerChosen = fastest;
452 if (peerChosen != null) {
453 activePeers.put(peerChosen.id,peerChosen);
454 peerChosen.chokedUpload = false;
455 peerChosen.lastUnchoke = Msg.getClock();
456 sendUnchoked(peerChosen.mailbox);
460 // Updates our "interested" state about peers: send "not interested" to peers that don't have any more pieces we want.
461 private void updateInterestedAfterReceive() {
463 for (Connection connection : peers.values()) {
465 if (connection.amInterested) {
466 for (Integer piece : currentPieces) {
467 if (connection.bitfield[piece] == '1') {
473 connection.amInterested = false;
474 sendNotInterested(connection.mailbox);
480 private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
481 for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
482 bitfieldBlocks[index][i] = '1';
486 // Returns if a piece is complete in the peer's bitfield.
487 private boolean pieceComplete(int index) {
488 for (int i = 0; i < bitfieldBlocks[index].length; i++) {
489 if (bitfieldBlocks[index][i] == '0') {
496 // Returns the first block of a piece that we don't have.
497 private int getFirstBlock(int piece) {
499 for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
500 if (bitfieldBlocks[piece][i] == '0') {
509 * @brief Send request messages to a peer that have unchoked us
510 * @param remotePeer peer data to the peer we want to send the request
512 private void sendRequestsToPeer(Connection remotePeer) {
513 if (remotePeer.bitfield == null) {
516 for (Integer piece : currentPieces) {
517 //Getting the block to send.
518 int blockIndex = getFirstBlock(piece);
519 int blockLength = Common.PIECES_BLOCKS - blockIndex ;
520 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
521 if (remotePeer.bitfield[piece] == '1') {
522 sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
527 // Find the peers that have the current interested piece and send them the "interested" message
528 private void sendInterestedToPeers() {
529 if (currentPiece == -1) {
532 for (Connection connection : peers.values()) {
533 if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
534 connection.amInterested = true;
535 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
536 task.dsend(connection.mailbox);
543 // Send a "interested" message to a peer.
544 private void sendInterested(String mailbox) {
545 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
550 * @brief Send a "not interested" message to a peer
551 * @param mailbox mailbox destination mailbox
553 private void sendNotInterested(String mailbox) {
554 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
558 // Send a handshake message to all the peers the peer has.
559 private void sendHandshakeAll() {
560 for (Connection remotePeer : peers.values()) {
561 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox, id);
562 task.dsend(remotePeer.mailbox);
567 * @brief Send a "handshake" message to an user
568 * @param mailbox mailbox where to we send the message
570 private void sendHandshake(String mailbox) {
571 Msg.debug("Sending a HANDSHAKE to " + mailbox);
572 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
576 // Send a "choked" message to a peer
577 private void sendChoked(String mailbox) {
578 Msg.debug("Sending a CHOKE to " + mailbox);
579 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
583 // Send a "unchoked" message to a peer
584 private void sendUnchoked(String mailbox) {
585 Msg.debug("Sending a UNCHOKE to " + mailbox);
586 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
590 // Send a "HAVE" message to all peers we are connected to
591 private void sendHave(int piece) {
592 Msg.debug("Sending HAVE message to all my peers");
593 for (Connection remotePeer : peers.values()) {
594 MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
595 task.dsend(remotePeer.mailbox);
598 // Send a bitfield message to all the peers the peer has.
599 private void sendBitfield(String mailbox) {
600 Msg.debug("Sending a BITFIELD to " + mailbox);
601 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
604 // Send a "request" message to a peer, containing a request for a piece
605 private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
606 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + ","
607 + (blockIndex + blockLength));
608 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex,
613 // Send a "piece" message to a peer, containing a piece of the file
614 private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
615 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
616 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled,
617 blockIndex, blockLength);
621 private String getStatus() {
622 StringBuilder s = new StringBuilder("");
623 for (int i = 0; i < Common.FILE_PIECES; i++)
624 s.append(bitfield[i]);