Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
create an app package
[simgrid.git] / examples / java / app / bittorrent / Peer.java
1 /* Copyright (c) 2006-2014, 2016. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 package app.bittorrent;
8
9 import java.util.ArrayList;
10 import java.util.HashMap;
11 import java.util.Iterator;
12 import java.util.Map.Entry;
13
14 import org.simgrid.msg.Msg;
15 import org.simgrid.msg.Comm;
16 import org.simgrid.msg.Host;
17 import org.simgrid.msg.Task;
18 import org.simgrid.msg.Process;
19 import org.simgrid.msg.RngStream;
20 import org.simgrid.msg.MsgException;
21
22 public class Peer extends Process {
23   protected int round = 0;
24   protected double beginReceiveTime;
25   protected double deadline;
26   protected static RngStream stream = new RngStream();
27   protected int id;
28   protected String mailbox;
29   protected String mailboxTracker;
30   protected String hostname;
31   protected int pieces = 0;
32   protected char[] bitfield = new char[Common.FILE_PIECES];
33   protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
34   protected short[] piecesCount = new short[Common.FILE_PIECES];
35   protected int piecesRequested = 0;
36   protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
37   protected int currentPiece = -1;
38   protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();  
39   protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
40   protected Comm commReceived = null;
41
42   public Peer(Host host, String name, String[]args) {
43     super(host,name,args);
44   }
45
46   @Override
47   public void main(String[] args) throws MsgException {
48     //Check arguments
49     if (args.length != 3 && args.length != 2) {
50       Msg.info("Wrong number of arguments");
51     }
52     if (args.length == 3) {
53       init(Integer.valueOf(args[0]),true);
54     } else {
55       init(Integer.valueOf(args[0]),false);
56     }
57     //Retrieve the deadline
58     deadline = Double.valueOf(args[1]);
59     if (deadline < 0) {
60       Msg.info("Wrong deadline supplied");
61       return;
62     }
63     Msg.info("Hi, I'm joining the network with id " + id);
64     //Getting peer data from the tracker
65     if (getPeersData()) {
66       Msg.debug("Got " + peers.size() + " peers from the tracker");
67       Msg.debug("Here is my current status: " + getStatus());
68       beginReceiveTime = Msg.getClock();      
69       if (hasFinished()) {
70         pieces = Common.FILE_PIECES;
71         sendHandshakeAll();
72         seedLoop();
73       } else {
74         leechLoop();
75         seedLoop();
76       }
77     } else {
78       Msg.info("Couldn't contact the tracker.");
79     }
80     Msg.info("Here is my current status: " + getStatus());
81   }
82
83   private void leechLoop() {
84     double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
85     Msg.debug("Start downloading.");
86     // Send a "handshake" message to all the peers it got(it couldn't have gotten more than 50 peers anyway)
87     sendHandshakeAll();
88     //Wait for at least one "bitfield" message.
89     waitForPieces();
90     Msg.debug("Starting main leech loop");
91     while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
92       if (commReceived == null) {
93         commReceived = Task.irecv(mailbox);
94       }
95       try {
96         if (commReceived.test()) {
97           handleMessage(commReceived.getTask());
98           commReceived = null;
99         } else {
100           //If the user has a pending interesting
101           if (currentPiece != -1) {
102             sendInterestedToPeers();
103           } else {
104             if (currentPieces.size() < Common.MAX_PIECES) {
105               updateCurrentPiece();
106             }
107           }
108           //We don't execute the choke algorithm if we don't already have a piece
109           if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
110             updateChokedPeers();
111             nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
112           } else {
113             waitFor(1);
114           }
115         }
116       }
117       catch (MsgException e) {
118         commReceived = null;
119       }
120     }
121   }
122
123   private void seedLoop() {
124     double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
125     Msg.debug("Start seeding.");
126     //start the main seed loop
127     while (Msg.getClock() < deadline) {
128       if (commReceived == null) {
129         commReceived = Task.irecv(mailbox);
130       }
131       try {
132         if (commReceived.test()) {
133           handleMessage(commReceived.getTask());
134           commReceived = null;
135         } else {
136           if (Msg.getClock() >= nextChokedUpdate) {
137             updateChokedPeers();
138             //TODO: Change the choked peer algorithm when seeding
139             nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
140           } else {
141             waitFor(1);
142           }
143         }
144       }
145       catch (MsgException e) {
146         commReceived = null;
147       }
148     }
149   }
150
151   /**
152    * @brief Initialize the various peer data
153    * @param id id of the peer to take in the network
154    * @param seed indicates if the peer is a seed
155    */
156   private void init(int id, boolean seed) {
157     this.id = id;
158     this.mailbox = Integer.toString(id);
159     this.mailboxTracker = "tracker_" + Integer.toString(id);
160     if (seed) {
161       for (int i = 0; i < bitfield.length; i++) {
162         bitfield[i] = '1';
163         for (int j = 0; j < bitfieldBlocks[i].length; j++) {
164           bitfieldBlocks[i][j] = '1';
165         }
166       }
167     } else {
168       for (int i = 0; i < bitfield.length; i++) {
169         bitfield[i] = '0';
170         for (int j = 0; j < bitfieldBlocks[i].length; j++) {
171           bitfieldBlocks[i][j] = '0'  ;
172         }
173       }
174     }
175     this.hostname = getHost().getName();
176   }
177
178   private boolean getPeersData() {
179     boolean success = false, sendSuccess = false;
180     double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
181     //Build the task to send to the tracker
182     TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
183
184     while (!sendSuccess && Msg.getClock() < timeout) {
185       try {
186         Msg.debug("Sending a peer request to the tracker.");
187         taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
188         sendSuccess = true;
189       }
190       catch (MsgException e) {
191       }
192     }
193     while (!success && Msg.getClock() < timeout) {
194       commReceived = Task.irecv(this.mailboxTracker);
195       try {
196         commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
197         if (commReceived.getTask() instanceof TrackerTask) {
198           TrackerTask task = (TrackerTask)commReceived.getTask();
199           for (Integer peerId: task.peers) {
200             if (peerId != this.id) {
201               peers.put(peerId, new Connection(peerId));
202             }
203           }
204           success = true;
205         }
206       }
207       catch (MsgException e) {}
208       commReceived = null;
209     }
210     commReceived = null;
211     return success;
212   }
213
214   void handleMessage(Task task) {
215     MessageTask message = (MessageTask)task;
216     Connection remotePeer = peers.get(message.peerId);
217     switch (message.type) {
218       case HANDSHAKE:
219         Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
220         //Check if the peer is in our connection list
221         if (remotePeer == null) {
222           peers.put(message.peerId, new Connection(message.peerId));
223           sendHandshake(message.mailbox);
224         }
225         //Send our bitfield to the pair
226         sendBitfield(message.mailbox);
227       break;
228       case BITFIELD:
229         Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
230         //update the pieces list
231         updatePiecesCountFromBitfield(message.bitfield);
232         //Update the current piece
233         if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
234           updateCurrentPiece();
235         }
236         remotePeer.bitfield  = message.bitfield.clone();
237       break;
238       case INTERESTED:
239         Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
240         assert remotePeer != null;
241         remotePeer.interested = true;
242       break;
243       case NOTINTERESTED:
244         Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
245         assert remotePeer != null;
246         remotePeer.interested = false;
247       break;
248       case UNCHOKE:
249         Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
250         assert remotePeer != null;
251         remotePeer.chokedDownload = false;
252         activePeers.put(remotePeer.id,remotePeer);
253         sendRequestsToPeer(remotePeer);
254       break;
255       case CHOKE:
256         Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
257         assert remotePeer != null;
258         remotePeer.chokedDownload = true;
259         activePeers.remove(remotePeer.id);
260       break;
261       case HAVE:
262         if (remotePeer.bitfield == null) {
263           return;
264         }
265         Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
266         assert message.index >= 0 && message.index < Common.FILE_PIECES;
267         assert remotePeer.bitfield != null;
268         remotePeer.bitfield[message.index] = '1';
269         piecesCount[message.index]++; 
270         //Send interested message to the peer if he has what we want
271         if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
272           remotePeer.amInterested = true;
273           sendInterested(remotePeer.mailbox);
274         }
275         
276         if (currentPieces.contains(message.index)) {
277           int blockIndex = getFirstBlock(message.index);      
278           int blockLength = Common.PIECES_BLOCKS - blockIndex ;
279           blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;    
280           sendRequest(message.mailbox,message.index,blockIndex,blockLength);
281         }
282       break;
283       case REQUEST:
284         assert message.index >= 0 && message.index < Common.FILE_PIECES;
285         if (!remotePeer.chokedUpload) {
286           Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " 
287                     + message.peerId);
288           if (bitfield[message.index] == '1') {
289             sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);  
290           } else {
291             Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname 
292                       + ") but he is choked" );
293           }
294         }
295       break;
296       case PIECE:
297         if (message.stalled) {
298           Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname 
299                     + ") is stalled");
300         } else {
301           Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" 
302                     + message.issuerHostname + ")");
303           if (bitfield[message.index] == '0') {
304             updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
305             if (pieceComplete(message.index)) {
306               piecesRequested--;
307               //Removing the piece from our piece list.
308               currentPieces.remove((Object)Integer.valueOf(message.index));
309               //Setting the fact that we have the piece
310               bitfield[message.index] = '1';
311               pieces++;
312               Msg.debug("My status is now " + getStatus());
313               //Sending the information to all the peers we are connected to
314               sendHave(message.index);
315               //sending UNINTERESTED to peers that doesn't have what we want.
316               updateInterestedAfterReceive();
317             }
318           } else {
319             Msg.debug("However, we already have it.");
320           }
321         }
322       break;
323     }
324     if (remotePeer != null) {
325       remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
326     }
327     beginReceiveTime = Msg.getClock();
328   }
329
330   void waitForPieces() {
331     boolean finished = false;
332     while (Msg.getClock() < deadline && !finished) {
333       if (commReceived == null) {
334         commReceived = Task.irecv(mailbox);
335       }
336       try {
337         commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
338         handleMessage(commReceived.getTask());
339         if (currentPiece != -1) {
340           finished = true;
341         }
342         commReceived = null;
343       }
344       catch (MsgException e) {
345         commReceived = null;
346       }
347     }
348   }
349
350   private boolean hasFinished() {
351     for (int i = 0; i < bitfield.length; i++) {
352       if (bitfield[i] == '1') {
353         return true;
354       }
355     }
356     return false;
357   }
358
359   /**
360    * @brief Updates the list of who has a piece from a bitfield
361    * @param bitfield bitfield
362    */
363   private void updatePiecesCountFromBitfield(char bitfield[]) {
364     for (int i = 0; i < Common.FILE_PIECES; i++) {
365       if (bitfield[i] == '1') {
366         piecesCount[i]++;
367       }
368     }
369   }
370
371   /**
372    * Update the piece the peer is currently interested in.
373    * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
374    * If the peer has less than 3 pieces, he chooses a piece at random.
375    * If the peer has more than pieces, he downloads the pieces that are the less
376    * replicated
377    */
378   void updateCurrentPiece() {
379     if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
380       return;
381     }
382     if (true || pieces < 3) {
383       int peerPiece;
384       do {
385         currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
386       } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
387     }
388     else {
389       //trivial min algorithm.
390       //TODO
391     }
392     currentPieces.add(currentPiece);
393     Msg.debug("New interested piece: " + currentPiece);
394     assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
395   }
396
397   // Update the list of current choked and unchoked peers, using the choke algorithm
398   private void updateChokedPeers() {
399     round = (round + 1) % 3;
400     if (peers.size() == 0) {
401       return;
402     }
403     //remove a peer from the list
404     Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
405     if (it.hasNext()) {
406       Entry<Integer,Connection> e = it.next();
407       Connection peerChoked = e.getValue();
408       peerChoked.chokedUpload = true;
409       sendChoked(peerChoked.mailbox);
410       activePeers.remove(e.getKey());
411     }
412     Connection peerChoosed = null;
413     //Separate the case from when the peer is seeding.
414     if (pieces == Common.FILE_PIECES) {
415       //Find the last unchoked peer.
416       double unchokeTime = deadline + 1;
417       for (Connection connection : peers.values()) {
418         if (connection.lastUnchoke < unchokeTime && connection.interested) {
419           peerChoosed = connection;
420           unchokeTime = connection.lastUnchoke;
421         }
422       }
423     } else {
424       //Random optimistic unchoking
425       if (round == 0) {
426         int j = 0, i;
427         do {
428           i = 0;
429           int idChosen = stream.randInt(0,peers.size() - 1);
430           for (Connection connection : peers.values()) {
431             if (i == idChosen) {
432               peerChoosed = connection;
433               break;
434             }
435             i++;
436           } //TODO: Not really the best way ever
437           if (!peerChoosed.interested) {
438             peerChoosed = null;
439           }
440           j++;
441         } while (peerChoosed == null && j < Common.MAXIMUM_PEERS);
442       } else {
443         Connection fastest = null;
444         double fastestSpeed = 0;
445         for (Connection c : peers.values()) {
446           if (c.peerSpeed > fastestSpeed && c.interested && c.chokedUpload) {
447             fastest = c;
448             fastestSpeed = c.peerSpeed;
449           }
450         }
451         peerChoosed = fastest;
452       }
453     }
454     if (peerChoosed != null) {
455       activePeers.put(peerChoosed.id,peerChoosed);
456       peerChoosed.chokedUpload = false;
457       peerChoosed.lastUnchoke = Msg.getClock();
458       sendUnchoked(peerChoosed.mailbox);
459     }
460   }
461
462   // Updates our "interested" state about peers: send "not interested" to peers that don't have any more pieces we want.
463   private void updateInterestedAfterReceive() {
464     boolean interested;
465     for (Connection connection : peers.values()) {
466       interested = false;
467       if (connection.amInterested) {
468         for (Integer piece : currentPieces) {
469           if (connection.bitfield[piece] == '1') {
470             interested = true;
471             break;
472           }
473         }
474         if (!interested) {
475           connection.amInterested = false;
476           sendNotInterested(connection.mailbox);
477         }
478       }
479     }
480   }
481
482   private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
483     for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
484       bitfieldBlocks[index][i] = '1';
485     }
486   }
487
488   // Returns if a piece is complete in the peer's bitfield.
489   private boolean pieceComplete(int index) {
490     for (int i = 0; i < bitfieldBlocks[index].length; i++) {
491       if (bitfieldBlocks[index][i] == '0') {
492         return false;
493       }
494     }
495     return true;
496   }
497
498   // Returns the first block of a piece that we don't have. 
499   private int getFirstBlock(int piece) {
500     int blockIndex = -1;
501     for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
502       if (bitfieldBlocks[piece][i] == '0') {
503         blockIndex = i;
504         break;
505       }
506     }
507     return blockIndex;
508   }
509
510   /**
511    * @brief Send request messages to a peer that have unchoked us
512    * @param remotePeer peer data to the peer we want to send the request
513    */
514   private void sendRequestsToPeer(Connection remotePeer) {
515     if (remotePeer.bitfield == null) {
516       return;
517     }
518     for (Integer piece : currentPieces) {
519       //Getting the block to send.  
520       int blockIndex = -1, blockLength = 0;
521       blockIndex = getFirstBlock(piece);      
522       blockLength = Common.PIECES_BLOCKS - blockIndex ;
523       blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;    
524       if (remotePeer.bitfield[piece] == '1') {
525         sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
526       }
527     }
528   }
529
530   // Find the peers that have the current interested piece and send them the "interested" message
531   private void sendInterestedToPeers() {
532     if (currentPiece == -1) {
533       return;
534     }
535     for (Connection connection : peers.values()) {
536       if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
537         connection.amInterested = true;        
538         MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
539         task.dsend(connection.mailbox);        
540       }
541     }
542     currentPiece = -1;
543     piecesRequested++;
544   }
545
546   // Send a "interested" message to a peer.
547   private void sendInterested(String mailbox) {
548     MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
549     task.dsend(mailbox);
550   }
551
552   /**
553    * @brief Send a "not interested" message to a peer
554    * @param mailbox mailbox destination mailbox
555    */
556   private void sendNotInterested(String mailbox) {
557     MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
558     task.dsend(mailbox);
559   }
560
561   // Send a handshake message to all the peers the peer has.
562   private void sendHandshakeAll() {
563     for (Connection remotePeer : peers.values()) {
564       MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox, id);
565       task.dsend(remotePeer.mailbox);
566     }
567   }
568
569   /**
570    * @brief Send a "handshake" message to an user
571    * @param mailbox mailbox where to we send the message
572    */
573   private void sendHandshake(String mailbox) {
574     Msg.debug("Sending a HANDSHAKE to " + mailbox);
575     MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
576     task.dsend(mailbox);
577   }
578
579   // Send a "choked" message to a peer
580   private void sendChoked(String mailbox) {
581     Msg.debug("Sending a CHOKE to " + mailbox);
582     MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
583     task.dsend(mailbox);
584   }
585
586   // Send a "unchoked" message to a peer
587   private void sendUnchoked(String mailbox) {
588     Msg.debug("Sending a UNCHOKE to " + mailbox);
589     MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
590     task.dsend(mailbox);
591   }
592
593   // Send a "HAVE" message to all peers we are connected to
594   private void sendHave(int piece) {
595     Msg.debug("Sending HAVE message to all my peers");
596     for (Connection remotePeer : peers.values()) {
597       MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
598       task.dsend(remotePeer.mailbox);
599     }
600   }
601   // Send a bitfield message to all the peers the peer has.
602   private void sendBitfield(String mailbox) {
603     Msg.debug("Sending a BITFIELD to " + mailbox);
604     MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
605     task.dsend(mailbox);
606   }
607   // Send a "request" message to a peer, containing a request for a piece
608   private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
609     Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + ","
610               + (blockIndex + blockLength));
611     MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, 
612                                        blockLength);
613     task.dsend(mailbox);
614   }
615
616   // Send a "piece" message to a peer, containing a piece of the file
617   private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
618     Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
619     MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled,
620                                        blockIndex, blockLength);
621     task.dsend(mailbox);
622   }
623
624   private String getStatus() {
625     String s = "";
626     for (int i = 0; i < Common.FILE_PIECES; i++) {
627       s = s + bitfield[i];
628     }
629     return s;
630   }
631 }