Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines with new year.
[simgrid.git] / examples / deprecated / java / app / bittorrent / Peer.java
1 /* Copyright (c) 2006-2020. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 package app.bittorrent;
7
8 import java.util.ArrayList;
9 import java.util.HashMap;
10 import java.util.Iterator;
11 import java.util.Map.Entry;
12 import java.util.Random;
13
14 import org.simgrid.msg.Msg;
15 import org.simgrid.msg.Comm;
16 import org.simgrid.msg.Host;
17 import org.simgrid.msg.Task;
18 import org.simgrid.msg.Process;
19 import org.simgrid.msg.MsgException;
20
21 public class Peer extends Process {
22   Random rand = new Random();
23   protected int round = 0;
24   protected double beginReceiveTime;
25   protected double deadline;
26   protected int id;
27   protected String mailbox;
28   protected String mailboxTracker;
29   protected String hostname;
30   protected int pieces = 0;
31   protected char[] bitfield = new char[Common.FILE_PIECES];
32   protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
33   protected short[] piecesCount = new short[Common.FILE_PIECES];
34   protected int piecesRequested = 0;
35   protected ArrayList<Integer> currentPieces = new ArrayList<>();
36   protected int currentPiece = -1;
37   protected HashMap<Integer, Connection> activePeers = new HashMap<>();
38   protected HashMap<Integer, Connection> peers = new HashMap<>();
39   protected Comm commReceived = null;
40
41   public Peer(Host host, String name, String[]args) {
42     super(host,name,args);
43   }
44
45   @Override
46   public void main(String[] args) throws MsgException {
47     //Check arguments
48     if (args.length != 3 && args.length != 2) {
49       Msg.info("Wrong number of arguments");
50     }
51     init(Integer.parseInt(args[0]), (args.length == 3));
52      
53     //Retrieve the deadline
54     deadline = Double.parseDouble(args[1]);
55     if (deadline < 0) {
56       Msg.info("Wrong deadline supplied");
57       return;
58     }
59     Msg.info("Hi, I'm joining the network with id " + id);
60     //Getting peer data from the tracker
61     if (getPeersData()) {
62       Msg.debug("Got " + peers.size() + " peers from the tracker");
63       Msg.debug("Here is my current status: " + getStatus());
64       beginReceiveTime = Msg.getClock();      
65       if (hasFinished()) {
66         pieces = Common.FILE_PIECES;
67         sendHandshakeAll();
68         seedLoop();
69       } else {
70         leechLoop();
71         seedLoop();
72       }
73     } else {
74       Msg.info("Couldn't contact the tracker.");
75     }
76     Msg.info("Here is my current status: " + getStatus());
77   }
78
79   private void leechLoop() {
80     double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
81     Msg.debug("Start downloading.");
82     // Send a "handshake" message to all the peers it got(it couldn't have gotten more than 50 peers anyway)
83     sendHandshakeAll();
84     //Wait for at least one "bitfield" message.
85     waitForPieces();
86     Msg.debug("Starting main leech loop");
87     while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
88       if (commReceived == null) {
89         commReceived = Task.irecv(mailbox);
90       }
91       try {
92         if (commReceived.test()) {
93           handleMessage(commReceived.getTask());
94           commReceived = null;
95         } else {
96           //If the user has a pending interesting
97           if (currentPiece != -1) {
98             sendInterestedToPeers();
99           } else {
100             if (currentPieces.size() < Common.MAX_PIECES) {
101               updateCurrentPiece();
102             }
103           }
104           //We don't execute the choke algorithm if we don't already have a piece
105           if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
106             updateChokedPeers();
107             nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
108           } else {
109             waitFor(1);
110           }
111         }
112       }
113       catch (MsgException e) {
114         e.printStackTrace();
115         commReceived = null;
116       }
117     }
118   }
119
120   private void seedLoop() {
121     double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
122     Msg.debug("Start seeding.");
123     //start the main seed loop
124     while (Msg.getClock() < deadline) {
125       if (commReceived == null) {
126         commReceived = Task.irecv(mailbox);
127       }
128       try {
129         if (commReceived.test()) {
130           handleMessage(commReceived.getTask());
131           commReceived = null;
132         } else {
133           if (Msg.getClock() >= nextChokedUpdate) {
134             updateChokedPeers();
135             //TODO: Change the choked peer algorithm when seeding
136             nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
137           } else {
138             waitFor(1);
139           }
140         }
141       }
142       catch (MsgException e) {
143         commReceived = null;
144       }
145     }
146   }
147
148   /**
149    * @brief Initialize the various peer data
150    * @param id id of the peer to take in the network
151    * @param seed indicates if the peer is a seed
152    */
153   private void init(int id, boolean seed) {
154     this.id = id;
155     this.mailbox = Integer.toString(id);
156     this.mailboxTracker = "tracker_" + Integer.toString(id);
157     if (seed) {
158       for (int i = 0; i < bitfield.length; i++) {
159         bitfield[i] = '1';
160         for (int j = 0; j < bitfieldBlocks[i].length; j++) {
161           bitfieldBlocks[i][j] = '1';
162         }
163       }
164     } else {
165       for (int i = 0; i < bitfield.length; i++) {
166         bitfield[i] = '0';
167         for (int j = 0; j < bitfieldBlocks[i].length; j++) {
168           bitfieldBlocks[i][j] = '0'  ;
169         }
170       }
171     }
172     this.hostname = getHost().getName();
173   }
174
175   private boolean getPeersData() {
176     boolean success = false;
177     double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
178     //Build the task to send to the tracker
179     TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
180
181     while (Msg.getClock() < timeout) {
182       try {
183         Msg.debug("Sending a peer request to the tracker.");
184         taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
185         break;
186       }
187       catch (MsgException e) {
188         e.printStackTrace();
189       }
190     }
191     while (!success && Msg.getClock() < timeout) {
192       commReceived = Task.irecv(this.mailboxTracker);
193       try {
194         commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
195         if (commReceived.getTask() instanceof TrackerTask) {
196           TrackerTask task = (TrackerTask)commReceived.getTask();
197           for (Integer peerId: task.peers) {
198             if (peerId != this.id) {
199               peers.put(peerId, new Connection(peerId));
200             }
201           }
202           success = true;
203         }
204       }
205       catch (MsgException e) {
206         e.printStackTrace();
207       }
208       commReceived = null;
209     }
210     return success;
211   }
212
213   private void handleMessage(Task task) {
214     MessageTask message = (MessageTask)task;
215     Connection remotePeer = peers.get(message.peerId);
216     switch (message.type) {
217       case HANDSHAKE:
218         Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
219         //Check if the peer is in our connection list
220         if (remotePeer == null) {
221           peers.put(message.peerId, new Connection(message.peerId));
222           sendHandshake(message.mailbox);
223         }
224         //Send our bitfield to the pair
225         sendBitfield(message.mailbox);
226       break;
227       case BITFIELD:
228         Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
229         //update the pieces list
230         updatePiecesCountFromBitfield(message.bitfield);
231         //Update the current piece
232         if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
233           updateCurrentPiece();
234         }
235         remotePeer.bitfield  = message.bitfield.clone();
236       break;
237       case INTERESTED:
238         Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
239         assert remotePeer != null;
240         remotePeer.interested = true;
241       break;
242       case NOTINTERESTED:
243         Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
244         assert remotePeer != null;
245         remotePeer.interested = false;
246       break;
247       case UNCHOKE:
248         Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
249         assert remotePeer != null;
250         remotePeer.chokedDownload = false;
251         activePeers.put(remotePeer.id,remotePeer);
252         sendRequestsToPeer(remotePeer);
253       break;
254       case CHOKE:
255         Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
256         assert remotePeer != null;
257         remotePeer.chokedDownload = true;
258         activePeers.remove(remotePeer.id);
259       break;
260       case HAVE:
261         if (remotePeer.bitfield == null) {
262           return;
263         }
264         Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
265         assert message.index >= 0 && message.index < Common.FILE_PIECES;
266         assert remotePeer.bitfield != null;
267         remotePeer.bitfield[message.index] = '1';
268         piecesCount[message.index]++; 
269         //Send interested message to the peer if he has what we want
270         if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
271           remotePeer.amInterested = true;
272           sendInterested(remotePeer.mailbox);
273         }
274         
275         if (currentPieces.contains(message.index)) {
276           int blockIndex = getFirstBlock(message.index);      
277           int blockLength = Common.PIECES_BLOCKS - blockIndex ;
278           blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;    
279           sendRequest(message.mailbox,message.index,blockIndex,blockLength);
280         }
281       break;
282       case REQUEST:
283         assert message.index >= 0 && message.index < Common.FILE_PIECES;
284         if (!remotePeer.chokedUpload) {
285           Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " 
286                     + message.peerId);
287           if (bitfield[message.index] == '1') {
288             sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);  
289           } else {
290             Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname 
291                       + ") but he is choked" );
292           }
293         }
294       break;
295       case PIECE:
296         if (message.stalled) {
297           Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname 
298                     + ") is stalled");
299         } else {
300           Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" 
301                     + message.issuerHostname + ")");
302           if (bitfield[message.index] == '0') {
303             updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
304             if (pieceComplete(message.index)) {
305               piecesRequested--;
306               //Removing the piece from our piece list.
307               currentPieces.remove((Object)Integer.valueOf(message.index));
308               //Setting the fact that we have the piece
309               bitfield[message.index] = '1';
310               pieces++;
311               Msg.debug("My status is now " + getStatus());
312               //Sending the information to all the peers we are connected to
313               sendHave(message.index);
314               //sending UNINTERESTED to peers that doesn't have what we want.
315               updateInterestedAfterReceive();
316             }
317           } else {
318             Msg.debug("However, we already have it.");
319           }
320         }
321       break;
322       default:
323         Msg.error("Unexpected message type: " + message.type);
324         break;
325     }
326     if (remotePeer != null) {
327       remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
328     }
329     beginReceiveTime = Msg.getClock();
330   }
331
332   private void waitForPieces() {
333     boolean finished = false;
334     while (Msg.getClock() < deadline && !finished) {
335       if (commReceived == null) {
336         commReceived = Task.irecv(mailbox);
337       }
338       try {
339         commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
340         handleMessage(commReceived.getTask());
341         if (currentPiece != -1) {
342           finished = true;
343         }
344         commReceived = null;
345       }
346       catch (MsgException e) {
347         commReceived = null;
348       }
349     }
350   }
351
352   private boolean hasFinished() {
353     for (int i = 0; i < bitfield.length; i++) {
354       if (bitfield[i] == '1') {
355         return true;
356       }
357     }
358     return false;
359   }
360
361   /**
362    * @brief Updates the list of who has a piece from a bitfield
363    * @param bitfield bitfield
364    */
365   private void updatePiecesCountFromBitfield(char[] bitfield) {
366     for (int i = 0; i < Common.FILE_PIECES; i++) {
367       if (bitfield[i] == '1') {
368         piecesCount[i]++;
369       }
370     }
371   }
372
373   /**
374    * Update the piece the peer is currently interested in.
375    * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
376    * If the peer has less than 3 pieces, he chooses a piece at random.
377    * If the peer has more than pieces, he downloads the pieces that are the less
378    * replicated
379    */
380   private void updateCurrentPiece() {
381     if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
382       return;
383     }
384
385     //TODO: trivial min algorithm when pieces >= 3
386     do {
387       currentPiece = rand.nextInt(Common.FILE_PIECES);
388     } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
389
390     currentPieces.add(currentPiece);
391     Msg.debug("New interested piece: " + currentPiece);
392     assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
393   }
394
395   // Update the list of current choked and unchoked peers, using the choke algorithm
396   private void updateChokedPeers() {
397     round = (round + 1) % 3;
398     if (peers.size() == 0) {
399       return;
400     }
401     //remove a peer from the list
402     Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
403     if (it.hasNext()) {
404       Entry<Integer,Connection> e = it.next();
405       Connection peerChoked = e.getValue();
406       peerChoked.chokedUpload = true;
407       sendChoked(peerChoked.mailbox);
408       activePeers.remove(e.getKey());
409     }
410     Connection peerChosen = null;
411     //Separate the case from when the peer is seeding.
412     if (pieces == Common.FILE_PIECES) {
413       //Find the last unchoked peer.
414       double unchokeTime = deadline + 1;
415       for (Connection connection : peers.values()) {
416         if (connection.lastUnchoke < unchokeTime && connection.interested) {
417           peerChosen = connection;
418           unchokeTime = connection.lastUnchoke;
419         }
420       }
421     } else {
422       //Random optimistic unchoking
423       if (round == 0) {
424         int j = 0;
425         do {
426           int i = 0;
427           int idChosen = rand.nextInt(peers.size());
428           for (Connection connection : peers.values()) {
429             if (i == idChosen) {
430               peerChosen = connection;
431               break;
432             }
433             i++;
434           } //TODO: Not really the best way ever
435           if (peerChosen != null && !peerChosen.interested) {
436             peerChosen = null;
437           }
438           j++;
439         } while (peerChosen == null && j < Common.MAXIMUM_PEERS);
440       } else {
441         Connection fastest = null;
442         double fastestSpeed = 0;
443         for (Connection c : peers.values()) {
444           if (c.peerSpeed > fastestSpeed && c.interested && c.chokedUpload) {
445             fastest = c;
446             fastestSpeed = c.peerSpeed;
447           }
448         }
449         peerChosen = fastest;
450       }
451     }
452     if (peerChosen != null) {
453       activePeers.put(peerChosen.id,peerChosen);
454       peerChosen.chokedUpload = false;
455       peerChosen.lastUnchoke = Msg.getClock();
456       sendUnchoked(peerChosen.mailbox);
457     }
458   }
459
460   // Updates our "interested" state about peers: send "not interested" to peers that don't have any more pieces we want.
461   private void updateInterestedAfterReceive() {
462     boolean interested;
463     for (Connection connection : peers.values()) {
464       interested = false;
465       if (connection.amInterested) {
466         for (Integer piece : currentPieces) {
467           if (connection.bitfield[piece] == '1') {
468             interested = true;
469             break;
470           }
471         }
472         if (!interested) {
473           connection.amInterested = false;
474           sendNotInterested(connection.mailbox);
475         }
476       }
477     }
478   }
479
480   private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
481     for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
482       bitfieldBlocks[index][i] = '1';
483     }
484   }
485
486   // Returns if a piece is complete in the peer's bitfield.
487   private boolean pieceComplete(int index) {
488     for (int i = 0; i < bitfieldBlocks[index].length; i++) {
489       if (bitfieldBlocks[index][i] == '0') {
490         return false;
491       }
492     }
493     return true;
494   }
495
496   // Returns the first block of a piece that we don't have. 
497   private int getFirstBlock(int piece) {
498     int blockIndex = -1;
499     for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
500       if (bitfieldBlocks[piece][i] == '0') {
501         blockIndex = i;
502         break;
503       }
504     }
505     return blockIndex;
506   }
507
508   /**
509    * @brief Send request messages to a peer that have unchoked us
510    * @param remotePeer peer data to the peer we want to send the request
511    */
512   private void sendRequestsToPeer(Connection remotePeer) {
513     if (remotePeer.bitfield == null) {
514       return;
515     }
516     for (Integer piece : currentPieces) {
517       //Getting the block to send.
518       int blockIndex = getFirstBlock(piece);
519       int blockLength = Common.PIECES_BLOCKS - blockIndex ;
520       blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;    
521       if (remotePeer.bitfield[piece] == '1') {
522         sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
523       }
524     }
525   }
526
527   // Find the peers that have the current interested piece and send them the "interested" message
528   private void sendInterestedToPeers() {
529     if (currentPiece == -1) {
530       return;
531     }
532     for (Connection connection : peers.values()) {
533       if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
534         connection.amInterested = true;        
535         MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
536         task.dsend(connection.mailbox);        
537       }
538     }
539     currentPiece = -1;
540     piecesRequested++;
541   }
542
543   // Send a "interested" message to a peer.
544   private void sendInterested(String mailbox) {
545     MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
546     task.dsend(mailbox);
547   }
548
549   /**
550    * @brief Send a "not interested" message to a peer
551    * @param mailbox mailbox destination mailbox
552    */
553   private void sendNotInterested(String mailbox) {
554     MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
555     task.dsend(mailbox);
556   }
557
558   // Send a handshake message to all the peers the peer has.
559   private void sendHandshakeAll() {
560     for (Connection remotePeer : peers.values()) {
561       MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox, id);
562       task.dsend(remotePeer.mailbox);
563     }
564   }
565
566   /**
567    * @brief Send a "handshake" message to an user
568    * @param mailbox mailbox where to we send the message
569    */
570   private void sendHandshake(String mailbox) {
571     Msg.debug("Sending a HANDSHAKE to " + mailbox);
572     MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
573     task.dsend(mailbox);
574   }
575
576   // Send a "choked" message to a peer
577   private void sendChoked(String mailbox) {
578     Msg.debug("Sending a CHOKE to " + mailbox);
579     MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
580     task.dsend(mailbox);
581   }
582
583   // Send a "unchoked" message to a peer
584   private void sendUnchoked(String mailbox) {
585     Msg.debug("Sending a UNCHOKE to " + mailbox);
586     MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
587     task.dsend(mailbox);
588   }
589
590   // Send a "HAVE" message to all peers we are connected to
591   private void sendHave(int piece) {
592     Msg.debug("Sending HAVE message to all my peers");
593     for (Connection remotePeer : peers.values()) {
594       MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
595       task.dsend(remotePeer.mailbox);
596     }
597   }
598   // Send a bitfield message to all the peers the peer has.
599   private void sendBitfield(String mailbox) {
600     Msg.debug("Sending a BITFIELD to " + mailbox);
601     MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
602     task.dsend(mailbox);
603   }
604   // Send a "request" message to a peer, containing a request for a piece
605   private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
606     Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + ","
607               + (blockIndex + blockLength));
608     MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, 
609                                        blockLength);
610     task.dsend(mailbox);
611   }
612
613   // Send a "piece" message to a peer, containing a piece of the file
614   private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
615     Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
616     MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled,
617                                        blockIndex, blockLength);
618     task.dsend(mailbox);
619   }
620
621   private String getStatus() {
622     StringBuilder s = new StringBuilder("");
623     for (int i = 0; i < Common.FILE_PIECES; i++)
624       s.append(bitfield[i]);
625     return s.toString();
626   }
627 }