Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Bugfix in bittorrent example
[simgrid.git] / examples / bittorrent / Peer.java
1 package bittorrent;
2
3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.Iterator;
6 import java.util.Map.Entry;
7
8 import org.simgrid.msg.Comm;
9 import org.simgrid.msg.Host;
10 import org.simgrid.msg.Msg;
11 import org.simgrid.msg.MsgException;
12 import org.simgrid.msg.RngStream;
13 import org.simgrid.msg.Process;
14 import org.simgrid.msg.Task;
15
16 /**
17  * Main class for peers execution
18  */
19 public class Peer extends Process {
20         protected int round = 0;
21         
22         protected double deadline;
23         
24         protected static RngStream stream = new RngStream();
25         
26         protected int id;
27         protected String mailbox;
28         protected String mailboxTracker;
29         protected String hostname;
30         protected int pieces = 0;
31         protected char[] bitfield = new char[Common.FILE_PIECES];
32         protected short[] piecesCount = new short[Common.FILE_PIECES];
33         
34         protected int piecesRequested = 0;
35         
36         protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
37         protected int currentPiece = -1;
38
39         protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();        
40         protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
41         
42         protected Comm commReceived = null;
43
44         public Peer(Host host, String name, String[]args) {
45                 super(host,name,args);
46         }       
47         
48         @Override
49         public void main(String[] args) throws MsgException {
50                 //Check arguments
51                 if (args.length != 3 && args.length != 2) {
52                         Msg.info("Wrong number of arguments");
53                 }
54                 if (args.length == 3) {
55                         init(Integer.valueOf(args[0]),true);
56                 }
57                 else {
58                         init(Integer.valueOf(args[0]),false);
59                 }
60                 //Retrieve the deadline
61                 deadline = Double.valueOf(args[1]);
62                 if (deadline < 0) {
63                         Msg.info("Wrong deadline supplied");
64                         return;
65                 }
66                 Msg.info("Hi, I'm joining the network with id " + id);
67                 //Getting peer data from the tracker
68                 if (getPeersData()) {
69                         Msg.debug("Got " + peers.size() + " peers from the tracker");
70                         Msg.debug("Here is my current status: " + getStatus());
71                         if (hasFinished()) {
72                                 pieces = Common.FILE_PIECES;
73                                 sendHandshakeAll();
74                                 seedLoop();
75                         }
76                         else {
77                                 leechLoop();
78                                 seedLoop();
79                         }
80                 }
81                 else {
82                         Msg.info("Couldn't contact the tracker.");
83                 }
84                 Msg.info("Here is my current status: " + getStatus());
85         }
86         /**
87          * Peer main loop when it is leeching.
88          */
89         private void leechLoop() {
90                 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
91                 Msg.debug("Start downloading.");
92                 /**
93                  * Send a "handshake" message to all the peers it got
94                  * (it couldn't have gotten more than 50 peers anyway)
95                  */
96                 sendHandshakeAll();
97                 //Wait for at least one "bitfield" message.
98                 waitForPieces();
99                 Msg.debug("Starting main leech loop");
100                 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
101                         if (commReceived == null) {
102                                 commReceived = Task.irecv(mailbox);
103                         }
104                         try {
105                                 if (commReceived.test()) {
106                                         handleMessage(commReceived.getTask());
107                                         commReceived = null;
108                                 }
109                                 else {
110                                         //If the user has a pending interesting
111                                         if (currentPiece != -1) {
112                                                 sendInterestedToPeers();
113                                         }
114                                         else {
115                                                 if (currentPieces.size() < Common.MAX_PIECES) {
116                                                         updateCurrentPiece();
117                                                 }
118                                         }
119                                         //We don't execute the choke algorithm if we don't already have a piece
120                                         if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
121                                                 updateChokedPeers();
122                                                 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
123                                         }
124                                         else {
125                                                 waitFor(1);
126                                         }
127                                 }
128                         }
129                         catch (MsgException e) {
130                                 commReceived = null;                            
131                         }
132                 }
133         }
134         
135         /**
136          * Peer main loop when it is seeding
137          */
138         private void seedLoop() {
139                 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
140                 Msg.debug("Start seeding.");
141                 //start the main seed loop
142                 while (Msg.getClock() < deadline) {
143                         if (commReceived == null) {
144                                 commReceived = Task.irecv(mailbox);
145                         }
146                         try {
147                                 if (commReceived.test()) {
148                                         handleMessage(commReceived.getTask());
149                                         commReceived = null;
150                                 }
151                                 else {
152                                         if (Msg.getClock() >= nextChokedUpdate) {
153                                                 updateChokedPeers();
154                                                 //TODO: Change the choked peer algorithm when seeding
155                                                 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
156                                         }
157                                         else {
158                                                 waitFor(1);
159                                         }
160                                 }
161                         }
162                         catch (MsgException e) {
163                                 commReceived = null;                            
164                         }
165
166                 }
167         }
168         
169         /**
170          * Initialize the various peer data
171          * @param id id of the peer to take in the network
172          * @param seed indicates if the peer is a seed
173          */
174         private void init(int id, boolean seed) {
175                 this.id = id;
176                 this.mailbox = Integer.toString(id);
177                 this.mailboxTracker = "tracker_" + Integer.toString(id);
178                 if (seed) {
179                         for (int i = 0; i < bitfield.length; i++) {
180                                 bitfield[i] = '1';
181                         }
182                 }
183                 else {
184                         for (int i = 0; i < bitfield.length; i++) {
185                                 bitfield[i] = '0';
186                         }                       
187                 }
188                 this.hostname = host.getName();
189         }
190         /**
191          * Retrieves the peer list from the tracker
192          */
193         private boolean getPeersData() {
194                 
195                 boolean success = false, sendSuccess = false;
196                 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
197                 //Build the task to send to the tracker
198                 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
199                         
200                 while (!sendSuccess && Msg.getClock() < timeout) {
201                         try {
202                                 Msg.debug("Sending a peer request to the tracker.");
203                                 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
204                                 sendSuccess = true;
205                         }
206                         catch (MsgException e) {
207                                 
208                         }
209                 }
210                 while (!success && Msg.getClock() < timeout) {
211                         commReceived = Task.irecv(this.mailboxTracker);
212                         try {
213                                 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
214                                 if (commReceived.getTask() instanceof TrackerTask) {
215                                         TrackerTask task = (TrackerTask)commReceived.getTask();
216                                         for (Integer peerId: task.peers) {
217                                                 if (peerId != this.id) {
218                                                         peers.put(peerId, new Connection(peerId));
219                                                 }       
220                                         }
221                                         success = true;
222                                 }
223                         }
224                         catch (MsgException e) {
225                                 
226                         }
227                         commReceived = null;
228                 }
229                 commReceived = null;
230                 return success;
231         }
232         /**
233          * Handle a received message sent by another peer
234          * @param task task received.
235          */
236         void handleMessage(Task task) {
237                 MessageTask message = (MessageTask)task;
238                 Connection remotePeer = peers.get(message.peerId);
239                 switch (message.type) {
240                         case HANDSHAKE:
241                                 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
242                                 //Check if the peer is in our connection list
243                                 if (remotePeer == null) {
244                                         peers.put(message.peerId, new Connection(message.peerId));
245                                         sendHandshake(message.mailbox);
246                                 }
247                                 //Send our bitfield to the pair
248                                 sendBitfield(message.mailbox);
249                         break;
250                         case BITFIELD:
251                                 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
252                                 //update the pieces list
253                                 updatePiecesCountFromBitfield(message.bitfield);
254                                 //Update the current piece
255                                 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
256                                         updateCurrentPiece();
257                                 }                               
258                                 remotePeer.bitfield  = message.bitfield.clone();
259                         break;
260                         case INTERESTED:
261                                 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
262                                 assert remotePeer != null;
263                                 remotePeer.interested = true;
264                         break;
265                         case NOTINTERESTED:
266                                 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
267                                 assert remotePeer != null;
268                                 remotePeer.interested = false;
269                         break;
270                         case UNCHOKE:
271                                 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
272                                 assert remotePeer != null;
273                                 remotePeer.chokedDownload = false;
274                                 activePeers.put(remotePeer.id,remotePeer);
275                                 sendRequestsToPeer(remotePeer);
276                         break;
277                         case CHOKE:
278                                 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
279                                 assert remotePeer != null;
280                                 remotePeer.chokedDownload = true;
281                                 activePeers.remove(remotePeer.id);
282                         break;
283                         case HAVE:
284                                 if (remotePeer.bitfield == null) {
285                                         return;
286                                 }
287                                 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
288                                 assert message.index >= 0 && message.index < Common.FILE_PIECES;
289                                 assert remotePeer.bitfield != null;
290                                 remotePeer.bitfield[message.index] = '1';
291                                 piecesCount[message.index]++; 
292                                 //Send interested message to the peer if he has what we want
293                                 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
294                                         remotePeer.amInterested = true;
295                                         sendInterested(remotePeer.mailbox);
296                                 }
297                                 
298                                 if (currentPieces.contains(message.index)) {
299                                         sendRequest(message.mailbox,message.index);
300                                 }
301                         break;
302                         case REQUEST:
303                                 assert message.index >= 0 && message.index < Common.FILE_PIECES;
304                                 if (!remotePeer.chokedUpload) {
305                                         Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
306                                         if (bitfield[message.index] == '1') {
307                                                 sendPiece(message.mailbox,message.index,false); 
308                                         }
309                                         else {
310                                                 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
311                                         }
312                                 }
313                         break;
314                         case PIECE:
315                                 if (message.stalled) {
316                                         Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
317                                 }
318                                 else {
319                                         Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
320                                         if (bitfield[message.index] == '0') {
321                                                 piecesRequested--;
322                                                 //Removing the piece from our piece list.
323                                                 if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
324                                                 }
325                                                 //Setting the fact that we have the piece
326                                                 bitfield[message.index] = '1';
327                                                 pieces++;
328                                                 Msg.debug("My status is now " + getStatus());
329                                                 //Sending the information to all the peers we are connected to
330                                                 sendHave(message.index);
331                                                 //sending UNINTERESTED to peers that doesn't have what we want.
332                                                 updateInterestedAfterReceive();
333                                         }
334                                         else {
335                                                 Msg.debug("However, we already have it.");
336                                         }
337                                 }
338                         break;
339                 }
340         }
341         /**
342          * Wait for the node to receive interesting bitfield messages (ie: non empty)
343          * to be received
344          */
345         void waitForPieces() {
346                 boolean finished = false;
347                 while (Msg.getClock() < deadline && !finished) {
348                         if (commReceived == null) {
349                                 commReceived = Task.irecv(mailbox);
350                         }
351                         try {
352                                 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
353                                 handleMessage(commReceived.getTask());
354                                 if (currentPiece != -1) {
355                                         finished = true;
356                                 }
357                                 commReceived = null;
358                         }
359                         catch (MsgException e) {
360                                 commReceived = null;
361                         }
362                 }
363         }
364         
365         private boolean hasFinished() {
366                 for (int i = 0; i < bitfield.length; i++) {
367                         if (bitfield[i] == '1') {
368                                 return true;
369                         }
370                 }
371                 return false;
372         }
373         /**
374          * Updates the list of who has a piece from a bitfield
375          * @param bitfield bitfield
376          */
377         private void updatePiecesCountFromBitfield(char bitfield[]) {
378                 for (int i = 0; i < Common.FILE_PIECES; i++) {
379                         if (bitfield[i] == '1') {
380                                 piecesCount[i]++;
381                         }
382                 }
383         }
384         /**
385          * Update the piece the peer is currently interested in.
386          * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
387          * If the peer has less than 3 pieces, he chooses a piece at random.
388          * If the peer has more than pieces, he downloads the pieces that are the less
389          * replicated
390          */
391         void updateCurrentPiece() {
392                 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
393                         return;
394                 }
395                 if (true || pieces < 3) {
396                         int i = 0, peerPiece;
397                         do {
398                                 currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
399                                 i++;
400                         } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
401                 }
402                 else {
403                         //trivial min algorithm.
404                         //TODO
405                 }
406                 currentPieces.add(currentPiece);
407                 Msg.debug("New interested piece: " + currentPiece);
408                 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
409         }
410         /**
411          * Update the list of current choked and unchoked peers, using the
412          * choke algorithm
413          */
414         private void updateChokedPeers() {
415                 round = (round + 1) % 3;
416                 if (peers.size() == 0) {
417                         return;
418                 }
419                 //remove a peer from the list
420                 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
421                 if (it.hasNext()) {
422                         Entry<Integer,Connection> e = it.next();
423                         Connection peerChoked = e.getValue();
424                         sendChoked(peerChoked.mailbox);
425                         peerChoked.chokedUpload = true;
426                         activePeers.remove(e.getKey());
427                 }
428                 //Random optimistic unchoking
429                 if (round == 0 || true) {
430                         int j = 0, i;
431                         Connection peerChoosed = null;
432                         do {
433                                 i = 0;
434                                 int idChosen = stream.randInt(0,peers.size() - 1);
435                                 for (Connection connection : peers.values()) {
436                                         if (i == idChosen) {
437                                                 peerChoosed = connection;
438                                                 break;
439                                         }
440                                         i++;
441                                 } //TODO: Not really the best way ever
442                                 if (!peerChoosed.interested) {
443                                         peerChoosed = null;
444                                 }
445                                 j++;
446                         } while (peerChoosed == null && j < 
447 Common.MAXIMUM_PEERS);
448                         if (peerChoosed != null) {
449                                 activePeers.put(peerChoosed.id,peerChoosed);
450                                 peerChoosed.chokedUpload = false;
451                                 sendUnchoked(peerChoosed.mailbox);
452                         }
453                 }
454                 //TODO: Use the leecher choke algorithm.
455         }
456         /**     
457          * Updates our "interested" state about peers: send "not interested" to peers
458          * that don't have any more pieces we want.
459          */
460         private void updateInterestedAfterReceive() {
461                 boolean interested;
462                 for (Connection connection : peers.values()) {
463                         interested = false;
464                         if (connection.amInterested) {
465                                 for (Integer piece : currentPieces) {
466                                         if (connection.bitfield[piece] == '1') {
467                                                 interested = true;
468                                                 break;
469                                         }
470                                 }       
471                                 if (!interested) {
472                                         connection.amInterested = false;
473                                         sendNotInterested(connection.mailbox);
474                                 }
475                         }
476                 }
477         }
478         /**
479          * Send request messages to a peer that have unchoked us
480          * @param remotePeer peer data to the peer we want to send the request
481          */
482         private void sendRequestsToPeer(Connection remotePeer) {
483                 for (Integer piece : currentPieces) {
484                         if (remotePeer.bitfield != null && remotePeer.bitfield[piece] == '1') {
485                                 sendRequest(remotePeer.mailbox, piece);
486                         }                       
487                 }
488         }       
489         /**
490          * Find the peers that have the current interested piece and send them
491          * the "interested" message
492          */
493         private void sendInterestedToPeers() {
494                 if (currentPiece == -1) {
495                         return;
496                 }
497                 for (Connection connection : peers.values()) {
498                         if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
499                                 connection.amInterested = true;                         
500                                 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
501                                 task.dsend(connection.mailbox);                         
502                         }
503                 }
504                 currentPiece = -1;
505                 piecesRequested++;
506         }
507         /**
508          * Send a "interested" message to a peer.
509          */
510         private void sendInterested(String mailbox) {
511                 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
512                 task.dsend(mailbox);                                            
513         }
514         /**
515          * Send a "not interested" message to a peer
516          * @param mailbox mailbox destination mailbox
517          */
518         private void sendNotInterested(String mailbox) {
519                 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
520                 task.dsend(mailbox);                            
521         }
522         /**
523          * Send a handshake message to all the peers the peer has.
524          * @param peer peer data
525          */
526         private void sendHandshakeAll() {
527                 for (Connection remotePeer : peers.values()) {
528                         MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
529                         id);
530                         task.dsend(remotePeer.mailbox);
531                 }
532         }
533         /**
534          * Send a "handshake" message to an user
535          * @param mailbox mailbox where to we send the message
536          */
537         private void sendHandshake(String mailbox) {
538                 Msg.debug("Sending a HANDSHAKE to " + mailbox);
539                 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
540                 task.dsend(mailbox);            
541         }
542         /**
543          * Send a "choked" message to a peer
544          */
545         private void sendChoked(String mailbox) {
546                 Msg.debug("Sending a CHOKE to " + mailbox);
547                 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
548                 task.dsend(mailbox);
549         }
550         /**
551          * Send a "unchoked" message to a peer
552          */
553         private void sendUnchoked(String mailbox) {
554                 Msg.debug("Sending a UNCHOKE to " + mailbox);
555                 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
556                 task.dsend(mailbox);
557         }
558         /**
559          * Send a "HAVE" message to all peers we are connected to
560          */
561         private void sendHave(int piece) {
562                 Msg.debug("Sending HAVE message to all my peers");
563                 for (Connection remotePeer : peers.values()) {
564                         MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
565                         task.dsend(remotePeer.mailbox);
566                 }
567         }
568         /**
569          * Send a bitfield message to all the peers the peer has.
570          * @param peer peer data
571          */
572         private void sendBitfield(String mailbox) {
573                 Msg.debug("Sending a BITFIELD to " + mailbox);
574                 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
575                 task.dsend(mailbox);
576         }
577         /**
578          * Send a "request" message to a pair, containing a request for a piece
579          */
580         private void sendRequest(String mailbox, int piece) {
581                 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece);
582                 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece);
583                 task.dsend(mailbox);
584         }
585         /**
586          * Send a "piece" message to a pair, containing a piece of the file
587          */
588         private void sendPiece(String mailbox, int piece, boolean stalled) {
589                 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
590                 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled);
591                 task.dsend(mailbox);
592         }
593         
594         private String getStatus() {
595                 String s = "";
596                 for (int i = 0; i < Common.FILE_PIECES; i++) {
597                         s = s + bitfield[i];
598                 }
599                 return s;
600         }
601 }
602