3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.Iterator;
6 import java.util.Map.Entry;
8 import org.simgrid.msg.Comm;
9 import org.simgrid.msg.Host;
10 import org.simgrid.msg.Msg;
11 import org.simgrid.msg.MsgException;
12 import org.simgrid.msg.RngStream;
13 import org.simgrid.msg.Process;
14 import org.simgrid.msg.Task;
17 * Main class for peers execution
19 public class Peer extends Process {
20 protected int round = 0;
22 protected double deadline;
24 protected static RngStream stream = new RngStream();
27 protected String mailbox;
28 protected String mailboxTracker;
29 protected String hostname;
30 protected int pieces = 0;
31 protected char[] bitfield = new char[Common.FILE_PIECES];
32 protected short[] piecesCount = new short[Common.FILE_PIECES];
34 protected int piecesRequested = 0;
36 protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
37 protected int currentPiece = -1;
39 protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();
40 protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
42 protected Comm commReceived = null;
44 public Peer(Host host, String name, String[]args) {
45 super(host,name,args);
49 public void main(String[] args) throws MsgException {
51 if (args.length != 3 && args.length != 2) {
52 Msg.info("Wrong number of arguments");
54 if (args.length == 3) {
55 init(Integer.valueOf(args[0]),true);
58 init(Integer.valueOf(args[0]),false);
60 //Retrieve the deadline
61 deadline = Double.valueOf(args[1]);
63 Msg.info("Wrong deadline supplied");
66 Msg.info("Hi, I'm joining the network with id " + id);
67 //Getting peer data from the tracker
69 Msg.debug("Got " + peers.size() + " peers from the tracker");
70 Msg.debug("Here is my current status: " + getStatus());
72 pieces = Common.FILE_PIECES;
82 Msg.info("Couldn't contact the tracker.");
84 Msg.info("Here is my current status: " + getStatus());
87 * Peer main loop when it is leeching.
89 private void leechLoop() {
90 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
91 Msg.debug("Start downloading.");
93 * Send a "handshake" message to all the peers it got
94 * (it couldn't have gotten more than 50 peers anyway)
97 //Wait for at least one "bitfield" message.
99 Msg.debug("Starting main leech loop");
100 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
101 if (commReceived == null) {
102 commReceived = Task.irecv(mailbox);
105 if (commReceived.test()) {
106 handleMessage(commReceived.getTask());
110 //If the user has a pending interesting
111 if (currentPiece != -1) {
112 sendInterestedToPeers();
115 if (currentPieces.size() < Common.MAX_PIECES) {
116 updateCurrentPiece();
119 //We don't execute the choke algorithm if we don't already have a piece
120 if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
122 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
129 catch (MsgException e) {
136 * Peer main loop when it is seeding
138 private void seedLoop() {
139 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
140 Msg.debug("Start seeding.");
141 //start the main seed loop
142 while (Msg.getClock() < deadline) {
143 if (commReceived == null) {
144 commReceived = Task.irecv(mailbox);
147 if (commReceived.test()) {
148 handleMessage(commReceived.getTask());
152 if (Msg.getClock() >= nextChokedUpdate) {
154 //TODO: Change the choked peer algorithm when seeding
155 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
162 catch (MsgException e) {
170 * Initialize the various peer data
171 * @param id id of the peer to take in the network
172 * @param seed indicates if the peer is a seed
174 private void init(int id, boolean seed) {
176 this.mailbox = Integer.toString(id);
177 this.mailboxTracker = "tracker_" + Integer.toString(id);
179 for (int i = 0; i < bitfield.length; i++) {
184 for (int i = 0; i < bitfield.length; i++) {
188 this.hostname = host.getName();
191 * Retrieves the peer list from the tracker
193 private boolean getPeersData() {
195 boolean success = false, sendSuccess = false;
196 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
197 //Build the task to send to the tracker
198 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
200 while (!sendSuccess && Msg.getClock() < timeout) {
202 Msg.debug("Sending a peer request to the tracker.");
203 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
206 catch (MsgException e) {
210 while (!success && Msg.getClock() < timeout) {
211 commReceived = Task.irecv(this.mailboxTracker);
213 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
214 if (commReceived.getTask() instanceof TrackerTask) {
215 TrackerTask task = (TrackerTask)commReceived.getTask();
216 for (Integer peerId: task.peers) {
217 if (peerId != this.id) {
218 peers.put(peerId, new Connection(peerId));
224 catch (MsgException e) {
233 * Handle a received message sent by another peer
234 * @param task task received.
236 void handleMessage(Task task) {
237 MessageTask message = (MessageTask)task;
238 Connection remotePeer = peers.get(message.peerId);
239 switch (message.type) {
241 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
242 //Check if the peer is in our connection list
243 if (remotePeer == null) {
244 peers.put(message.peerId, new Connection(message.peerId));
245 sendHandshake(message.mailbox);
247 //Send our bitfield to the pair
248 sendBitfield(message.mailbox);
251 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
252 //update the pieces list
253 updatePiecesCountFromBitfield(message.bitfield);
254 //Update the current piece
255 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
256 updateCurrentPiece();
258 remotePeer.bitfield = message.bitfield.clone();
261 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
262 assert remotePeer != null;
263 remotePeer.interested = true;
266 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
267 assert remotePeer != null;
268 remotePeer.interested = false;
271 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
272 assert remotePeer != null;
273 remotePeer.chokedDownload = false;
274 activePeers.put(remotePeer.id,remotePeer);
275 sendRequestsToPeer(remotePeer);
278 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
279 assert remotePeer != null;
280 remotePeer.chokedDownload = true;
281 activePeers.remove(remotePeer.id);
284 if (remotePeer.bitfield == null) {
287 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
288 assert message.index >= 0 && message.index < Common.FILE_PIECES;
289 assert remotePeer.bitfield != null;
290 remotePeer.bitfield[message.index] = '1';
291 piecesCount[message.index]++;
292 //Send interested message to the peer if he has what we want
293 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
294 remotePeer.amInterested = true;
295 sendInterested(remotePeer.mailbox);
298 if (currentPieces.contains(message.index)) {
299 sendRequest(message.mailbox,message.index);
303 assert message.index >= 0 && message.index < Common.FILE_PIECES;
304 if (!remotePeer.chokedUpload) {
305 Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
306 if (bitfield[message.index] == '1') {
307 sendPiece(message.mailbox,message.index,false);
310 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
315 if (message.stalled) {
316 Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
319 Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
320 if (bitfield[message.index] == '0') {
322 //Removing the piece from our piece list.
323 if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
325 //Setting the fact that we have the piece
326 bitfield[message.index] = '1';
328 Msg.debug("My status is now " + getStatus());
329 //Sending the information to all the peers we are connected to
330 sendHave(message.index);
331 //sending UNINTERESTED to peers that doesn't have what we want.
332 updateInterestedAfterReceive();
335 Msg.debug("However, we already have it.");
342 * Wait for the node to receive interesting bitfield messages (ie: non empty)
345 void waitForPieces() {
346 boolean finished = false;
347 while (Msg.getClock() < deadline && !finished) {
348 if (commReceived == null) {
349 commReceived = Task.irecv(mailbox);
352 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
353 handleMessage(commReceived.getTask());
354 if (currentPiece != -1) {
359 catch (MsgException e) {
365 private boolean hasFinished() {
366 for (int i = 0; i < bitfield.length; i++) {
367 if (bitfield[i] == '1') {
374 * Updates the list of who has a piece from a bitfield
375 * @param bitfield bitfield
377 private void updatePiecesCountFromBitfield(char bitfield[]) {
378 for (int i = 0; i < Common.FILE_PIECES; i++) {
379 if (bitfield[i] == '1') {
385 * Update the piece the peer is currently interested in.
386 * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
387 * If the peer has less than 3 pieces, he chooses a piece at random.
388 * If the peer has more than pieces, he downloads the pieces that are the less
391 void updateCurrentPiece() {
392 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
395 if (true || pieces < 3) {
396 int i = 0, peerPiece;
398 currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
400 } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
403 //trivial min algorithm.
406 currentPieces.add(currentPiece);
407 Msg.debug("New interested piece: " + currentPiece);
408 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
411 * Update the list of current choked and unchoked peers, using the
414 private void updateChokedPeers() {
415 round = (round + 1) % 3;
416 if (peers.size() == 0) {
419 //remove a peer from the list
420 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
422 Entry<Integer,Connection> e = it.next();
423 Connection peerChoked = e.getValue();
424 sendChoked(peerChoked.mailbox);
425 peerChoked.chokedUpload = true;
426 activePeers.remove(e.getKey());
428 //Random optimistic unchoking
429 if (round == 0 || true) {
431 Connection peerChoosed = null;
434 int idChosen = stream.randInt(0,peers.size() - 1);
435 for (Connection connection : peers.values()) {
437 peerChoosed = connection;
441 } //TODO: Not really the best way ever
442 if (!peerChoosed.interested) {
446 } while (peerChoosed == null && j <
447 Common.MAXIMUM_PEERS);
448 if (peerChoosed != null) {
449 activePeers.put(peerChoosed.id,peerChoosed);
450 peerChoosed.chokedUpload = false;
451 sendUnchoked(peerChoosed.mailbox);
454 //TODO: Use the leecher choke algorithm.
457 * Updates our "interested" state about peers: send "not interested" to peers
458 * that don't have any more pieces we want.
460 private void updateInterestedAfterReceive() {
462 for (Connection connection : peers.values()) {
464 if (connection.amInterested) {
465 for (Integer piece : currentPieces) {
466 if (connection.bitfield[piece] == '1') {
472 connection.amInterested = false;
473 sendNotInterested(connection.mailbox);
479 * Send request messages to a peer that have unchoked us
480 * @param remotePeer peer data to the peer we want to send the request
482 private void sendRequestsToPeer(Connection remotePeer) {
483 for (Integer piece : currentPieces) {
484 if (remotePeer.bitfield != null && remotePeer.bitfield[piece] == '1') {
485 sendRequest(remotePeer.mailbox, piece);
490 * Find the peers that have the current interested piece and send them
491 * the "interested" message
493 private void sendInterestedToPeers() {
494 if (currentPiece == -1) {
497 for (Connection connection : peers.values()) {
498 if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
499 connection.amInterested = true;
500 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
501 task.dsend(connection.mailbox);
508 * Send a "interested" message to a peer.
510 private void sendInterested(String mailbox) {
511 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
515 * Send a "not interested" message to a peer
516 * @param mailbox mailbox destination mailbox
518 private void sendNotInterested(String mailbox) {
519 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
523 * Send a handshake message to all the peers the peer has.
524 * @param peer peer data
526 private void sendHandshakeAll() {
527 for (Connection remotePeer : peers.values()) {
528 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
530 task.dsend(remotePeer.mailbox);
534 * Send a "handshake" message to an user
535 * @param mailbox mailbox where to we send the message
537 private void sendHandshake(String mailbox) {
538 Msg.debug("Sending a HANDSHAKE to " + mailbox);
539 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
543 * Send a "choked" message to a peer
545 private void sendChoked(String mailbox) {
546 Msg.debug("Sending a CHOKE to " + mailbox);
547 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
551 * Send a "unchoked" message to a peer
553 private void sendUnchoked(String mailbox) {
554 Msg.debug("Sending a UNCHOKE to " + mailbox);
555 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
559 * Send a "HAVE" message to all peers we are connected to
561 private void sendHave(int piece) {
562 Msg.debug("Sending HAVE message to all my peers");
563 for (Connection remotePeer : peers.values()) {
564 MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
565 task.dsend(remotePeer.mailbox);
569 * Send a bitfield message to all the peers the peer has.
570 * @param peer peer data
572 private void sendBitfield(String mailbox) {
573 Msg.debug("Sending a BITFIELD to " + mailbox);
574 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
578 * Send a "request" message to a pair, containing a request for a piece
580 private void sendRequest(String mailbox, int piece) {
581 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece);
582 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece);
586 * Send a "piece" message to a pair, containing a piece of the file
588 private void sendPiece(String mailbox, int piece, boolean stalled) {
589 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
590 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled);
594 private String getStatus() {
596 for (int i = 0; i < Common.FILE_PIECES; i++) {