3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.Iterator;
6 import java.util.Map.Entry;
8 import org.simgrid.msg.Comm;
9 import org.simgrid.msg.Host;
10 import org.simgrid.msg.Msg;
11 import org.simgrid.msg.MsgException;
12 import org.simgrid.msg.RngStream;
13 import org.simgrid.msg.Process;
14 import org.simgrid.msg.Task;
17 * Main class for peers execution
19 public class Peer extends Process {
// Per-peer and shared state.
// NOTE(review): the `id` member read by main(), getPeersData() and the send* helpers is not
// declared on any line visible here - confirm where it is declared.
// Counter advanced modulo 3 by updateChokedPeers().
20 protected int round = 0;
// Simulated-clock limit parsed from args[1] in main(); the loops stop once Msg.getClock() reaches it.
22 protected double deadline;
// Random stream shared by all Peer instances (static); used for piece and peer selection.
24 protected static RngStream stream = new RngStream();
// Our own mailbox name: the decimal form of the peer id (set in init()).
27 protected String mailbox;
// Mailbox on which the tracker's answer is awaited: "tracker_" followed by the peer id (set in init()).
28 protected String mailboxTracker;
// Host name, taken from host.getName() in init().
29 protected String hostname;
// Piece counter compared against Common.FILE_PIECES to decide when leeching stops.
30 protected int pieces = 0;
// One char per piece; the code tests entries against '1' and '0'.
31 protected char[] bitfield = new char[Common.FILE_PIECES];
// Per-piece, per-block flags; not read or written in the visible lines.
32 protected char[][] bitfield_blocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
// Per-piece counter, incremented when a HAVE message for that piece is handled.
34 protected short[] piecesCount = new short[Common.FILE_PIECES];
// Not read or written in the visible lines.
36 protected int piecesRequested = 0;
// Indices of the pieces we are currently trying to obtain.
38 protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
// Most recently selected piece index; -1 means none has been selected.
39 protected int currentPiece = -1;
// Connections keyed by peer id: entries are added when a peer unchokes us and when we unchoke a
// peer, and removed when a peer chokes us or we choke it (see handleMessage / updateChokedPeers).
41 protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();
// Every known connection, keyed by peer id (filled from the tracker answer and from handshakes).
42 protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
// Pending asynchronous receive; null means a new irecv has to be posted.
44 protected Comm commReceived = null;
46 public Peer(Host host, String name, String[]args) {
// Only forwards host, name and args to the Process constructor; peer state is set up later by init(), called from main().
47 super(host,name,args);
51 public void main(String[] args) throws MsgException {
// Process entry point. args[0] is parsed as the peer id and args[1] as the deadline; when a third
// argument is present the peer is initialised as a seed (its value is not read in the visible lines).
// Integer.valueOf / Double.valueOf throw NumberFormatException on malformed input.
53 if (args.length != 3 && args.length != 2) {
54 Msg.info("Wrong number of arguments");
// NOTE(review): no early exit is visible after this message - confirm that execution does not
// fall through to args[0] below with a bad argument count.
56 if (args.length == 3) {
57 init(Integer.valueOf(args[0]),true);
60 init(Integer.valueOf(args[0]),false);
62 //Retrieve the deadline
63 deadline = Double.valueOf(args[1]);
// NOTE(review): the condition guarding this message is not visible here.
65 Msg.info("Wrong deadline supplied");
68 Msg.info("Hi, I'm joining the network with id " + id);
69 //Getting peer data from the tracker
71 Msg.debug("Got " + peers.size() + " peers from the tracker");
72 Msg.debug("Here is my current status: " + getStatus());
// Sets the piece counter to its maximum; presumably the branch taken when the peer already
// owns the whole file - TODO confirm against the guarding condition, which is not visible.
74 pieces = Common.FILE_PIECES;
84 Msg.info("Couldn't contact the tracker.");
86 Msg.info("Here is my current status: " + getStatus());
89 * Peer main loop when it is leeching.
91 private void leechLoop() {
// First simulated-clock instant at which the periodic choke update becomes due.
92 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
93 Msg.debug("Start downloading.");
95 * Send a "handshake" message to all the peers it got
96 * (it couldn't have gotten more than 50 peers anyway)
99 //Wait for at least one "bitfield" message.
101 Msg.debug("Starting main leech loop");
// Runs until the deadline passes or the piece counter reaches Common.FILE_PIECES.
102 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
// Post an asynchronous receive on our mailbox only when none is pending.
103 if (commReceived == null) {
104 commReceived = Task.irecv(mailbox);
// Dispatch the message once test() reports the pending receive as completed.
// NOTE(review): commReceived must become null again for a new irecv to be posted; that
// statement is not visible here - confirm.
107 if (commReceived.test()) {
108 handleMessage(commReceived.getTask());
112 //If we currently have a piece we are interested in
113 if (currentPiece != -1) {
114 sendInterestedToPeers();
// Select one more piece while fewer than Common.MAX_PIECES are being tracked.
117 if (currentPieces.size() < Common.MAX_PIECES) {
118 updateCurrentPiece();
121 //We don't execute the choke algorithm if we don't already have a piece
122 if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
// NOTE(review): the call that performs the choke update is not visible here; only the
// rescheduling of the next update is.
124 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
// NOTE(review): the handler body is not visible - confirm the exception is not silently dropped.
131 catch (MsgException e) {
138 * Peer main loop when it is seeding
140 private void seedLoop() {
// First simulated-clock instant at which the periodic choke update becomes due.
141 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
142 Msg.debug("Start seeding.");
143 //start the main seed loop
// Unlike leechLoop(), the only exit condition is the deadline.
144 while (Msg.getClock() < deadline) {
// Post an asynchronous receive on our mailbox only when none is pending.
145 if (commReceived == null) {
146 commReceived = Task.irecv(mailbox);
// Dispatch the message once test() reports the pending receive as completed.
// NOTE(review): the statement resetting commReceived is not visible here - confirm.
149 if (commReceived.test()) {
150 handleMessage(commReceived.getTask());
// Periodic choke update; here it does not depend on the piece counter.
154 if (Msg.getClock() >= nextChokedUpdate) {
156 //TODO: Change the choked peer algorithm when seeding
157 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
// NOTE(review): the handler body is not visible - confirm the exception is not silently dropped.
164 catch (MsgException e) {
172 * Initialize the various peer data
173 * @param id id of the peer to take in the network
174 * @param seed indicates if the peer is a seed
176 private void init(int id, boolean seed) {
// NOTE(review): no assignment of the `id` parameter to a field is visible here, although other
// methods read `id` / `this.id` - confirm it is stored.
// Our mailbox is the decimal form of the id; the tracker answers on "tracker_<id>".
178 this.mailbox = Integer.toString(id);
179 this.mailboxTracker = "tracker_" + Integer.toString(id);
// Two passes over the bitfield whose bodies are not visible; presumably one fills it for a seed
// and the other clears it for a leecher, selected by `seed` - TODO confirm.
181 for (int i = 0; i < bitfield.length; i++) {
186 for (int i = 0; i < bitfield.length; i++) {
// `host` is neither a parameter nor a local here; presumably a member inherited from Process - confirm.
190 this.hostname = host.getName();
193 * Retrieves the peer list from the tracker
195 private boolean getPeersData() {
// Two phases: send a request to the tracker, then wait for its answer on mailboxTracker.
// Both phases share the single `timeout` instant computed below.
197 boolean success = false, sendSuccess = false;
198 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
199 //Build the task to send to the tracker
200 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
// Phase 1: retry the send until it succeeds or the timeout instant is reached.
// NOTE(review): the assignment to sendSuccess is not visible here.
202 while (!sendSuccess && Msg.getClock() < timeout) {
204 Msg.debug("Sending a peer request to the tracker.");
205 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
208 catch (MsgException e) {
// Phase 2: wait for the tracker answer and register every returned peer except ourselves.
// NOTE(review): the assignment to success and the return statement are not visible here.
212 while (!success && Msg.getClock() < timeout) {
213 commReceived = Task.irecv(this.mailboxTracker);
215 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
// Tasks of any other type received on this mailbox are ignored by the visible code.
216 if (commReceived.getTask() instanceof TrackerTask) {
217 TrackerTask task = (TrackerTask)commReceived.getTask();
218 for (Integer peerId: task.peers) {
// NOTE(review): peerId is an Integer; this is a numeric comparison only if `id` is a
// primitive int (its declaration is not visible) - confirm it is not a reference comparison.
219 if (peerId != this.id) {
220 peers.put(peerId, new Connection(peerId));
226 catch (MsgException e) {
235 * Handle a received message sent by another peer
236 * @param task task received.
238 void handleMessage(Task task) {
// Dispatches on message.type. The case labels are not visible in this listing; the section
// markers below are inferred from the debug text of each branch.
// The cast throws ClassCastException if the task is not a MessageTask.
239 MessageTask message = (MessageTask)task;
// null when the sender is not (yet) in our connection table.
240 Connection remotePeer = peers.get(message.peerId);
241 switch (message.type) {
// --- HANDSHAKE ---
243 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
244 //Check if the peer is in our connection list
// Unknown sender: register it and answer with our own handshake.
245 if (remotePeer == null) {
246 peers.put(message.peerId, new Connection(message.peerId));
247 sendHandshake(message.mailbox);
249 //Send our bitfield to the peer
250 sendBitfield(message.mailbox);
// --- BITFIELD ---
253 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
254 //update the pieces list
255 updatePiecesCountFromBitfield(message.bitfield);
256 //Update the current piece
257 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
258 updateCurrentPiece();
// Keep a private copy so later changes to the message array do not affect our view.
// NOTE(review): no null check on remotePeer is visible in this branch.
260 remotePeer.bitfield = message.bitfield.clone();
// --- INTERESTED ---
263 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
// Java asserts are only evaluated when assertions are enabled (-ea).
264 assert remotePeer != null;
265 remotePeer.interested = true;
// --- NOTINTERESTED ---
268 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
269 assert remotePeer != null;
270 remotePeer.interested = false;
// --- UNCHOKE --- the peer now serves us: record it as active and request our wanted pieces from it.
273 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
274 assert remotePeer != null;
275 remotePeer.chokedDownload = false;
276 activePeers.put(remotePeer.id,remotePeer);
277 sendRequestsToPeer(remotePeer);
// --- CHOKE ---
280 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
281 assert remotePeer != null;
282 remotePeer.chokedDownload = true;
283 activePeers.remove(remotePeer.id);
// --- HAVE --- what happens when the sender's bitfield is still unknown is not visible here.
286 if (remotePeer.bitfield == null) {
289 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
290 assert message.index >= 0 && message.index < Common.FILE_PIECES;
291 assert remotePeer.bitfield != null;
// Record that the sender owns the piece and bump that piece's counter.
292 remotePeer.bitfield[message.index] = '1';
293 piecesCount[message.index]++;
294 //Send interested message to the peer if he has what we want
295 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
296 remotePeer.amInterested = true;
297 sendInterested(remotePeer.mailbox);
300 if (currentPieces.contains(message.index)) {
301 sendRequest(message.mailbox,message.index);
// --- REQUEST --- served only when we are not choking the sender and we own the piece.
305 assert message.index >= 0 && message.index < Common.FILE_PIECES;
306 if (!remotePeer.chokedUpload) {
// NOTE(review): the text after "for" prints message.peerId again; the requested piece
// (message.index) looks like the intended value - confirm.
307 Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
308 if (bitfield[message.index] == '1') {
309 sendPiece(message.mailbox,message.index,false);
312 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
// --- PIECE ---
317 if (message.stalled) {
318 Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
321 Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
322 if (bitfield[message.index] == '0') {
324 //Removing the piece from our piece list.
// The (Object) cast selects List.remove(Object) (remove by value) instead of remove(int index).
325 if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
327 //Setting the fact that we have the piece
328 bitfield[message.index] = '1';
// NOTE(review): no update of the `pieces` counter is visible in this branch - confirm it is incremented.
330 Msg.debug("My status is now " + getStatus());
331 //Sending the information to all the peers we are connected to
332 sendHave(message.index);
333 //sending NOTINTERESTED to peers that don't have what we want.
334 updateInterestedAfterReceive();
337 Msg.debug("However, we already have it.");
344 * Wait for the node to receive interesting bitfield messages (i.e. non-empty ones)
347 void waitForPieces() {
// Loops until the deadline or until `finished` is set.
348 boolean finished = false;
349 while (Msg.getClock() < deadline && !finished) {
// Post an asynchronous receive on our mailbox only when none is pending.
350 if (commReceived == null) {
351 commReceived = Task.irecv(mailbox);
// Unlike the main loops, this waits on the receive (bounded by Common.TIMEOUT_MESSAGE)
// instead of polling it with test().
354 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
355 handleMessage(commReceived.getTask());
// A piece has been selected (handleMessage calls updateCurrentPiece() on BITFIELD);
// presumably this is where `finished` is set - the body is not visible, TODO confirm.
356 if (currentPiece != -1) {
// NOTE(review): handler body not visible; presumably where a wait timeout ends up - confirm.
361 catch (MsgException e) {
367 private boolean hasFinished() {
// Scans the local bitfield for entries equal to '1'.
// NOTE(review): neither the value returned inside the test nor the one returned after the loop
// is visible here. A "has every piece" check would normally bail out on the first entry that
// is NOT '1' - confirm the condition and the returned values.
368 for (int i = 0; i < bitfield.length; i++) {
369 if (bitfield[i] == '1') {
376 * Updates the list of who has a piece from a bitfield
377 * @param bitfield bitfield
379 private void updatePiecesCountFromBitfield(char bitfield[]) {
// The parameter shadows the `bitfield` field: the array examined here is the one passed in
// (handleMessage passes the bitfield of the received message).
// It is indexed up to Common.FILE_PIECES, so it must hold at least that many entries.
380 for (int i = 0; i < Common.FILE_PIECES; i++) {
// The statement executed for a '1' entry is not visible; presumably it increments
// piecesCount[i] - TODO confirm.
381 if (bitfield[i] == '1') {
387 * Update the piece the peer is currently interested in.
388 * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole):
389 * If the peer has fewer than 3 pieces, it chooses a piece at random.
390 * If the peer has 3 or more pieces, it downloads the pieces that are the least
393 void updateCurrentPiece() {
// Guard: we already track at least as many pieces as are still missing. The body is not
// visible; presumably it returns, otherwise the random search below could not terminate - confirm.
394 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
// `true ||` makes this condition always true, so the alternative branch (the "trivial min
// algorithm" below) is never taken.
397 if (true || pieces < 3) {
// Neither local is used in the visible lines.
398 int i = 0, peerPiece;
// Draw random indices (bounds 0 and FILE_PIECES - 1, presumably inclusive) until one
// designates a piece we do not own and are not already tracking.
400 currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
402 } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
405 //trivial min algorithm.
408 currentPieces.add(currentPiece);
409 Msg.debug("New interested piece: " + currentPiece);
// Checked only when assertions are enabled, and only after the value has been stored.
410 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
413 * Update the list of current choked and unchoked peers, using the
416 private void updateChokedPeers() {
// round cycles through 0, 1, 2.
417 round = (round + 1) % 3;
// Nothing to choose from when no peer is known (body not visible; presumably returns - confirm).
418 if (peers.size() == 0) {
421 //remove a peer from the list
422 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
// Takes the first entry the iterator yields, tells that peer it is choked and drops it from
// the active set. The guard around it.next() is not visible here.
424 Entry<Integer,Connection> e = it.next();
425 Connection peerChoked = e.getValue();
426 sendChoked(peerChoked.mailbox);
427 peerChoked.chokedUpload = true;
// Removal goes through the map, not the iterator; this is safe only as long as `it` is not
// advanced afterwards (no further use of `it` is visible).
428 activePeers.remove(e.getKey());
430 //Random optimistic unchoking
// `|| true` makes this condition always true, so `round` currently has no effect here.
431 if (round == 0 || true) {
433 Connection peerChoosed = null;
// Pick a random position among the known peers and walk the values to find it; the
// comparison selecting the connection at that position is not visible here.
436 int idChosen = stream.randInt(0,peers.size() - 1);
437 for (Connection connection : peers.values()) {
439 peerChoosed = connection;
443 } //TODO: Not really the best way ever
// A candidate that is not interested in us is presumably discarded (body not visible - confirm).
444 if (!peerChoosed.interested) {
// Retry until a candidate is kept or `j` reaches Common.MAXIMUM_PEERS; the declaration and
// increment of `j` are not visible here.
448 } while (peerChoosed == null && j <
449 Common.MAXIMUM_PEERS);
// Unchoke the selected peer, if any, and record it as active.
450 if (peerChoosed != null) {
451 activePeers.put(peerChoosed.id,peerChoosed);
452 peerChoosed.chokedUpload = false;
453 sendUnchoked(peerChoosed.mailbox);
456 //TODO: Use the leecher choke algorithm.
459 * Updates our "interested" state about peers: send "not interested" to peers
460 * that don't have any more pieces we want.
462 private void updateInterestedAfterReceive() {
// Only connections we have declared interest in are examined.
464 for (Connection connection : peers.values()) {
466 if (connection.amInterested) {
// Look for a piece we still want among those the peer advertises.
// NOTE(review): connection.bitfield is dereferenced without a visible null check.
467 for (Integer piece : currentPieces) {
468 if (connection.bitfield[piece] == '1') {
// Withdraw our interest and notify the peer. The condition guarding these two statements
// is not visible; presumably "no wanted piece was found above" - TODO confirm.
474 connection.amInterested = false;
475 sendNotInterested(connection.mailbox);
481 * Send request messages to a peer that has unchoked us
482 * @param remotePeer connection data of the peer we want to send the requests to
484 private void sendRequestsToPeer(Connection remotePeer) {
// Request every piece we currently want that the peer advertises in its bitfield.
485 for (Integer piece : currentPieces) {
// A peer whose bitfield has not been received yet (null) is sent no request.
486 if (remotePeer.bitfield != null && remotePeer.bitfield[piece] == '1') {
487 sendRequest(remotePeer.mailbox, piece);
492 * Find the peers that have the current interested piece and send them
493 * the "interested" message
495 private void sendInterestedToPeers() {
// Nothing to announce when no piece is selected (body not visible; presumably returns - confirm).
496 if (currentPiece == -1) {
499 for (Connection connection : peers.values()) {
// Target peers with a known bitfield that advertise currentPiece and to which we have
// not already declared interest.
500 if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
// The flag is set before sending, so the same peer is not notified again on the next call.
501 connection.amInterested = true;
502 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
503 task.dsend(connection.mailbox);
510 * Send an "interested" message to a peer.
512 private void sendInterested(String mailbox) {
// The `mailbox` parameter shadows the field: this.mailbox (our own mailbox) goes into the message,
// while the parameter names the destination.
// NOTE(review): the statement that sends `task` is not visible in this listing.
513 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
517 * Send a "not interested" message to a peer
518 * @param mailbox mailbox destination mailbox
520 private void sendNotInterested(String mailbox) {
// The `mailbox` parameter shadows the field: this.mailbox (our own mailbox) goes into the message,
// while the parameter names the destination.
// NOTE(review): the statement that sends `task` is not visible in this listing.
521 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
525 * Send a handshake message to all the peers the peer has.
526 * (takes no parameter: the recipients are all the connections stored in peers)
528 private void sendHandshakeAll() {
// A fresh HANDSHAKE task is built for each known connection and handed to dsend().
529 for (Connection remotePeer : peers.values()) {
// The end of this constructor call (its last argument) is on a line not visible here.
530 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
532 task.dsend(remotePeer.mailbox);
536 * Send a "handshake" message to a peer
537 * @param mailbox mailbox to which the message is sent
539 private void sendHandshake(String mailbox) {
// The `mailbox` parameter (destination) shadows the field; this.mailbox is our own mailbox.
540 Msg.debug("Sending a HANDSHAKE to " + mailbox);
// NOTE(review): the statement that sends `task` is not visible in this listing.
541 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
545 * Send a "choked" message to a peer
547 private void sendChoked(String mailbox) {
// The `mailbox` parameter (destination) shadows the field; this.mailbox is our own mailbox.
548 Msg.debug("Sending a CHOKE to " + mailbox);
// NOTE(review): the statement that sends `task` is not visible in this listing.
549 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
553 * Send an "unchoked" message to a peer
555 private void sendUnchoked(String mailbox) {
// The `mailbox` parameter (destination) shadows the field; this.mailbox is our own mailbox.
556 Msg.debug("Sending a UNCHOKE to " + mailbox);
// NOTE(review): the statement that sends `task` is not visible in this listing.
557 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
561 * Send a "HAVE" message to all peers we are connected to
563 private void sendHave(int piece) {
564 Msg.debug("Sending HAVE message to all my peers");
// Every known connection is notified, whether or not it is in activePeers;
// a separate task object is created per recipient.
565 for (Connection remotePeer : peers.values()) {
566 MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
567 task.dsend(remotePeer.mailbox);
571 * Send a "bitfield" message, carrying our own bitfield, to a single peer.
572 * @param mailbox destination mailbox
574 private void sendBitfield(String mailbox) {
// The `mailbox` parameter (destination) shadows the field; this.mailbox is our own mailbox.
575 Msg.debug("Sending a BITFIELD to " + mailbox);
// Our own bitfield array is passed to the message constructor; whether it is copied there is
// not visible from this file (the receiving side clones it in handleMessage).
// NOTE(review): the statement that sends `task` is not visible in this listing.
576 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
580 * Send a "request" message to a peer, containing a request for a piece
582 private void sendRequest(String mailbox, int piece) {
// The `mailbox` parameter (destination) shadows the field; this.mailbox is our own mailbox.
583 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece);
// NOTE(review): the statement that sends `task` is not visible in this listing.
584 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece);
588 * Send a "piece" message to a peer, containing a piece of the file
590 private void sendPiece(String mailbox, int piece, boolean stalled) {
// The `mailbox` parameter (destination) shadows the field; this.mailbox is our own mailbox.
// `stalled` is forwarded unchanged into the message (handleMessage reads it back as message.stalled).
591 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
// NOTE(review): the statement that sends `task` is not visible in this listing.
592 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled);
596 private String getStatus() {
598 for (int i = 0; i < Common.FILE_PIECES; i++) {