Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge concerns - Adrien
[simgrid.git] / examples / bittorrent / Peer.java
1 package bittorrent;
2
3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.Iterator;
6 import java.util.Map.Entry;
7
8 import org.simgrid.msg.Comm;
9 import org.simgrid.msg.Host;
10 import org.simgrid.msg.Msg;
11 import org.simgrid.msg.MsgException;
12 import org.simgrid.msg.RngStream;
13 import org.simgrid.msg.Process;
14 import org.simgrid.msg.Task;
15
16 /**
17  * Main class for peers execution
18  */
19 public class Peer extends Process {
20         protected int round = 0;
21         
22         protected double deadline;
23         
24         protected static RngStream stream = new RngStream();
25         
26         protected int id;
27         protected String mailbox;
28         protected String mailboxTracker;
29         protected String hostname;
30         protected int pieces = 0;
31         protected char[] bitfield = new char[Common.FILE_PIECES];
32         protected short[] piecesCount = new short[Common.FILE_PIECES];
33         
34         protected int piecesRequested = 0;
35         
36         protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
37         protected int currentPiece = -1;
38
39         protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();        
40         protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
41         
42         protected Comm commReceived = null;
43
44         public Peer(Host host, String name, String[]args) {
45                 super(host,name,args);
46         }       
47         
48         @Override
49         public void main(String[] args) throws MsgException {
50                 //Check arguments
51                 if (args.length != 3 && args.length != 2) {
52                         Msg.info("Wrong number of arguments");
53                 }
54                 if (args.length == 3) {
55                         init(Integer.valueOf(args[0]),true);
56                 }
57                 else {
58                         init(Integer.valueOf(args[0]),false);
59                 }
60                 //Retrieve the deadline
61                 deadline = Double.valueOf(args[1]);
62                 if (deadline < 0) {
63                         Msg.info("Wrong deadline supplied");
64                         return;
65                 }
66                 Msg.info("Hi, I'm joining the network with id " + id);
67                 //Getting peer data from the tracker
68                 if (getPeersData()) {
69                         Msg.debug("Got " + peers.size() + " peers from the tracker");
70                         Msg.debug("Here is my current status: " + getStatus());
71                         if (hasFinished()) {
72                                 pieces = Common.FILE_PIECES;
73                                 sendHandshakeAll();
74                                 seedLoop();
75                         }
76                         else {
77                                 leechLoop();
78                                 seedLoop();
79                         }
80                 }
81                 else {
82                         Msg.info("Couldn't contact the tracker.");
83                 }
84                 Msg.info("Here is my current status: " + getStatus());
85         }
86         /**
87          * Peer main loop when it is leeching.
88          */
89         private void leechLoop() {
90                 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
91                 Msg.debug("Start downloading.");
92                 /**
93                  * Send a "handshake" message to all the peers it got
94                  * (it couldn't have gotten more than 50 peers anyway)
95                  */
96                 sendHandshakeAll();
97                 //Wait for at least one "bitfield" message.
98                 waitForPieces();
99                 Msg.debug("Starting main leech loop");
100                 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
101                         if (commReceived == null) {
102                                 commReceived = Task.irecv(mailbox);
103                         }
104                         try {
105                                 if (commReceived.test()) {
106                                         handleMessage(commReceived.getTask());
107                                         commReceived = null;
108                                 }
109                                 else {
110                                         //If the user has a pending interesting
111                                         if (currentPiece != -1) {
112                                                 sendInterestedToPeers();
113                                         }
114                                         else {
115                                                 if (currentPieces.size() < Common.MAX_PIECES) {
116                                                         updateCurrentPiece();
117                                                 }
118                                         }
119                                         //We don't execute the choke algorithm if we don't already have a piece
120                                         if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
121                                                 updateChokedPeers();
122                                                 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
123                                         }
124                                         else {
125                                                 waitFor(1);
126                                         }
127                                 }
128                         }
129                         catch (MsgException e) {
130                                 commReceived = null;                            
131                         }
132                 }
133         }
134         
135         /**
136          * Peer main loop when it is seeding
137          */
138         private void seedLoop() {
139                 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
140                 Msg.debug("Start seeding.");
141                 //start the main seed loop
142                 while (Msg.getClock() < deadline) {
143                         if (commReceived == null) {
144                                 commReceived = Task.irecv(mailbox);
145                         }
146                         try {
147                                 if (commReceived.test()) {
148                                         handleMessage(commReceived.getTask());
149                                         commReceived = null;
150                                 }
151                                 else {
152                                         if (Msg.getClock() >= nextChokedUpdate) {
153                                                 updateChokedPeers();
154                                                 //TODO: Change the choked peer algorithm when seeding
155                                                 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
156                                         }
157                                         else {
158                                                 waitFor(1);
159                                         }
160                                 }
161                         }
162                         catch (MsgException e) {
163                                 commReceived = null;                            
164                         }
165
166                 }
167         }
168         
169         /**
170          * Initialize the various peer data
171          * @param id id of the peer to take in the network
172          * @param seed indicates if the peer is a seed
173          */
174         private void init(int id, boolean seed) {
175                 this.id = id;
176                 this.mailbox = Integer.toString(id);
177                 this.mailboxTracker = "tracker_" + Integer.toString(id);
178                 if (seed) {
179                         for (int i = 0; i < bitfield.length; i++) {
180                                 bitfield[i] = '1';
181                         }
182                 }
183                 else {
184                         for (int i = 0; i < bitfield.length; i++) {
185                                 bitfield[i] = '0';
186                         }                       
187                 }
188                 this.hostname = host.getName();
189         }
190         /**
191          * Retrieves the peer list from the tracker
192          */
193         private boolean getPeersData() {
194                 
195                 boolean success = false, sendSuccess = false;
196                 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
197                 //Build the task to send to the tracker
198                 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
199                         
200                 while (!sendSuccess && Msg.getClock() < timeout) {
201                         try {
202                                 Msg.debug("Sending a peer request to the tracker.");
203                                 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
204                                 sendSuccess = true;
205                         }
206                         catch (MsgException e) {
207                                 
208                         }
209                 }
210                 while (!success && Msg.getClock() < timeout) {
211                         commReceived = Task.irecv(this.mailboxTracker);
212                         try {
213                                 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
214                                 if (commReceived.getTask() instanceof TrackerTask) {
215                                         TrackerTask task = (TrackerTask)commReceived.getTask();
216                                         for (Integer peerId: task.peers) {
217                                                 if (peerId != this.id) {
218                                                         peers.put(peerId, new Connection(peerId));
219                                                 }       
220                                         }
221                                         success = true;
222                                 }
223                         }
224                         catch (MsgException e) {
225                                 
226                         }
227                         commReceived = null;
228                 }
229                 commReceived = null;
230                 return success;
231         }
232         /**
233          * Handle a received message sent by another peer
234          * @param task task received.
235          */
236         void handleMessage(Task task) {
237                 MessageTask message = (MessageTask)task;
238                 Connection remotePeer = peers.get(message.peerId);
239                 switch (message.type) {
240                         case HANDSHAKE:
241                                 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
242                                 //Check if the peer is in our connection list
243                                 if (remotePeer == null) {
244                                         peers.put(message.peerId, new Connection(message.peerId));
245                                         sendHandshake(message.mailbox);
246                                 }
247                                 //Send our bitfield to the pair
248                                 sendBitfield(message.mailbox);
249                         break;
250                         case BITFIELD:
251                                 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
252                                 //update the pieces list
253                                 updatePiecesCountFromBitfield(message.bitfield);
254                                 //Update the current piece
255                                 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
256                                         updateCurrentPiece();
257                                 }                               
258                                 remotePeer.bitfield  = message.bitfield.clone();
259                         break;
260                         case INTERESTED:
261                                 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
262                                 assert remotePeer != null;
263                                 remotePeer.interested = true;
264                         break;
265                         case NOTINTERESTED:
266                                 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
267                                 assert remotePeer != null;
268                                 remotePeer.interested = false;
269                         break;
270                         case UNCHOKE:
271                                 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
272                                 assert remotePeer != null;
273                                 remotePeer.chokedDownload = false;
274                                 activePeers.put(remotePeer.id,remotePeer);
275                                 sendRequestsToPeer(remotePeer);
276                         break;
277                         case CHOKE:
278                                 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
279                                 assert remotePeer != null;
280                                 remotePeer.chokedDownload = true;
281                                 activePeers.remove(remotePeer.id);
282                         break;
283                         case HAVE:
284                                 if (remotePeer.bitfield == null) {
285                                         return;
286                                 }
287                                 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
288                                 assert message.index >= 0 && message.index < Common.FILE_PIECES;
289                                 assert remotePeer.bitfield != null;
290                                 remotePeer.bitfield[message.index] = '1';
291                                 piecesCount[message.index]++; 
292                                 //Send interested message to the peer if he has what we want
293                                 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
294                                         remotePeer.amInterested = true;
295                                         sendInterested(remotePeer.mailbox);
296                                 }
297                                 
298                                 if (currentPieces.contains(message.index)) {
299                                         sendRequest(message.mailbox,message.index);
300                                 }
301                         break;
302                         case REQUEST:
303                                 assert message.index >= 0 && message.index < Common.FILE_PIECES;
304                                 if (!remotePeer.chokedUpload) {
305                                         Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
306                                         if (bitfield[message.index] == '1') {
307                                                 sendPiece(message.mailbox,message.index,false); 
308                                         }
309                                         else {
310                                                 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
311                                         }
312                                 }
313                         break;
314                         case PIECE:
315                                 if (message.stalled) {
316                                         Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
317                                 }
318                                 else {
319                                         Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
320                                         if (bitfield[message.index] == '0') {
321                                                 piecesRequested--;
322                                                 //Removing the piece from our piece list.
323                                                 if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
324                                                 }
325                                                 //Setting the fact that we have the piece
326                                                 bitfield[message.index] = '1';
327                                                 pieces++;
328                                                 Msg.debug("My status is now " + getStatus());
329                                                 //Sending the information to all the peers we are connected to
330                                                 sendHave(message.index);
331                                                 //sending UNINTERSTED to peers that doesn't have what we want.
332                                                 updateInterestedAfterReceive();
333                                         }
334                                         else {
335                                                 Msg.debug("However, we already have it.");
336                                         }
337                                 }
338                         break;
339                 }
340         }
341         /**
342          * Wait for the node to receive interesting bitfield messages (ie: non empty)
343          * to be received
344          */
345         void waitForPieces() {
346                 boolean finished = false;
347                 while (Msg.getClock() < deadline && !finished) {
348                         if (commReceived == null) {
349                                 commReceived = Task.irecv(mailbox);
350                         }
351                         try {
352                                 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
353                                 handleMessage(commReceived.getTask());
354                                 if (currentPiece != -1) {
355                                         finished = true;
356                                 }
357                                 commReceived = null;
358                         }
359                         catch (MsgException e) {
360                                 commReceived = null;
361                         }
362                 }
363         }
364         
365         private boolean hasFinished() {
366                 for (int i = 0; i < bitfield.length; i++) {
367                         if (bitfield[i] == '1') {
368                                 return true;
369                         }
370                 }
371                 return false;
372         }
373         /**
374          * Updates the list of who has a piece from a bitfield
375          * @param bitfield bitfield
376          */
377         private void updatePiecesCountFromBitfield(char bitfield[]) {
378                 for (int i = 0; i < Common.FILE_PIECES; i++) {
379                         if (bitfield[i] == '1') {
380                                 piecesCount[i]++;
381                         }
382                 }
383         }
384         /**
385          * Update the piece the peer is currently interested in.
386          * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
387          * If the peer has less than 3 pieces, he chooses a piece at random.
388          * If the peer has more than pieces, he downloads the pieces that are the less
389          * replicated
390          */
391         void updateCurrentPiece() {
392                 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
393                         return;
394                 }
395                 if (true || pieces < 3) {
396                         int i = 0, peerPiece;
397                         do {
398                                 currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
399                                 i++;
400                         } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
401                 }
402                 else {
403                         //trivial min algorithm.
404                         //TODO
405                 }
406                 currentPieces.add(currentPiece);
407                 Msg.debug("New interested piece: " + currentPiece);
408                 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
409         }
410         /**
411          * Update the list of current choked and unchoked peers, using the
412          * choke algorithm
413          */
414         private void updateChokedPeers() {
415                 round = (round + 1) % 3;
416                 if (peers.size() == 0) {
417                         return;
418                 }
419                 //remove a peer from the list
420                 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
421                 if (it.hasNext()) {
422                         Entry<Integer,Connection> e = it.next();
423                         Connection peerChoked = e.getValue();
424                         sendChoked(peerChoked.mailbox);
425                         peerChoked.chokedUpload = true;
426                         activePeers.remove(e.getKey());
427                 }
428                 //Random optimistic unchoking
429                 if (round == 0 || true) {
430                         int j = 0, i;
431                         Connection peerChoosed = null;
432                         do {
433                                 i = 0;
434                                 int idChosen = ((int)Msg.getClock() + j) % peers.size();
435                                 for (Connection connection : peers.values()) {
436                                         if (i == idChosen) {
437                                                 peerChoosed = connection;
438                                                 break;
439                                         }
440                                         i++;
441                                 } //TODO: Not really the best way ever
442                                 if (!peerChoosed.interested) {
443                                         peerChoosed = null;
444                                 }
445                                 j++;
446                         } while (peerChoosed == null && j < Common.MAXIMUM_PAIRS);
447                         if (peerChoosed != null) {
448                                 activePeers.put(peerChoosed.id,peerChoosed);
449                                 peerChoosed.chokedUpload = false;
450                                 sendUnchoked(peerChoosed.mailbox);
451                         }
452                 }
453                 //TODO: Use the leecher choke algorithm.
454         }
455         /**     
456          * Updates our "interested" state about peers: send "not interested" to peers
457          * that don't have any more pieces we want.
458          */
459         private void updateInterestedAfterReceive() {
460                 boolean interested;
461                 for (Connection connection : peers.values()) {
462                         interested = false;
463                         if (connection.amInterested) {
464                                 for (Integer piece : currentPieces) {
465                                         if (connection.bitfield[piece] == '1') {
466                                                 interested = true;
467                                                 break;
468                                         }
469                                 }       
470                                 if (!interested) {
471                                         connection.amInterested = false;
472                                         sendNotInterested(connection.mailbox);
473                                 }
474                         }
475                 }
476         }
477         /**
478          * Send request messages to a peer that have unchoked us
479          * @param remotePeer peer data to the peer we want to send the request
480          */
481         private void sendRequestsToPeer(Connection remotePeer) {
482                 for (Integer piece : currentPieces) {
483                         if (remotePeer.bitfield != null && remotePeer.bitfield[piece] == '1') {
484                                 sendRequest(remotePeer.mailbox, piece);
485                         }                       
486                 }
487         }       
488         /**
489          * Find the peers that have the current interested piece and send them
490          * the "interested" message
491          */
492         private void sendInterestedToPeers() {
493                 if (currentPiece == -1) {
494                         return;
495                 }
496                 for (Connection connection : peers.values()) {
497                         if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
498                                 connection.amInterested = true;                         
499                                 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
500                                 task.dsend(connection.mailbox);                         
501                         }
502                 }
503                 currentPiece = -1;
504                 piecesRequested++;
505         }
506         /**
507          * Send a "interested" message to a peer.
508          */
509         private void sendInterested(String mailbox) {
510                 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
511                 task.dsend(mailbox);                                            
512         }
513         /**
514          * Send a "not interested" message to a peer
515          * @param mailbox mailbox destination mailbox
516          */
517         private void sendNotInterested(String mailbox) {
518                 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
519                 task.dsend(mailbox);                            
520         }
521         /**
522          * Send a handshake message to all the peers the peer has.
523          * @param peer peer data
524          */
525         private void sendHandshakeAll() {
526                 for (Connection remotePeer : peers.values()) {
527                         MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
528                         id);
529                         task.dsend(remotePeer.mailbox);
530                 }
531         }
532         /**
533          * Send a "handshake" message to an user
534          * @param mailbox mailbox where to we send the message
535          */
536         private void sendHandshake(String mailbox) {
537                 Msg.debug("Sending a HANDSHAKE to " + mailbox);
538                 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
539                 task.dsend(mailbox);            
540         }
541         /**
542          * Send a "choked" message to a peer
543          */
544         private void sendChoked(String mailbox) {
545                 Msg.debug("Sending a CHOKE to " + mailbox);
546                 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
547                 task.dsend(mailbox);
548         }
549         /**
550          * Send a "unchoked" message to a peer
551          */
552         private void sendUnchoked(String mailbox) {
553                 Msg.debug("Sending a UNCHOKE to " + mailbox);
554                 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
555                 task.dsend(mailbox);
556         }
557         /**
558          * Send a "HAVE" message to all peers we are connected to
559          */
560         private void sendHave(int piece) {
561                 Msg.debug("Sending HAVE message to all my peers");
562                 for (Connection remotePeer : peers.values()) {
563                         MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
564                         task.dsend(remotePeer.mailbox);
565                 }
566         }
567         /**
568          * Send a bitfield message to all the peers the peer has.
569          * @param peer peer data
570          */
571         private void sendBitfield(String mailbox) {
572                 Msg.debug("Sending a BITFIELD to " + mailbox);
573                 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
574                 task.dsend(mailbox);
575         }
576         /**
577          * Send a "request" message to a pair, containing a request for a piece
578          */
579         private void sendRequest(String mailbox, int piece) {
580                 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece);
581                 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece);
582                 task.dsend(mailbox);
583         }
584         /**
585          * Send a "piece" message to a pair, containing a piece of the file
586          */
587         private void sendPiece(String mailbox, int piece, boolean stalled) {
588                 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
589                 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled);
590                 task.dsend(mailbox);
591         }
592         
593         private String getStatus() {
594                 String s = "";
595                 for (int i = 0; i < Common.FILE_PIECES; i++) {
596                         s = s + bitfield[i];
597                 }
598                 return s;
599         }
600 }
601