Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Some doc on how to use the coroutines context factory
[simgrid.git] / examples / bittorrent / Peer.java
1 package bittorrent;
2
3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.Iterator;
6 import java.util.Map.Entry;
7
8 import org.simgrid.msg.Comm;
9 import org.simgrid.msg.Host;
10 import org.simgrid.msg.Msg;
11 import org.simgrid.msg.MsgException;
12 import org.simgrid.msg.RngStream;
13 import org.simgrid.msg.Process;
14 import org.simgrid.msg.Task;
15
16 /**
17  * Main class for peers execution
18  */
19 public class Peer extends Process {
20         protected int round = 0;
21         
22         protected double deadline;
23         
24         protected static RngStream stream = new RngStream();
25         
26         protected int id;
27         protected String mailbox;
28         protected String mailboxTracker;
29         protected String hostname;
30         protected int pieces = 0;
31         protected char[] bitfield = new char[Common.FILE_PIECES];
32         protected char[][] bitfield_blocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
33         
34         protected short[] piecesCount = new short[Common.FILE_PIECES];
35         
36         protected int piecesRequested = 0;
37         
38         protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
39         protected int currentPiece = -1;
40
41         protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();        
42         protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
43         
44         protected Comm commReceived = null;
45
46         public Peer(Host host, String name, String[]args) {
47                 super(host,name,args);
48         }       
49         
50         @Override
51         public void main(String[] args) throws MsgException {
52                 //Check arguments
53                 if (args.length != 3 && args.length != 2) {
54                         Msg.info("Wrong number of arguments");
55                 }
56                 if (args.length == 3) {
57                         init(Integer.valueOf(args[0]),true);
58                 }
59                 else {
60                         init(Integer.valueOf(args[0]),false);
61                 }
62                 //Retrieve the deadline
63                 deadline = Double.valueOf(args[1]);
64                 if (deadline < 0) {
65                         Msg.info("Wrong deadline supplied");
66                         return;
67                 }
68                 Msg.info("Hi, I'm joining the network with id " + id);
69                 //Getting peer data from the tracker
70                 if (getPeersData()) {
71                         Msg.debug("Got " + peers.size() + " peers from the tracker");
72                         Msg.debug("Here is my current status: " + getStatus());
73                         if (hasFinished()) {
74                                 pieces = Common.FILE_PIECES;
75                                 sendHandshakeAll();
76                                 seedLoop();
77                         }
78                         else {
79                                 leechLoop();
80                                 seedLoop();
81                         }
82                 }
83                 else {
84                         Msg.info("Couldn't contact the tracker.");
85                 }
86                 Msg.info("Here is my current status: " + getStatus());
87         }
88         /**
89          * Peer main loop when it is leeching.
90          */
91         private void leechLoop() {
92                 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
93                 Msg.debug("Start downloading.");
94                 /**
95                  * Send a "handshake" message to all the peers it got
96                  * (it couldn't have gotten more than 50 peers anyway)
97                  */
98                 sendHandshakeAll();
99                 //Wait for at least one "bitfield" message.
100                 waitForPieces();
101                 Msg.debug("Starting main leech loop");
102                 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
103                         if (commReceived == null) {
104                                 commReceived = Task.irecv(mailbox);
105                         }
106                         try {
107                                 if (commReceived.test()) {
108                                         handleMessage(commReceived.getTask());
109                                         commReceived = null;
110                                 }
111                                 else {
112                                         //If the user has a pending interesting
113                                         if (currentPiece != -1) {
114                                                 sendInterestedToPeers();
115                                         }
116                                         else {
117                                                 if (currentPieces.size() < Common.MAX_PIECES) {
118                                                         updateCurrentPiece();
119                                                 }
120                                         }
121                                         //We don't execute the choke algorithm if we don't already have a piece
122                                         if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
123                                                 updateChokedPeers();
124                                                 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
125                                         }
126                                         else {
127                                                 waitFor(1);
128                                         }
129                                 }
130                         }
131                         catch (MsgException e) {
132                                 commReceived = null;                            
133                         }
134                 }
135         }
136         
137         /**
138          * Peer main loop when it is seeding
139          */
140         private void seedLoop() {
141                 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
142                 Msg.debug("Start seeding.");
143                 //start the main seed loop
144                 while (Msg.getClock() < deadline) {
145                         if (commReceived == null) {
146                                 commReceived = Task.irecv(mailbox);
147                         }
148                         try {
149                                 if (commReceived.test()) {
150                                         handleMessage(commReceived.getTask());
151                                         commReceived = null;
152                                 }
153                                 else {
154                                         if (Msg.getClock() >= nextChokedUpdate) {
155                                                 updateChokedPeers();
156                                                 //TODO: Change the choked peer algorithm when seeding
157                                                 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
158                                         }
159                                         else {
160                                                 waitFor(1);
161                                         }
162                                 }
163                         }
164                         catch (MsgException e) {
165                                 commReceived = null;                            
166                         }
167
168                 }
169         }
170         
171         /**
172          * Initialize the various peer data
173          * @param id id of the peer to take in the network
174          * @param seed indicates if the peer is a seed
175          */
176         private void init(int id, boolean seed) {
177                 this.id = id;
178                 this.mailbox = Integer.toString(id);
179                 this.mailboxTracker = "tracker_" + Integer.toString(id);
180                 if (seed) {
181                         for (int i = 0; i < bitfield.length; i++) {
182                                 bitfield[i] = '1';
183                         }
184                 }
185                 else {
186                         for (int i = 0; i < bitfield.length; i++) {
187                                 bitfield[i] = '0';
188                         }                       
189                 }
190                 this.hostname = host.getName();
191         }
192         /**
193          * Retrieves the peer list from the tracker
194          */
195         private boolean getPeersData() {
196                 
197                 boolean success = false, sendSuccess = false;
198                 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
199                 //Build the task to send to the tracker
200                 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
201                         
202                 while (!sendSuccess && Msg.getClock() < timeout) {
203                         try {
204                                 Msg.debug("Sending a peer request to the tracker.");
205                                 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
206                                 sendSuccess = true;
207                         }
208                         catch (MsgException e) {
209                                 
210                         }
211                 }
212                 while (!success && Msg.getClock() < timeout) {
213                         commReceived = Task.irecv(this.mailboxTracker);
214                         try {
215                                 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
216                                 if (commReceived.getTask() instanceof TrackerTask) {
217                                         TrackerTask task = (TrackerTask)commReceived.getTask();
218                                         for (Integer peerId: task.peers) {
219                                                 if (peerId != this.id) {
220                                                         peers.put(peerId, new Connection(peerId));
221                                                 }       
222                                         }
223                                         success = true;
224                                 }
225                         }
226                         catch (MsgException e) {
227                                 
228                         }
229                         commReceived = null;
230                 }
231                 commReceived = null;
232                 return success;
233         }
234         /**
235          * Handle a received message sent by another peer
236          * @param task task received.
237          */
238         void handleMessage(Task task) {
239                 MessageTask message = (MessageTask)task;
240                 Connection remotePeer = peers.get(message.peerId);
241                 switch (message.type) {
242                         case HANDSHAKE:
243                                 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
244                                 //Check if the peer is in our connection list
245                                 if (remotePeer == null) {
246                                         peers.put(message.peerId, new Connection(message.peerId));
247                                         sendHandshake(message.mailbox);
248                                 }
249                                 //Send our bitfield to the pair
250                                 sendBitfield(message.mailbox);
251                         break;
252                         case BITFIELD:
253                                 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
254                                 //update the pieces list
255                                 updatePiecesCountFromBitfield(message.bitfield);
256                                 //Update the current piece
257                                 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
258                                         updateCurrentPiece();
259                                 }                               
260                                 remotePeer.bitfield  = message.bitfield.clone();
261                         break;
262                         case INTERESTED:
263                                 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
264                                 assert remotePeer != null;
265                                 remotePeer.interested = true;
266                         break;
267                         case NOTINTERESTED:
268                                 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
269                                 assert remotePeer != null;
270                                 remotePeer.interested = false;
271                         break;
272                         case UNCHOKE:
273                                 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
274                                 assert remotePeer != null;
275                                 remotePeer.chokedDownload = false;
276                                 activePeers.put(remotePeer.id,remotePeer);
277                                 sendRequestsToPeer(remotePeer);
278                         break;
279                         case CHOKE:
280                                 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
281                                 assert remotePeer != null;
282                                 remotePeer.chokedDownload = true;
283                                 activePeers.remove(remotePeer.id);
284                         break;
285                         case HAVE:
286                                 if (remotePeer.bitfield == null) {
287                                         return;
288                                 }
289                                 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
290                                 assert message.index >= 0 && message.index < Common.FILE_PIECES;
291                                 assert remotePeer.bitfield != null;
292                                 remotePeer.bitfield[message.index] = '1';
293                                 piecesCount[message.index]++; 
294                                 //Send interested message to the peer if he has what we want
295                                 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
296                                         remotePeer.amInterested = true;
297                                         sendInterested(remotePeer.mailbox);
298                                 }
299                                 
300                                 if (currentPieces.contains(message.index)) {
301                                         sendRequest(message.mailbox,message.index);
302                                 }
303                         break;
304                         case REQUEST:
305                                 assert message.index >= 0 && message.index < Common.FILE_PIECES;
306                                 if (!remotePeer.chokedUpload) {
307                                         Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
308                                         if (bitfield[message.index] == '1') {
309                                                 sendPiece(message.mailbox,message.index,false); 
310                                         }
311                                         else {
312                                                 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
313                                         }
314                                 }
315                         break;
316                         case PIECE:
317                                 if (message.stalled) {
318                                         Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
319                                 }
320                                 else {
321                                         Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
322                                         if (bitfield[message.index] == '0') {
323                                                 piecesRequested--;
324                                                 //Removing the piece from our piece list.
325                                                 if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
326                                                 }
327                                                 //Setting the fact that we have the piece
328                                                 bitfield[message.index] = '1';
329                                                 pieces++;
330                                                 Msg.debug("My status is now " + getStatus());
331                                                 //Sending the information to all the peers we are connected to
332                                                 sendHave(message.index);
333                                                 //sending UNINTERESTED to peers that doesn't have what we want.
334                                                 updateInterestedAfterReceive();
335                                         }
336                                         else {
337                                                 Msg.debug("However, we already have it.");
338                                         }
339                                 }
340                         break;
341                 }
342         }
343         /**
344          * Wait for the node to receive interesting bitfield messages (ie: non empty)
345          * to be received
346          */
347         void waitForPieces() {
348                 boolean finished = false;
349                 while (Msg.getClock() < deadline && !finished) {
350                         if (commReceived == null) {
351                                 commReceived = Task.irecv(mailbox);
352                         }
353                         try {
354                                 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
355                                 handleMessage(commReceived.getTask());
356                                 if (currentPiece != -1) {
357                                         finished = true;
358                                 }
359                                 commReceived = null;
360                         }
361                         catch (MsgException e) {
362                                 commReceived = null;
363                         }
364                 }
365         }
366         
367         private boolean hasFinished() {
368                 for (int i = 0; i < bitfield.length; i++) {
369                         if (bitfield[i] == '1') {
370                                 return true;
371                         }
372                 }
373                 return false;
374         }
375         /**
376          * Updates the list of who has a piece from a bitfield
377          * @param bitfield bitfield
378          */
379         private void updatePiecesCountFromBitfield(char bitfield[]) {
380                 for (int i = 0; i < Common.FILE_PIECES; i++) {
381                         if (bitfield[i] == '1') {
382                                 piecesCount[i]++;
383                         }
384                 }
385         }
386         /**
387          * Update the piece the peer is currently interested in.
388          * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
389          * If the peer has less than 3 pieces, he chooses a piece at random.
390          * If the peer has more than pieces, he downloads the pieces that are the less
391          * replicated
392          */
393         void updateCurrentPiece() {
394                 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
395                         return;
396                 }
397                 if (true || pieces < 3) {
398                         int i = 0, peerPiece;
399                         do {
400                                 currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
401                                 i++;
402                         } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
403                 }
404                 else {
405                         //trivial min algorithm.
406                         //TODO
407                 }
408                 currentPieces.add(currentPiece);
409                 Msg.debug("New interested piece: " + currentPiece);
410                 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
411         }
412         /**
413          * Update the list of current choked and unchoked peers, using the
414          * choke algorithm
415          */
416         private void updateChokedPeers() {
417                 round = (round + 1) % 3;
418                 if (peers.size() == 0) {
419                         return;
420                 }
421                 //remove a peer from the list
422                 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
423                 if (it.hasNext()) {
424                         Entry<Integer,Connection> e = it.next();
425                         Connection peerChoked = e.getValue();
426                         sendChoked(peerChoked.mailbox);
427                         peerChoked.chokedUpload = true;
428                         activePeers.remove(e.getKey());
429                 }
430                 //Random optimistic unchoking
431                 if (round == 0 || true) {
432                         int j = 0, i;
433                         Connection peerChoosed = null;
434                         do {
435                                 i = 0;
436                                 int idChosen = stream.randInt(0,peers.size() - 1);
437                                 for (Connection connection : peers.values()) {
438                                         if (i == idChosen) {
439                                                 peerChoosed = connection;
440                                                 break;
441                                         }
442                                         i++;
443                                 } //TODO: Not really the best way ever
444                                 if (!peerChoosed.interested) {
445                                         peerChoosed = null;
446                                 }
447                                 j++;
448                         } while (peerChoosed == null && j < 
449 Common.MAXIMUM_PEERS);
450                         if (peerChoosed != null) {
451                                 activePeers.put(peerChoosed.id,peerChoosed);
452                                 peerChoosed.chokedUpload = false;
453                                 sendUnchoked(peerChoosed.mailbox);
454                         }
455                 }
456                 //TODO: Use the leecher choke algorithm.
457         }
458         /**     
459          * Updates our "interested" state about peers: send "not interested" to peers
460          * that don't have any more pieces we want.
461          */
462         private void updateInterestedAfterReceive() {
463                 boolean interested;
464                 for (Connection connection : peers.values()) {
465                         interested = false;
466                         if (connection.amInterested) {
467                                 for (Integer piece : currentPieces) {
468                                         if (connection.bitfield[piece] == '1') {
469                                                 interested = true;
470                                                 break;
471                                         }
472                                 }       
473                                 if (!interested) {
474                                         connection.amInterested = false;
475                                         sendNotInterested(connection.mailbox);
476                                 }
477                         }
478                 }
479         }
480         /**
481          * Send request messages to a peer that have unchoked us
482          * @param remotePeer peer data to the peer we want to send the request
483          */
484         private void sendRequestsToPeer(Connection remotePeer) {
485                 for (Integer piece : currentPieces) {
486                         if (remotePeer.bitfield != null && remotePeer.bitfield[piece] == '1') {
487                                 sendRequest(remotePeer.mailbox, piece);
488                         }                       
489                 }
490         }       
491         /**
492          * Find the peers that have the current interested piece and send them
493          * the "interested" message
494          */
495         private void sendInterestedToPeers() {
496                 if (currentPiece == -1) {
497                         return;
498                 }
499                 for (Connection connection : peers.values()) {
500                         if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
501                                 connection.amInterested = true;                         
502                                 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
503                                 task.dsend(connection.mailbox);                         
504                         }
505                 }
506                 currentPiece = -1;
507                 piecesRequested++;
508         }
509         /**
510          * Send a "interested" message to a peer.
511          */
512         private void sendInterested(String mailbox) {
513                 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
514                 task.dsend(mailbox);                                            
515         }
516         /**
517          * Send a "not interested" message to a peer
518          * @param mailbox mailbox destination mailbox
519          */
520         private void sendNotInterested(String mailbox) {
521                 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
522                 task.dsend(mailbox);                            
523         }
524         /**
525          * Send a handshake message to all the peers the peer has.
526          * @param peer peer data
527          */
528         private void sendHandshakeAll() {
529                 for (Connection remotePeer : peers.values()) {
530                         MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
531                         id);
532                         task.dsend(remotePeer.mailbox);
533                 }
534         }
535         /**
536          * Send a "handshake" message to an user
537          * @param mailbox mailbox where to we send the message
538          */
539         private void sendHandshake(String mailbox) {
540                 Msg.debug("Sending a HANDSHAKE to " + mailbox);
541                 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
542                 task.dsend(mailbox);            
543         }
544         /**
545          * Send a "choked" message to a peer
546          */
547         private void sendChoked(String mailbox) {
548                 Msg.debug("Sending a CHOKE to " + mailbox);
549                 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
550                 task.dsend(mailbox);
551         }
552         /**
553          * Send a "unchoked" message to a peer
554          */
555         private void sendUnchoked(String mailbox) {
556                 Msg.debug("Sending a UNCHOKE to " + mailbox);
557                 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
558                 task.dsend(mailbox);
559         }
560         /**
561          * Send a "HAVE" message to all peers we are connected to
562          */
563         private void sendHave(int piece) {
564                 Msg.debug("Sending HAVE message to all my peers");
565                 for (Connection remotePeer : peers.values()) {
566                         MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
567                         task.dsend(remotePeer.mailbox);
568                 }
569         }
570         /**
571          * Send a bitfield message to all the peers the peer has.
572          * @param peer peer data
573          */
574         private void sendBitfield(String mailbox) {
575                 Msg.debug("Sending a BITFIELD to " + mailbox);
576                 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
577                 task.dsend(mailbox);
578         }
579         /**
580          * Send a "request" message to a pair, containing a request for a piece
581          */
582         private void sendRequest(String mailbox, int piece) {
583                 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece);
584                 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece);
585                 task.dsend(mailbox);
586         }
587         /**
588          * Send a "piece" message to a pair, containing a piece of the file
589          */
590         private void sendPiece(String mailbox, int piece, boolean stalled) {
591                 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
592                 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled);
593                 task.dsend(mailbox);
594         }
595         
596         private String getStatus() {
597                 String s = "";
598                 for (int i = 0; i < Common.FILE_PIECES; i++) {
599                         s = s + bitfield[i];
600                 }
601                 return s;
602         }
603 }
604