Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
f1f10c15d63629ca421a91b0fe47fde779122324
[simgrid.git] / examples / java / bittorrent / Peer.java
1 /*
2  * Copyright (c) 2006-2013. The SimGrid Team.
3  * All rights reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. 
7  */
8 package bittorrent;
9
10 import java.util.ArrayList;
11 import java.util.HashMap;
12 import java.util.Iterator;
13 import java.util.Map.Entry;
14
15 import org.simgrid.msg.Comm;
16 import org.simgrid.msg.Host;
17 import org.simgrid.msg.Msg;
18 import org.simgrid.msg.MsgException;
19 import org.simgrid.msg.Process;
20 import org.simgrid.msg.RngStream;
21 import org.simgrid.msg.Task;
22
23 /**
24  * Main class for peers execution
25  */
26 public class Peer extends Process {
27         protected int round = 0;
28         
29         protected double beginReceiveTime;
30         protected double deadline;
31         
32         protected static RngStream stream = new RngStream();
33         
34         protected int id;
35         protected String mailbox;
36         protected String mailboxTracker;
37         protected String hostname;
38         protected int pieces = 0;
39         protected char[] bitfield = new char[Common.FILE_PIECES];
40         protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
41         
42         protected short[] piecesCount = new short[Common.FILE_PIECES];
43         
44         protected int piecesRequested = 0;
45         
46         protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
47         protected int currentPiece = -1;
48
49         protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();        
50         protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
51         
52         protected Comm commReceived = null;
53
54         public Peer(Host host, String name, String[]args) {
55                 super(host,name,args);
56         }       
57         
58         @Override
59         public void main(String[] args) throws MsgException {
60                 //Check arguments
61                 if (args.length != 3 && args.length != 2) {
62                         Msg.info("Wrong number of arguments");
63                 }
64                 if (args.length == 3) {
65                         init(Integer.valueOf(args[0]),true);
66                 }
67                 else {
68                         init(Integer.valueOf(args[0]),false);
69                 }
70                 //Retrieve the deadline
71                 deadline = Double.valueOf(args[1]);
72                 if (deadline < 0) {
73                         Msg.info("Wrong deadline supplied");
74                         return;
75                 }
76                 Msg.info("Hi, I'm joining the network with id " + id);
77                 //Getting peer data from the tracker
78                 if (getPeersData()) {
79                         Msg.debug("Got " + peers.size() + " peers from the tracker");
80                         Msg.debug("Here is my current status: " + getStatus());
81                         beginReceiveTime = Msg.getClock();                      
82                         if (hasFinished()) {
83                                 pieces = Common.FILE_PIECES;
84                                 sendHandshakeAll();
85                                 seedLoop();
86                         }
87                         else {
88                                 leechLoop();
89                                 seedLoop();
90                         }
91                 }
92                 else {
93                         Msg.info("Couldn't contact the tracker.");
94                 }
95                 Msg.info("Here is my current status: " + getStatus());
96         }
97         /**
98          * Peer main loop when it is leeching.
99          */
100         private void leechLoop() {
101                 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
102                 Msg.debug("Start downloading.");
103                 /**
104                  * Send a "handshake" message to all the peers it got
105                  * (it couldn't have gotten more than 50 peers anyway)
106                  */
107                 sendHandshakeAll();
108                 //Wait for at least one "bitfield" message.
109                 waitForPieces();
110                 Msg.debug("Starting main leech loop");
111                 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
112                         if (commReceived == null) {
113                                 commReceived = Task.irecv(mailbox);
114                         }
115                         try {
116                                 if (commReceived.test()) {
117                                         handleMessage(commReceived.getTask());
118                                         commReceived = null;
119                                 }
120                                 else {
121                                         //If the user has a pending interesting
122                                         if (currentPiece != -1) {
123                                                 sendInterestedToPeers();
124                                         }
125                                         else {
126                                                 if (currentPieces.size() < Common.MAX_PIECES) {
127                                                         updateCurrentPiece();
128                                                 }
129                                         }
130                                         //We don't execute the choke algorithm if we don't already have a piece
131                                         if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
132                                                 updateChokedPeers();
133                                                 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
134                                         }
135                                         else {
136                                                 waitFor(1);
137                                         }
138                                 }
139                         }
140                         catch (MsgException e) {
141                                 commReceived = null;                            
142                         }
143                 }
144         }
145         
146         /**
147          * Peer main loop when it is seeding
148          */
149         private void seedLoop() {
150                 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
151                 Msg.debug("Start seeding.");
152                 //start the main seed loop
153                 while (Msg.getClock() < deadline) {
154                         if (commReceived == null) {
155                                 commReceived = Task.irecv(mailbox);
156                         }
157                         try {
158                                 if (commReceived.test()) {
159                                         handleMessage(commReceived.getTask());
160                                         commReceived = null;
161                                 }
162                                 else {
163                                         if (Msg.getClock() >= nextChokedUpdate) {
164                                                 updateChokedPeers();
165                                                 //TODO: Change the choked peer algorithm when seeding
166                                                 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
167                                         }
168                                         else {
169                                                 waitFor(1);
170                                         }
171                                 }
172                         }
173                         catch (MsgException e) {
174                                 commReceived = null;                            
175                         }
176
177                 }
178         }
179         
180         /**
181          * Initialize the various peer data
182          * @param id id of the peer to take in the network
183          * @param seed indicates if the peer is a seed
184          */
185         private void init(int id, boolean seed) {
186                 this.id = id;
187                 this.mailbox = Integer.toString(id);
188                 this.mailboxTracker = "tracker_" + Integer.toString(id);
189                 if (seed) {
190                         for (int i = 0; i < bitfield.length; i++) {
191                                 bitfield[i] = '1';
192                                 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
193                                         bitfieldBlocks[i][j] = '1';
194                                 }
195                         }
196                 }
197                 else {
198                         for (int i = 0; i < bitfield.length; i++) {
199                                 bitfield[i] = '0';
200                                 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
201                                         bitfieldBlocks[i][j] = '0'      ;
202                                 }
203                         }                       
204                 }
205                 this.hostname = host.getName();
206         }
207         /**
208          * Retrieves the peer list from the tracker
209          */
210         private boolean getPeersData() {
211                 
212                 boolean success = false, sendSuccess = false;
213                 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
214                 //Build the task to send to the tracker
215                 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
216                         
217                 while (!sendSuccess && Msg.getClock() < timeout) {
218                         try {
219                                 Msg.debug("Sending a peer request to the tracker.");
220                                 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
221                                 sendSuccess = true;
222                         }
223                         catch (MsgException e) {
224                                 
225                         }
226                 }
227                 while (!success && Msg.getClock() < timeout) {
228                         commReceived = Task.irecv(this.mailboxTracker);
229                         try {
230                                 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
231                                 if (commReceived.getTask() instanceof TrackerTask) {
232                                         TrackerTask task = (TrackerTask)commReceived.getTask();
233                                         for (Integer peerId: task.peers) {
234                                                 if (peerId != this.id) {
235                                                         peers.put(peerId, new Connection(peerId));
236                                                 }       
237                                         }
238                                         success = true;
239                                 }
240                         }
241                         catch (MsgException e) {
242                                 
243                         }
244                         commReceived = null;
245                 }
246                 commReceived = null;
247                 return success;
248         }
249         /**
250          * Handle a received message sent by another peer
251          * @param task task received.
252          */
253         void handleMessage(Task task) {
254                 MessageTask message = (MessageTask)task;
255                 Connection remotePeer = peers.get(message.peerId);
256                 switch (message.type) {
257                         case HANDSHAKE:
258                                 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
259                                 //Check if the peer is in our connection list
260                                 if (remotePeer == null) {
261                                         peers.put(message.peerId, new Connection(message.peerId));
262                                         sendHandshake(message.mailbox);
263                                 }
264                                 //Send our bitfield to the pair
265                                 sendBitfield(message.mailbox);
266                         break;
267                         case BITFIELD:
268                                 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
269                                 //update the pieces list
270                                 updatePiecesCountFromBitfield(message.bitfield);
271                                 //Update the current piece
272                                 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
273                                         updateCurrentPiece();
274                                 }                               
275                                 remotePeer.bitfield  = message.bitfield.clone();
276                         break;
277                         case INTERESTED:
278                                 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
279                                 assert remotePeer != null;
280                                 remotePeer.interested = true;
281                         break;
282                         case NOTINTERESTED:
283                                 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
284                                 assert remotePeer != null;
285                                 remotePeer.interested = false;
286                         break;
287                         case UNCHOKE:
288                                 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
289                                 assert remotePeer != null;
290                                 remotePeer.chokedDownload = false;
291                                 activePeers.put(remotePeer.id,remotePeer);
292                                 sendRequestsToPeer(remotePeer);
293                         break;
294                         case CHOKE:
295                                 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
296                                 assert remotePeer != null;
297                                 remotePeer.chokedDownload = true;
298                                 activePeers.remove(remotePeer.id);
299                         break;
300                         case HAVE:
301                                 if (remotePeer.bitfield == null) {
302                                         return;
303                                 }
304                                 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
305                                 assert message.index >= 0 && message.index < Common.FILE_PIECES;
306                                 assert remotePeer.bitfield != null;
307                                 remotePeer.bitfield[message.index] = '1';
308                                 piecesCount[message.index]++; 
309                                 //Send interested message to the peer if he has what we want
310                                 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
311                                         remotePeer.amInterested = true;
312                                         sendInterested(remotePeer.mailbox);
313                                 }
314                                 
315                                 if (currentPieces.contains(message.index)) {
316                                         int blockIndex = getFirstBlock(message.index);                  
317                                         int blockLength = Common.PIECES_BLOCKS - blockIndex ;
318                                         blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;            
319                                         sendRequest(message.mailbox,message.index,blockIndex,blockLength);
320                                 }
321                         break;
322                         case REQUEST:
323                                 assert message.index >= 0 && message.index < Common.FILE_PIECES;
324                                 if (!remotePeer.chokedUpload) {
325                                         Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
326                                         if (bitfield[message.index] == '1') {
327                                                 sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);  
328                                         }
329                                         else {
330                                                 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
331                                         }
332                                 }
333                         break;
334                         case PIECE:
335                                 if (message.stalled) {
336                                         Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ") is stalled");
337                                 }
338                                 else {
339                                         Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
340                                         if (bitfield[message.index] == '0') {
341                                                 updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
342                                                 if (pieceComplete(message.index)) {
343                                                         piecesRequested--;
344                                                         //Removing the piece from our piece list.
345                                                         if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
346                                                         }
347                                                         //Setting the fact that we have the piece
348                                                         bitfield[message.index] = '1';
349                                                         pieces++;
350                                                         Msg.debug("My status is now " + getStatus());
351                                                         //Sending the information to all the peers we are connected to
352                                                         sendHave(message.index);
353                                                         //sending UNINTERESTED to peers that doesn't have what we want.
354                                                         updateInterestedAfterReceive();
355                                                 }
356                                         }
357                                         else {
358                                                 Msg.debug("However, we already have it.");
359                                         }
360                                 }
361                         break;
362                 }
363                 if (remotePeer != null) {
364                         remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
365                 }
366                 beginReceiveTime = Msg.getClock();
367         }
368         /**
369          * Wait for the node to receive interesting bitfield messages (ie: non empty)
370          * to be received
371          */
372         void waitForPieces() {
373                 boolean finished = false;
374                 while (Msg.getClock() < deadline && !finished) {
375                         if (commReceived == null) {
376                                 commReceived = Task.irecv(mailbox);
377                         }
378                         try {
379                                 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
380                                 handleMessage(commReceived.getTask());
381                                 if (currentPiece != -1) {
382                                         finished = true;
383                                 }
384                                 commReceived = null;
385                         }
386                         catch (MsgException e) {
387                                 commReceived = null;
388                         }
389                 }
390         }
391         
392         private boolean hasFinished() {
393                 for (int i = 0; i < bitfield.length; i++) {
394                         if (bitfield[i] == '1') {
395                                 return true;
396                         }
397                 }
398                 return false;
399         }
400         /**
401          * Updates the list of who has a piece from a bitfield
402          * @param bitfield bitfield
403          */
404         private void updatePiecesCountFromBitfield(char bitfield[]) {
405                 for (int i = 0; i < Common.FILE_PIECES; i++) {
406                         if (bitfield[i] == '1') {
407                                 piecesCount[i]++;
408                         }
409                 }
410         }
411         /**
412          * Update the piece the peer is currently interested in.
413          * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
414          * If the peer has less than 3 pieces, he chooses a piece at random.
415          * If the peer has more than pieces, he downloads the pieces that are the less
416          * replicated
417          */
418         void updateCurrentPiece() {
419                 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
420                         return;
421                 }
422                 if (true || pieces < 3) {
423                         int i = 0, peerPiece;
424                         do {
425                                 currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
426                                 i++;
427                         } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
428                 }
429                 else {
430                         //trivial min algorithm.
431                         //TODO
432                 }
433                 currentPieces.add(currentPiece);
434                 Msg.debug("New interested piece: " + currentPiece);
435                 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
436         }
437         /**
438          * Update the list of current choked and unchoked peers, using the
439          * choke algorithm
440          */
441         private void updateChokedPeers() {
442                 round = (round + 1) % 3;
443                 if (peers.size() == 0) {
444                         return;
445                 }
446                 //remove a peer from the list
447                 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
448                 if (it.hasNext()) {
449                         Entry<Integer,Connection> e = it.next();
450                         Connection peerChoked = e.getValue();
451                         peerChoked.chokedUpload = true;
452                         sendChoked(peerChoked.mailbox);
453                         activePeers.remove(e.getKey());
454                 }
455                 Connection peerChoosed = null;
456                 //Separate the case from when the peer is seeding.
457                 if (pieces == Common.FILE_PIECES) {
458                         //Find the last unchoked peer.
459                         double unchokeTime = deadline + 1;
460                         for (Connection connection : peers.values()) {
461                                 if (connection.lastUnchoke < unchokeTime && connection.interested) {
462                                         peerChoosed = connection;
463                                         unchokeTime = connection.lastUnchoke;
464                                 }
465                         }
466                 }
467                 else {
468                         //Random optimistic unchoking
469                         if (round == 0) {
470                                 int j = 0, i;
471                                 do {
472                                         i = 0;
473                                         int idChosen = stream.randInt(0,peers.size() - 1);
474                                         for (Connection connection : peers.values()) {
475                                                 if (i == idChosen) {
476                                                         peerChoosed = connection;
477                                                         break;
478                                                 }
479                                                 i++;
480                                         } //TODO: Not really the best way ever
481                                         if (!peerChoosed.interested) {
482                                                 peerChoosed = null;
483                                         }
484                                         j++;
485                                 } while (peerChoosed == null && j < Common.MAXIMUM_PEERS);
486                         }
487                         else {
488                                 Connection fastest = null;
489                                 double fastestSpeed = 0;
490                                 for (Connection c : peers.values()) {
491                                         if (c.peerSpeed > fastestSpeed && c.interested && c.chokedUpload) {
492                                                 fastest = c;
493                                                 fastestSpeed = c.peerSpeed;
494                                         }
495                                 }
496                                 peerChoosed = fastest;
497                         }
498                 }
499                 if (peerChoosed != null) {
500                         activePeers.put(peerChoosed.id,peerChoosed);
501                         peerChoosed.chokedUpload = false;
502                         peerChoosed.lastUnchoke = Msg.getClock();
503                         sendUnchoked(peerChoosed.mailbox);
504                 }
505         }
506         /**     
507          * Updates our "interested" state about peers: send "not interested" to peers
508          * that don't have any more pieces we want.
509          */
510         private void updateInterestedAfterReceive() {
511                 boolean interested;
512                 for (Connection connection : peers.values()) {
513                         interested = false;
514                         if (connection.amInterested) {
515                                 for (Integer piece : currentPieces) {
516                                         if (connection.bitfield[piece] == '1') {
517                                                 interested = true;
518                                                 break;
519                                         }
520                                 }       
521                                 if (!interested) {
522                                         connection.amInterested = false;
523                                         sendNotInterested(connection.mailbox);
524                                 }
525                         }
526                 }
527         }
528         private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
529                 for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
530                         bitfieldBlocks[index][i] = '1';
531                 }
532         }
533         /**
534          * Returns if a piece is complete in the peer's bitfield.
535          * @param index the index of the piece.
536          */
537         private boolean pieceComplete(int index) {
538                 for (int i = 0; i < bitfieldBlocks[index].length; i++) {
539                         if (bitfieldBlocks[index][i] == '0') {
540                                 return false;
541                         }
542                 }
543                 return true;
544         }
545         /**
546          * Returns the first block of a piece that we don't have. 
547          */
548         private int getFirstBlock(int piece) {
549                 int blockIndex = -1;
550                 for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
551                         if (bitfieldBlocks[piece][i] == '0') {
552                                 blockIndex = i;
553                                 break;
554                         }
555                 }       
556                 return blockIndex;
557         }
558         /**
559          * Send request messages to a peer that have unchoked us
560          * @param remotePeer peer data to the peer we want to send the request
561          */
562         private void sendRequestsToPeer(Connection remotePeer) {
563                 if (remotePeer.bitfield == null) {
564                         return;
565                 }
566                 for (Integer piece : currentPieces) {
567                         //Getting the block to send.    
568                         int blockIndex = -1, blockLength = 0;
569                         blockIndex = getFirstBlock(piece);                      
570                         blockLength = Common.PIECES_BLOCKS - blockIndex ;
571                         blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;            
572                         if (remotePeer.bitfield[piece] == '1') {
573                                 sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
574                         }                       
575                 }
576         }       
577         /**
578          * Find the peers that have the current interested piece and send them
579          * the "interested" message
580          */
581         private void sendInterestedToPeers() {
582                 if (currentPiece == -1) {
583                         return;
584                 }
585                 for (Connection connection : peers.values()) {
586                         if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
587                                 connection.amInterested = true;                         
588                                 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
589                                 task.dsend(connection.mailbox);                         
590                         }
591                 }
592                 currentPiece = -1;
593                 piecesRequested++;
594         }
595         /**
596          * Send a "interested" message to a peer.
597          */
598         private void sendInterested(String mailbox) {
599                 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
600                 task.dsend(mailbox);                                            
601         }
602         /**
603          * Send a "not interested" message to a peer
604          * @param mailbox mailbox destination mailbox
605          */
606         private void sendNotInterested(String mailbox) {
607                 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
608                 task.dsend(mailbox);                            
609         }
610         /**
611          * Send a handshake message to all the peers the peer has.
612          * @param peer peer data
613          */
614         private void sendHandshakeAll() {
615                 for (Connection remotePeer : peers.values()) {
616                         MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
617                         id);
618                         task.dsend(remotePeer.mailbox);
619                 }
620         }
621         /**
622          * Send a "handshake" message to an user
623          * @param mailbox mailbox where to we send the message
624          */
625         private void sendHandshake(String mailbox) {
626                 Msg.debug("Sending a HANDSHAKE to " + mailbox);
627                 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
628                 task.dsend(mailbox);            
629         }
630         /**
631          * Send a "choked" message to a peer
632          */
633         private void sendChoked(String mailbox) {
634                 Msg.debug("Sending a CHOKE to " + mailbox);
635                 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
636                 task.dsend(mailbox);
637         }
638         /**
639          * Send a "unchoked" message to a peer
640          */
641         private void sendUnchoked(String mailbox) {
642                 Msg.debug("Sending a UNCHOKE to " + mailbox);
643                 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
644                 task.dsend(mailbox);
645         }
646         /**
647          * Send a "HAVE" message to all peers we are connected to
648          */
649         private void sendHave(int piece) {
650                 Msg.debug("Sending HAVE message to all my peers");
651                 for (Connection remotePeer : peers.values()) {
652                         MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
653                         task.dsend(remotePeer.mailbox);
654                 }
655         }
656         /**
657          * Send a bitfield message to all the peers the peer has.
658          * @param peer peer data
659          */
660         private void sendBitfield(String mailbox) {
661                 Msg.debug("Sending a BITFIELD to " + mailbox);
662                 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
663                 task.dsend(mailbox);
664         }
665         /**
666          * Send a "request" message to a pair, containing a request for a piece
667          */
668         private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
669                 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + "," + (blockIndex + blockLength));
670                 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, blockLength);
671                 task.dsend(mailbox);
672         }
673         /**
674          * Send a "piece" message to a pair, containing a piece of the file
675          */
676         private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
677                 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
678                 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled, blockIndex, blockLength);
679                 task.dsend(mailbox);
680         }
681         
682         private String getStatus() {
683                 String s = "";
684                 for (int i = 0; i < Common.FILE_PIECES; i++) {
685                         s = s + bitfield[i];
686                 }
687                 return s;
688         }
689 }
690