Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
58b4b489d4394793db23e0e5847fbc5b0a67c568
[simgrid.git] / examples / java / app / bittorrent / Peer.java
1 /* Copyright (c) 2006-2014, 2016. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 package app.bittorrent;
8
9 import java.util.ArrayList;
10 import java.util.HashMap;
11 import java.util.Iterator;
12 import java.util.Map.Entry;
13
14 import org.simgrid.msg.Msg;
15 import org.simgrid.msg.Comm;
16 import org.simgrid.msg.Host;
17 import org.simgrid.msg.Task;
18 import org.simgrid.msg.Process;
19 import org.simgrid.msg.RngStream;
20 import org.simgrid.msg.MsgException;
21
22 public class Peer extends Process {
23   protected int round = 0;
24   protected double beginReceiveTime;
25   protected double deadline;
26   protected static RngStream stream = new RngStream();
27   protected int id;
28   protected String mailbox;
29   protected String mailboxTracker;
30   protected String hostname;
31   protected int pieces = 0;
32   protected char[] bitfield = new char[Common.FILE_PIECES];
33   protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
34   protected short[] piecesCount = new short[Common.FILE_PIECES];
35   protected int piecesRequested = 0;
36   protected ArrayList<Integer> currentPieces = new ArrayList<>();
37   protected int currentPiece = -1;
38   protected HashMap<Integer, Connection> activePeers = new HashMap<>();
39   protected HashMap<Integer, Connection> peers = new HashMap<>();
40   protected Comm commReceived = null;
41
42   public Peer(Host host, String name, String[]args) {
43     super(host,name,args);
44   }
45
46   @Override
47   public void main(String[] args) throws MsgException {
48     //Check arguments
49     if (args.length != 3 && args.length != 2) {
50       Msg.info("Wrong number of arguments");
51     }
52     if (args.length == 3) {
53       init(Integer.valueOf(args[0]),true);
54     } else {
55       init(Integer.valueOf(args[0]),false);
56     }
57     //Retrieve the deadline
58     deadline = Double.valueOf(args[1]);
59     if (deadline < 0) {
60       Msg.info("Wrong deadline supplied");
61       return;
62     }
63     Msg.info("Hi, I'm joining the network with id " + id);
64     //Getting peer data from the tracker
65     if (getPeersData()) {
66       Msg.debug("Got " + peers.size() + " peers from the tracker");
67       Msg.debug("Here is my current status: " + getStatus());
68       beginReceiveTime = Msg.getClock();      
69       if (hasFinished()) {
70         pieces = Common.FILE_PIECES;
71         sendHandshakeAll();
72         seedLoop();
73       } else {
74         leechLoop();
75         seedLoop();
76       }
77     } else {
78       Msg.info("Couldn't contact the tracker.");
79     }
80     Msg.info("Here is my current status: " + getStatus());
81   }
82
83   private void leechLoop() {
84     double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
85     Msg.debug("Start downloading.");
86     // Send a "handshake" message to all the peers it got(it couldn't have gotten more than 50 peers anyway)
87     sendHandshakeAll();
88     //Wait for at least one "bitfield" message.
89     waitForPieces();
90     Msg.debug("Starting main leech loop");
91     while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
92       if (commReceived == null) {
93         commReceived = Task.irecv(mailbox);
94       }
95       try {
96         if (commReceived.test()) {
97           handleMessage(commReceived.getTask());
98           commReceived = null;
99         } else {
100           //If the user has a pending interesting
101           if (currentPiece != -1) {
102             sendInterestedToPeers();
103           } else {
104             if (currentPieces.size() < Common.MAX_PIECES) {
105               updateCurrentPiece();
106             }
107           }
108           //We don't execute the choke algorithm if we don't already have a piece
109           if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
110             updateChokedPeers();
111             nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
112           } else {
113             waitFor(1);
114           }
115         }
116       }
117       catch (MsgException e) {
118         e.printStackTrace();
119         commReceived = null;
120       }
121     }
122   }
123
124   private void seedLoop() {
125     double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
126     Msg.debug("Start seeding.");
127     //start the main seed loop
128     while (Msg.getClock() < deadline) {
129       if (commReceived == null) {
130         commReceived = Task.irecv(mailbox);
131       }
132       try {
133         if (commReceived.test()) {
134           handleMessage(commReceived.getTask());
135           commReceived = null;
136         } else {
137           if (Msg.getClock() >= nextChokedUpdate) {
138             updateChokedPeers();
139             //TODO: Change the choked peer algorithm when seeding
140             nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
141           } else {
142             waitFor(1);
143           }
144         }
145       }
146       catch (MsgException e) {
147         commReceived = null;
148       }
149     }
150   }
151
152   /**
153    * @brief Initialize the various peer data
154    * @param id id of the peer to take in the network
155    * @param seed indicates if the peer is a seed
156    */
157   private void init(int id, boolean seed) {
158     this.id = id;
159     this.mailbox = Integer.toString(id);
160     this.mailboxTracker = "tracker_" + Integer.toString(id);
161     if (seed) {
162       for (int i = 0; i < bitfield.length; i++) {
163         bitfield[i] = '1';
164         for (int j = 0; j < bitfieldBlocks[i].length; j++) {
165           bitfieldBlocks[i][j] = '1';
166         }
167       }
168     } else {
169       for (int i = 0; i < bitfield.length; i++) {
170         bitfield[i] = '0';
171         for (int j = 0; j < bitfieldBlocks[i].length; j++) {
172           bitfieldBlocks[i][j] = '0'  ;
173         }
174       }
175     }
176     this.hostname = getHost().getName();
177   }
178
179   private boolean getPeersData() {
180     boolean success = false;
181     boolean sendSuccess = false;
182     double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
183     //Build the task to send to the tracker
184     TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
185
186     while (!sendSuccess && Msg.getClock() < timeout) {
187       try {
188         Msg.debug("Sending a peer request to the tracker.");
189         taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
190         sendSuccess = true;
191       }
192       catch (MsgException e) {
193         e.printStackTrace();
194       }
195     }
196     while (!success && Msg.getClock() < timeout) {
197       commReceived = Task.irecv(this.mailboxTracker);
198       try {
199         commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
200         if (commReceived.getTask() instanceof TrackerTask) {
201           TrackerTask task = (TrackerTask)commReceived.getTask();
202           for (Integer peerId: task.peers) {
203             if (peerId != this.id) {
204               peers.put(peerId, new Connection(peerId));
205             }
206           }
207           success = true;
208         }
209       }
210       catch (MsgException e) {
211         e.printStackTrace();
212       }
213       commReceived = null;
214     }
215     commReceived = null;
216     return success;
217   }
218
219   void handleMessage(Task task) {
220     MessageTask message = (MessageTask)task;
221     Connection remotePeer = peers.get(message.peerId);
222     switch (message.type) {
223       case HANDSHAKE:
224         Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
225         //Check if the peer is in our connection list
226         if (remotePeer == null) {
227           peers.put(message.peerId, new Connection(message.peerId));
228           sendHandshake(message.mailbox);
229         }
230         //Send our bitfield to the pair
231         sendBitfield(message.mailbox);
232       break;
233       case BITFIELD:
234         Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
235         //update the pieces list
236         updatePiecesCountFromBitfield(message.bitfield);
237         //Update the current piece
238         if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
239           updateCurrentPiece();
240         }
241         remotePeer.bitfield  = message.bitfield.clone();
242       break;
243       case INTERESTED:
244         Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
245         assert remotePeer != null;
246         remotePeer.interested = true;
247       break;
248       case NOTINTERESTED:
249         Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
250         assert remotePeer != null;
251         remotePeer.interested = false;
252       break;
253       case UNCHOKE:
254         Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
255         assert remotePeer != null;
256         remotePeer.chokedDownload = false;
257         activePeers.put(remotePeer.id,remotePeer);
258         sendRequestsToPeer(remotePeer);
259       break;
260       case CHOKE:
261         Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
262         assert remotePeer != null;
263         remotePeer.chokedDownload = true;
264         activePeers.remove(remotePeer.id);
265       break;
266       case HAVE:
267         if (remotePeer.bitfield == null) {
268           return;
269         }
270         Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
271         assert message.index >= 0 && message.index < Common.FILE_PIECES;
272         assert remotePeer.bitfield != null;
273         remotePeer.bitfield[message.index] = '1';
274         piecesCount[message.index]++; 
275         //Send interested message to the peer if he has what we want
276         if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
277           remotePeer.amInterested = true;
278           sendInterested(remotePeer.mailbox);
279         }
280         
281         if (currentPieces.contains(message.index)) {
282           int blockIndex = getFirstBlock(message.index);      
283           int blockLength = Common.PIECES_BLOCKS - blockIndex ;
284           blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;    
285           sendRequest(message.mailbox,message.index,blockIndex,blockLength);
286         }
287       break;
288       case REQUEST:
289         assert message.index >= 0 && message.index < Common.FILE_PIECES;
290         if (!remotePeer.chokedUpload) {
291           Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " 
292                     + message.peerId);
293           if (bitfield[message.index] == '1') {
294             sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);  
295           } else {
296             Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname 
297                       + ") but he is choked" );
298           }
299         }
300       break;
301       case PIECE:
302         if (message.stalled) {
303           Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname 
304                     + ") is stalled");
305         } else {
306           Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" 
307                     + message.issuerHostname + ")");
308           if (bitfield[message.index] == '0') {
309             updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
310             if (pieceComplete(message.index)) {
311               piecesRequested--;
312               //Removing the piece from our piece list.
313               currentPieces.remove((Object)Integer.valueOf(message.index));
314               //Setting the fact that we have the piece
315               bitfield[message.index] = '1';
316               pieces++;
317               Msg.debug("My status is now " + getStatus());
318               //Sending the information to all the peers we are connected to
319               sendHave(message.index);
320               //sending UNINTERESTED to peers that doesn't have what we want.
321               updateInterestedAfterReceive();
322             }
323           } else {
324             Msg.debug("However, we already have it.");
325           }
326         }
327       break;
328     }
329     if (remotePeer != null) {
330       remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
331     }
332     beginReceiveTime = Msg.getClock();
333   }
334
335   void waitForPieces() {
336     boolean finished = false;
337     while (Msg.getClock() < deadline && !finished) {
338       if (commReceived == null) {
339         commReceived = Task.irecv(mailbox);
340       }
341       try {
342         commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
343         handleMessage(commReceived.getTask());
344         if (currentPiece != -1) {
345           finished = true;
346         }
347         commReceived = null;
348       }
349       catch (MsgException e) {
350         commReceived = null;
351       }
352     }
353   }
354
355   private boolean hasFinished() {
356     for (int i = 0; i < bitfield.length; i++) {
357       if (bitfield[i] == '1') {
358         return true;
359       }
360     }
361     return false;
362   }
363
364   /**
365    * @brief Updates the list of who has a piece from a bitfield
366    * @param bitfield bitfield
367    */
368   private void updatePiecesCountFromBitfield(char[] bitfield) {
369     for (int i = 0; i < Common.FILE_PIECES; i++) {
370       if (bitfield[i] == '1') {
371         piecesCount[i]++;
372       }
373     }
374   }
375
376   /**
377    * Update the piece the peer is currently interested in.
378    * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
379    * If the peer has less than 3 pieces, he chooses a piece at random.
380    * If the peer has more than pieces, he downloads the pieces that are the less
381    * replicated
382    */
383   void updateCurrentPiece() {
384     if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
385       return;
386     }
387 //    if (pieces < 3) {
388       do {
389         currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
390       } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
391 //    }
392 //    else {
393       //TODO trivial min algorithm.
394 //    }
395     currentPieces.add(currentPiece);
396     Msg.debug("New interested piece: " + currentPiece);
397     assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
398   }
399
400   // Update the list of current choked and unchoked peers, using the choke algorithm
401   private void updateChokedPeers() {
402     round = (round + 1) % 3;
403     if (peers.size() == 0) {
404       return;
405     }
406     //remove a peer from the list
407     Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
408     if (it.hasNext()) {
409       Entry<Integer,Connection> e = it.next();
410       Connection peerChoked = e.getValue();
411       peerChoked.chokedUpload = true;
412       sendChoked(peerChoked.mailbox);
413       activePeers.remove(e.getKey());
414     }
415     Connection peerChoosed = null;
416     //Separate the case from when the peer is seeding.
417     if (pieces == Common.FILE_PIECES) {
418       //Find the last unchoked peer.
419       double unchokeTime = deadline + 1;
420       for (Connection connection : peers.values()) {
421         if (connection.lastUnchoke < unchokeTime && connection.interested) {
422           peerChoosed = connection;
423           unchokeTime = connection.lastUnchoke;
424         }
425       }
426     } else {
427       //Random optimistic unchoking
428       if (round == 0) {
429         int j = 0;
430         do {
431           int i = 0;
432           int idChosen = stream.randInt(0,peers.size() - 1);
433           for (Connection connection : peers.values()) {
434             if (i == idChosen) {
435               peerChoosed = connection;
436               break;
437             }
438             i++;
439           } //TODO: Not really the best way ever
440           if (peerChoosed != null && !peerChoosed.interested) {
441             peerChoosed = null;
442           }
443           j++;
444         } while (peerChoosed == null && j < Common.MAXIMUM_PEERS);
445       } else {
446         Connection fastest = null;
447         double fastestSpeed = 0;
448         for (Connection c : peers.values()) {
449           if (c.peerSpeed > fastestSpeed && c.interested && c.chokedUpload) {
450             fastest = c;
451             fastestSpeed = c.peerSpeed;
452           }
453         }
454         peerChoosed = fastest;
455       }
456     }
457     if (peerChoosed != null) {
458       activePeers.put(peerChoosed.id,peerChoosed);
459       peerChoosed.chokedUpload = false;
460       peerChoosed.lastUnchoke = Msg.getClock();
461       sendUnchoked(peerChoosed.mailbox);
462     }
463   }
464
465   // Updates our "interested" state about peers: send "not interested" to peers that don't have any more pieces we want.
466   private void updateInterestedAfterReceive() {
467     boolean interested;
468     for (Connection connection : peers.values()) {
469       interested = false;
470       if (connection.amInterested) {
471         for (Integer piece : currentPieces) {
472           if (connection.bitfield[piece] == '1') {
473             interested = true;
474             break;
475           }
476         }
477         if (!interested) {
478           connection.amInterested = false;
479           sendNotInterested(connection.mailbox);
480         }
481       }
482     }
483   }
484
485   private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
486     for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
487       bitfieldBlocks[index][i] = '1';
488     }
489   }
490
491   // Returns if a piece is complete in the peer's bitfield.
492   private boolean pieceComplete(int index) {
493     for (int i = 0; i < bitfieldBlocks[index].length; i++) {
494       if (bitfieldBlocks[index][i] == '0') {
495         return false;
496       }
497     }
498     return true;
499   }
500
501   // Returns the first block of a piece that we don't have. 
502   private int getFirstBlock(int piece) {
503     int blockIndex = -1;
504     for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
505       if (bitfieldBlocks[piece][i] == '0') {
506         blockIndex = i;
507         break;
508       }
509     }
510     return blockIndex;
511   }
512
513   /**
514    * @brief Send request messages to a peer that have unchoked us
515    * @param remotePeer peer data to the peer we want to send the request
516    */
517   private void sendRequestsToPeer(Connection remotePeer) {
518     if (remotePeer.bitfield == null) {
519       return;
520     }
521     for (Integer piece : currentPieces) {
522       //Getting the block to send.
523       int blockIndex = getFirstBlock(piece);
524       int blockLength = Common.PIECES_BLOCKS - blockIndex ;
525       blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;    
526       if (remotePeer.bitfield[piece] == '1') {
527         sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
528       }
529     }
530   }
531
532   // Find the peers that have the current interested piece and send them the "interested" message
533   private void sendInterestedToPeers() {
534     if (currentPiece == -1) {
535       return;
536     }
537     for (Connection connection : peers.values()) {
538       if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
539         connection.amInterested = true;        
540         MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
541         task.dsend(connection.mailbox);        
542       }
543     }
544     currentPiece = -1;
545     piecesRequested++;
546   }
547
548   // Send a "interested" message to a peer.
549   private void sendInterested(String mailbox) {
550     MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
551     task.dsend(mailbox);
552   }
553
554   /**
555    * @brief Send a "not interested" message to a peer
556    * @param mailbox mailbox destination mailbox
557    */
558   private void sendNotInterested(String mailbox) {
559     MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
560     task.dsend(mailbox);
561   }
562
563   // Send a handshake message to all the peers the peer has.
564   private void sendHandshakeAll() {
565     for (Connection remotePeer : peers.values()) {
566       MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox, id);
567       task.dsend(remotePeer.mailbox);
568     }
569   }
570
571   /**
572    * @brief Send a "handshake" message to an user
573    * @param mailbox mailbox where to we send the message
574    */
575   private void sendHandshake(String mailbox) {
576     Msg.debug("Sending a HANDSHAKE to " + mailbox);
577     MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
578     task.dsend(mailbox);
579   }
580
581   // Send a "choked" message to a peer
582   private void sendChoked(String mailbox) {
583     Msg.debug("Sending a CHOKE to " + mailbox);
584     MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
585     task.dsend(mailbox);
586   }
587
588   // Send a "unchoked" message to a peer
589   private void sendUnchoked(String mailbox) {
590     Msg.debug("Sending a UNCHOKE to " + mailbox);
591     MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
592     task.dsend(mailbox);
593   }
594
595   // Send a "HAVE" message to all peers we are connected to
596   private void sendHave(int piece) {
597     Msg.debug("Sending HAVE message to all my peers");
598     for (Connection remotePeer : peers.values()) {
599       MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
600       task.dsend(remotePeer.mailbox);
601     }
602   }
603   // Send a bitfield message to all the peers the peer has.
604   private void sendBitfield(String mailbox) {
605     Msg.debug("Sending a BITFIELD to " + mailbox);
606     MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
607     task.dsend(mailbox);
608   }
609   // Send a "request" message to a peer, containing a request for a piece
610   private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
611     Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + ","
612               + (blockIndex + blockLength));
613     MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, 
614                                        blockLength);
615     task.dsend(mailbox);
616   }
617
618   // Send a "piece" message to a peer, containing a piece of the file
619   private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
620     Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
621     MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled,
622                                        blockIndex, blockLength);
623     task.dsend(mailbox);
624   }
625
626   private String getStatus() {
627     String s = "";
628     for (int i = 0; i < Common.FILE_PIECES; i++) {
629       s = s + bitfield[i];
630     }
631     return s;
632   }
633 }