3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.Iterator;
6 import java.util.Map.Entry;
8 import org.simgrid.msg.Comm;
9 import org.simgrid.msg.Host;
10 import org.simgrid.msg.Msg;
11 import org.simgrid.msg.MsgException;
12 import org.simgrid.msg.RngStream;
13 import org.simgrid.msg.Process;
14 import org.simgrid.msg.Task;
17 * Main class for peers execution
19 public class Peer extends Process {
// Counter advanced modulo 3 on every call to updateChokedPeers().
20 protected int round = 0;
// Simulated-time limit; the leech, seed and wait loops stop once Msg.getClock() reaches it.
22 protected double deadline;
// Random stream shared by every Peer instance (static); used to draw pieces and peers.
24 protected static RngStream stream = new RngStream();
// Mailbox this peer receives on; init() sets it to the decimal string of the peer id.
27 protected String mailbox;
// Mailbox used for tracker replies; init() sets it to the prefix tracker_ followed by the id.
28 protected String mailboxTracker;
// Host name, set by init() from host.getName().
29 protected String hostname;
// Piece counter; compared with Common.FILE_PIECES to decide whether the download is complete.
30 protected int pieces = 0;
// One char per piece; the visible code stores and tests the chars 1 (owned) and 0 (missing).
31 protected char[] bitfield = new char[Common.FILE_PIECES];
// Same char encoding, one entry per block of every piece.
32 protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
// Per-piece counter, incremented in handleMessage when a remote peer announces that piece index.
34 protected short[] piecesCount = new short[Common.FILE_PIECES];
// NOTE(review): only the initialisation is visible in this listing; usage not shown.
36 protected int piecesRequested = 0;
// Pieces currently wanted; updateCurrentPiece() appends to it and handleMessage removes completed ones.
38 protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
// Most recently selected piece index; -1 means none selected.
39 protected int currentPiece = -1;
// Connections keyed by peer id; entries are added when a peer unchokes us and when we unchoke a peer.
41 protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();
// Every known connection keyed by peer id; filled from the tracker answer and from incoming handshakes.
42 protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
// Pending asynchronous receive; the loops post a new Task.irecv whenever this is null.
44 protected Comm commReceived = null;
// Builds the peer process; the three arguments are passed unchanged to the Process constructor.
46 public Peer(Host host, String name, String[]args) {
47 super(host,name,args);
// Process entry point.
// args[0] is parsed as the peer id and args[1] as the deadline; when a third argument is
// present init() is called with the seed flag set to true, otherwise with false.
// NOTE(review): the branch, try/catch and loop-call lines of this method are missing from this
// listing, so the control flow between the visible statements cannot be confirmed here.
51 public void main(String[] args) throws MsgException {
// Only two or three arguments are accepted.
53 if (args.length != 3 && args.length != 2) {
54 Msg.info("Wrong number of arguments");
56 if (args.length == 3) {
57 init(Integer.valueOf(args[0]),true);
60 init(Integer.valueOf(args[0]),false);
62 //Retrieve the deadline
63 deadline = Double.valueOf(args[1]);
65 Msg.info("Wrong deadline supplied");
68 Msg.info("Hi, I'm joining the network with id " + id);
69 //Getting peer data from the tracker
71 Msg.debug("Got " + peers.size() + " peers from the tracker");
72 Msg.debug("Here is my current status: " + getStatus());
// NOTE(review): presumably executed only for a seed, which owns every piece - confirm against the hidden condition.
74 pieces = Common.FILE_PIECES;
84 Msg.info("Couldn't contact the tracker.");
86 Msg.info("Here is my current status: " + getStatus());
89 * Peer main loop when it is leeching.
// Download phase. The main loop runs while the simulated clock is below the deadline and
// pieces is still smaller than Common.FILE_PIECES.
// Visible work per iteration: post a receive on the peer mailbox when none is pending; pass a
// completed receive to handleMessage; call sendInterestedToPeers when a piece is selected;
// call updateCurrentPiece while fewer than Common.MAX_PIECES pieces are in progress; and
// advance the choke timer by Common.UPDATE_CHOKED_INTERVAL once it has expired and at least
// one piece is owned.
// NOTE(review): the handshake, wait and choke-update calls announced by the inner comments,
// as well as the try and else lines, are missing from this listing.
91 private void leechLoop() {
92 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
93 Msg.debug("Start downloading.");
95 * Send a "handshake" message to all the peers it got
96 * (it couldn't have gotten more than 50 peers anyway)
99 //Wait for at least one "bitfield" message.
101 Msg.debug("Starting main leech loop");
102 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
103 if (commReceived == null) {
104 commReceived = Task.irecv(mailbox);
107 if (commReceived.test()) {
108 handleMessage(commReceived.getTask());
112 //If the user has a pending interesting
113 if (currentPiece != -1) {
114 sendInterestedToPeers();
117 if (currentPieces.size() < Common.MAX_PIECES) {
118 updateCurrentPiece();
121 //We don't execute the choke algorithm if we don't already have a piece
122 if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
124 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
131 catch (MsgException e) {
138 * Peer main loop when it is seeding
// Upload phase. Loops until the simulated clock reaches the deadline.
// Each iteration posts a receive on the peer mailbox when none is pending and passes a
// completed receive to handleMessage; the choke timer is advanced by
// Common.UPDATE_CHOKED_INTERVAL each time it expires.
// NOTE(review): the statement that performs the choke update, and the try and else lines,
// are missing from this listing.
140 private void seedLoop() {
141 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
142 Msg.debug("Start seeding.");
143 //start the main seed loop
144 while (Msg.getClock() < deadline) {
145 if (commReceived == null) {
146 commReceived = Task.irecv(mailbox);
149 if (commReceived.test()) {
150 handleMessage(commReceived.getTask());
154 if (Msg.getClock() >= nextChokedUpdate) {
156 //TODO: Change the choked peer algorithm when seeding
157 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
164 catch (MsgException e) {
172 * Initialize the various peer data
173 * @param id id of the peer to take in the network
174 * @param seed indicates if the peer is a seed
// Sets the mailbox names, the block bitfields and the host name of this peer.
// Two fill loops are visible: the first marks every block of every piece with the char 1,
// the second with the char 0.
// NOTE(review): the condition choosing between the two loops and the per-piece bitfield
// assignments are missing from this listing; presumably the seed flag selects the first loop - confirm.
176 private void init(int id, boolean seed) {
// The receive mailbox is simply the id rendered as a decimal string.
178 this.mailbox = Integer.toString(id);
179 this.mailboxTracker = "tracker_" + Integer.toString(id);
181 for (int i = 0; i < bitfield.length; i++) {
183 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
184 bitfieldBlocks[i][j] = '1';
189 for (int i = 0; i < bitfield.length; i++) {
191 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
192 bitfieldBlocks[i][j] = '0' ;
196 this.hostname = host.getName();
199 * Retrieves the peer list from the tracker
// Requests the peer list from the tracker and stores the answer in peers.
// Both loops share one absolute timeout: Msg.getClock() + Common.GET_PEERS_TIMEOUT.
// NOTE(review): the lines that set sendSuccess and success, the try lines and the return
// statement are missing from this listing.
201 private boolean getPeersData() {
203 boolean success = false, sendSuccess = false;
204 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
205 //Build the task to send to the tracker
206 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
// Phase 1: keep trying to deliver the request to the tracker mailbox.
208 while (!sendSuccess && Msg.getClock() < timeout) {
210 Msg.debug("Sending a peer request to the tracker.");
211 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
214 catch (MsgException e) {
// Phase 2: wait on the tracker-reply mailbox for a TrackerTask answer.
218 while (!success && Msg.getClock() < timeout) {
219 commReceived = Task.irecv(this.mailboxTracker);
221 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
222 if (commReceived.getTask() instanceof TrackerTask) {
223 TrackerTask task = (TrackerTask)commReceived.getTask();
224 for (Integer peerId: task.peers) {
// Skip our own id so that we never open a connection to ourselves.
// NOTE(review): if id is declared as Integer rather than int this compares references - confirm.
225 if (peerId != this.id) {
226 peers.put(peerId, new Connection(peerId));
232 catch (MsgException e) {
241 * Handle a received message sent by another peer
242 * @param task task received.
// Processes one received task according to message.type.
// The task is cast to MessageTask and the sender is looked up in peers by message.peerId;
// that lookup yields null for a sender that is not registered yet.
// NOTE(review): the case labels, break, else and return lines are missing from this listing;
// the debug messages below show which message kind each group of statements serves.
244 void handleMessage(Task task) {
245 MessageTask message = (MessageTask)task;
246 Connection remotePeer = peers.get(message.peerId);
247 switch (message.type) {
// Handshake: register an unknown sender and answer with our own handshake, then send our bitfield.
249 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
250 //Check if the peer is in our connection list
251 if (remotePeer == null) {
252 peers.put(message.peerId, new Connection(message.peerId));
253 sendHandshake(message.mailbox);
255 //Send our bitfield to the pair
256 sendBitfield(message.mailbox);
// Bitfield: update the piece statistics, select a piece if none is selected, keep a copy of the remote bitfield.
259 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
260 //update the pieces list
261 updatePiecesCountFromBitfield(message.bitfield);
262 //Update the current piece
263 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
264 updateCurrentPiece();
// NOTE(review): remotePeer is dereferenced here without the assert used by the other branches - confirm it cannot be null.
266 remotePeer.bitfield = message.bitfield.clone();
// Interested / not interested: record whether the remote peer wants data from us.
269 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
270 assert remotePeer != null;
271 remotePeer.interested = true;
274 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
275 assert remotePeer != null;
276 remotePeer.interested = false;
// Unchoke: the remote peer now serves us, so mark it active and request the pieces we want.
279 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
280 assert remotePeer != null;
281 remotePeer.chokedDownload = false;
282 activePeers.put(remotePeer.id,remotePeer);
283 sendRequestsToPeer(remotePeer);
// Choke: the remote peer stopped serving us, so drop it from the active set.
286 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
287 assert remotePeer != null;
288 remotePeer.chokedDownload = true;
289 activePeers.remove(remotePeer.id);
// Have: record the announced piece in the remote bitfield and in piecesCount.
292 if (remotePeer.bitfield == null) {
295 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
296 assert message.index >= 0 && message.index < Common.FILE_PIECES;
297 assert remotePeer.bitfield != null;
298 remotePeer.bitfield[message.index] = '1';
299 piecesCount[message.index]++;
300 //Send interested message to the peer if he has what we want
301 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
302 remotePeer.amInterested = true;
303 sendInterested(remotePeer.mailbox);
// For a wanted piece, request up to Common.BLOCKS_REQUESTED blocks starting at the first missing block.
306 if (currentPieces.contains(message.index)) {
307 int blockIndex = getFirstBlock(message.index);
308 int blockLength = Common.PIECES_BLOCKS - blockIndex ;
309 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
310 sendRequest(message.mailbox,message.index,blockIndex,blockLength);
// Request: serve the blocks only when the requester is not choked by us and we own the piece.
314 assert message.index >= 0 && message.index < Common.FILE_PIECES;
315 if (!remotePeer.chokedUpload) {
// NOTE(review): the message ends with message.peerId a second time; the piece index (message.index) was probably intended - confirm.
316 Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
317 if (bitfield[message.index] == '1') {
318 sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);
321 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
// Piece: store the received blocks and, once the piece is complete, announce it to every peer.
326 if (message.stalled) {
327 Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ") is stalled");
330 Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
331 if (bitfield[message.index] == '0') {
332 updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
333 if (pieceComplete(message.index)) {
335 //Removing the piece from our piece list.
// The cast to Object selects remove(Object), so the value is removed rather than the element at that position.
336 if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
338 //Setting the fact that we have the piece
339 bitfield[message.index] = '1';
341 Msg.debug("My status is now " + getStatus());
342 //Sending the information to all the peers we are connected to
343 sendHave(message.index);
344 //sending UNINTERESTED to peers that doesn't have what we want.
345 updateInterestedAfterReceive();
349 Msg.debug("However, we already have it.");
356 * Wait for the node to receive interesting bitfield messages (ie: non empty)
// Blocks on the peer mailbox, handling each received message, while the clock is below the
// deadline and the finished flag is still false.
// Each wait is bounded by Common.TIMEOUT_MESSAGE.
// NOTE(review): the lines executed when currentPiece differs from -1 are missing from this
// listing; presumably they set finished - confirm.
359 void waitForPieces() {
360 boolean finished = false;
361 while (Msg.getClock() < deadline && !finished) {
362 if (commReceived == null) {
363 commReceived = Task.irecv(mailbox);
366 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
367 handleMessage(commReceived.getTask());
368 if (currentPiece != -1) {
373 catch (MsgException e) {
// Scans the piece bitfield and tests every entry against the char 1.
// NOTE(review): the return statements are missing from this listing. If the hidden branch
// returns false on a match, the test should probably be against the char 0 - confirm.
379 private boolean hasFinished() {
380 for (int i = 0; i < bitfield.length; i++) {
381 if (bitfield[i] == '1') {
388 * Updates the list of who has a piece from a bitfield
389 * @param bitfield bitfield
// Walks the first Common.FILE_PIECES entries of the given bitfield and tests each against the char 1.
// The parameter shadows the field of the same name, so only the argument is read here.
// NOTE(review): the statement executed on a match is missing from this listing; presumably
// it increments piecesCount for that index - confirm.
391 private void updatePiecesCountFromBitfield(char bitfield[]) {
392 for (int i = 0; i < Common.FILE_PIECES; i++) {
393 if (bitfield[i] == '1') {
399 * Update the piece the peer is currently interested in.
400 * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole):
401 * If the peer has fewer than 3 pieces, it chooses a piece at random.
402 * If the peer has 3 or more pieces, it downloads the pieces that are the least
// Selects a new piece to download and appends it to currentPieces.
// The first guard compares the number of pieces in progress with the number still missing
// (Common.FILE_PIECES - pieces).
// NOTE(review): the body of that guard and the rarest-piece branch are missing from this listing.
405 void updateCurrentPiece() {
406 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
// The literal true makes this condition always hold, so the random branch is always taken.
409 if (true || pieces < 3) {
410 int i = 0, peerPiece;
// Draw uniformly in [0, FILE_PIECES - 1] until the piece is neither owned nor already in progress.
412 currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
414 } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
417 //trivial min algorithm.
420 currentPieces.add(currentPiece);
421 Msg.debug("New interested piece: " + currentPiece);
422 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
425 * Update the list of current choked and unchoked peers, using the
// Choke update: advances the round counter, chokes one currently active peer and unchokes
// one randomly drawn peer.
// NOTE(review): several lines (guards, the do opener, counters and closing braces) are
// missing from this listing, so the exact loop structure cannot be confirmed here.
428 private void updateChokedPeers() {
429 round = (round + 1) % 3;
430 if (peers.size() == 0) {
433 //remove a peer from the list
// Take the first entry returned by the map iterator, tell that peer it is choked and drop it.
434 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
436 Entry<Integer,Connection> e = it.next();
437 Connection peerChoked = e.getValue();
438 sendChoked(peerChoked.mailbox);
439 peerChoked.chokedUpload = true;
// NOTE(review): removal goes through the map, not the iterator; safe only if the iterator is not advanced again - confirm.
440 activePeers.remove(e.getKey());
442 //Random optimistic unchoking
// The literal true makes this condition always hold, whatever the value of round.
443 if (round == 0 || true) {
445 Connection peerChoosed = null;
// Draw a position in [0, peers.size() - 1] and walk the map values to reach it.
448 int idChosen = stream.randInt(0,peers.size() - 1);
449 for (Connection connection : peers.values()) {
451 peerChoosed = connection;
455 } //TODO: Not really the best way ever
456 if (!peerChoosed.interested) {
460 } while (peerChoosed == null && j <
461 Common.MAXIMUM_PEERS);
// A chosen peer becomes active, is marked as unchoked for upload and is notified.
462 if (peerChoosed != null) {
463 activePeers.put(peerChoosed.id,peerChoosed);
464 peerChoosed.chokedUpload = false;
465 sendUnchoked(peerChoosed.mailbox);
468 //TODO: Use the leecher choke algorithm.
471 * Updates our "interested" state about peers: send "not interested" to peers
472 * that don't have any more pieces we want.
// For every connection we are interested in, checks whether its bitfield still holds one of
// the pieces in currentPieces; the visible tail clears amInterested and sends a
// not-interested message.
// NOTE(review): the flag and guard lines linking the inner loop to that tail are missing from
// this listing; presumably the tail runs only when no wanted piece was found - confirm.
474 private void updateInterestedAfterReceive() {
476 for (Connection connection : peers.values()) {
478 if (connection.amInterested) {
479 for (Integer piece : currentPieces) {
480 if (connection.bitfield[piece] == '1') {
486 connection.amInterested = false;
487 sendNotInterested(connection.mailbox);
// Marks blocks blockIndex (inclusive) to blockIndex + blockLength (exclusive) of piece index
// as received by storing the char 1 in bitfieldBlocks.
// No bounds check is made here; the caller must pass a range that fits inside the piece.
492 private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
493 for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
494 bitfieldBlocks[index][i] = '1';
498 * Returns whether a piece is complete in the peer's bitfield.
499 * @param index the index of the piece.
// Scans every block of piece index looking for a block still marked with the char 0.
// NOTE(review): the return statements are missing from this listing; presumably false on the
// first missing block and true otherwise - confirm.
501 private boolean pieceComplete(int index) {
502 for (int i = 0; i < bitfieldBlocks[index].length; i++) {
503 if (bitfieldBlocks[index][i] == '0') {
510 * Returns the first block of a piece that we don't have.
// Scans the Common.PIECES_BLOCKS blocks of the given piece in increasing order, testing each
// against the char 0 (missing block).
// NOTE(review): the result variable and the return are missing from this listing, so the
// value returned when no block is missing cannot be confirmed here.
512 private int getFirstBlock(int piece) {
514 for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
515 if (bitfieldBlocks[piece][i] == '0') {
523 * Send request messages to a peer that has unchoked us
524 * @param remotePeer data of the peer to which we want to send the requests
// Sends one REQUEST per piece of currentPieces that the remote peer owns according to its bitfield.
// Each request starts at the first missing block of the piece and covers at most
// Common.BLOCKS_REQUESTED blocks.
// NOTE(review): the body of the null-bitfield guard is missing from this listing; presumably an early return - confirm.
526 private void sendRequestsToPeer(Connection remotePeer) {
527 if (remotePeer.bitfield == null) {
530 for (Integer piece : currentPieces) {
531 //Getting the block to send.
532 int blockIndex = -1, blockLength = 0;
533 blockIndex = getFirstBlock(piece);
// Remaining blocks in the piece, capped on the next line by the per-request maximum.
534 blockLength = Common.PIECES_BLOCKS - blockIndex ;
535 blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;
536 if (remotePeer.bitfield[piece] == '1') {
537 sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
542 * Find the peers that have the current interested piece and send them
543 * the "interested" message
// Sends an INTERESTED message to every connection whose known bitfield contains currentPiece
// and that we have not yet declared interest in, and sets amInterested on that connection.
// Messages are sent with dsend.
// NOTE(review): the body of the guard on currentPiece and the lines after the send are
// missing from this listing.
545 private void sendInterestedToPeers() {
546 if (currentPiece == -1) {
549 for (Connection connection : peers.values()) {
// Connections without a received bitfield are skipped by the null test.
550 if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
551 connection.amInterested = true;
552 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
553 task.dsend(connection.mailbox);
560 * Send an "interested" message to a peer.
// Builds an INTERESTED MessageTask carrying our hostname, mailbox and id.
// The mailbox parameter shadows the field, hence the explicit this.mailbox for our own address.
// NOTE(review): the statement that dispatches the task is missing from this listing.
562 private void sendInterested(String mailbox) {
563 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
567 * Send a "not interested" message to a peer
568 * @param mailbox mailbox destination mailbox
// Builds a NOTINTERESTED MessageTask carrying our hostname, mailbox and id.
// NOTE(review): the statement that dispatches the task is missing from this listing.
570 private void sendNotInterested(String mailbox) {
571 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
575 * Send a handshake message to all the peers the peer has.
576 * @param peer peer data
// Sends one HANDSHAKE message with dsend to the mailbox of every connection in peers.
// A fresh MessageTask is built for each recipient.
578 private void sendHandshakeAll() {
579 for (Connection remotePeer : peers.values()) {
580 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
582 task.dsend(remotePeer.mailbox);
586 * Send a "handshake" message to a peer
587 * @param mailbox mailbox to which we send the message
// Logs the destination, then builds a HANDSHAKE MessageTask carrying our hostname, mailbox and id.
// NOTE(review): the statement that dispatches the task is missing from this listing.
589 private void sendHandshake(String mailbox) {
590 Msg.debug("Sending a HANDSHAKE to " + mailbox);
591 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
595 * Send a "choked" message to a peer
// Logs the destination, then builds a CHOKE MessageTask carrying our hostname, mailbox and id.
// NOTE(review): the statement that dispatches the task is missing from this listing.
597 private void sendChoked(String mailbox) {
598 Msg.debug("Sending a CHOKE to " + mailbox);
599 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
603 * Send an "unchoked" message to a peer
// Logs the destination, then builds an UNCHOKE MessageTask carrying our hostname, mailbox and id.
// NOTE(review): the statement that dispatches the task is missing from this listing.
605 private void sendUnchoked(String mailbox) {
606 Msg.debug("Sending a UNCHOKE to " + mailbox);
607 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
611 * Send a "HAVE" message to all peers we are connected to
// Announces with dsend, to the mailbox of every connection in peers, that we now own the given piece.
// A separate HAVE MessageTask is created for each recipient.
613 private void sendHave(int piece) {
614 Msg.debug("Sending HAVE message to all my peers");
615 for (Connection remotePeer : peers.values()) {
616 MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
617 task.dsend(remotePeer.mailbox);
621 * Send a bitfield message to a single peer.
622 * @param mailbox destination mailbox of that peer
// Logs the destination, then builds a BITFIELD MessageTask carrying our hostname, mailbox, id
// and the piece bitfield array itself (no copy is made at this call site).
// NOTE(review): the statement that dispatches the task is missing from this listing.
624 private void sendBitfield(String mailbox) {
625 Msg.debug("Sending a BITFIELD to " + mailbox);
626 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
630 * Send a "request" message to a peer, containing a request for a piece
// Logs the request, then builds a REQUEST MessageTask for blockLength blocks of the given
// piece starting at blockIndex.
// The debug line prints the start block and the exclusive end block (blockIndex + blockLength).
// NOTE(review): the statement that dispatches the task is missing from this listing.
632 private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
633 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + "," + (blockIndex + blockLength));
634 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, blockLength);
638 * Send a "piece" message to a peer, containing a piece of the file
// Logs the transfer, then builds a PIECE MessageTask carrying the piece index, the stalled
// flag and the block range.
// NOTE(review): the statement that dispatches the task is missing from this listing.
640 private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
641 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
642 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled, blockIndex, blockLength);
646 private String getStatus() {
648 for (int i = 0; i < Common.FILE_PIECES; i++) {