Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Bugfix in Bittorrent example
[simgrid.git] / examples / bittorrent / Peer.java
1 package bittorrent;
2
3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.HashSet;
6 import java.util.Iterator;
7 import java.util.Map.Entry;
8
9 import org.simgrid.msg.Comm;
10 import org.simgrid.msg.Host;
11 import org.simgrid.msg.Msg;
12 import org.simgrid.msg.MsgException;
13 import org.simgrid.msg.RngStream;
14 import org.simgrid.msg.Process;
15 import org.simgrid.msg.Task;
16
17 import bittorrent.Connection;
18
19 /**
20  * Main class for peers execution
21  */
22 public class Peer extends Process {
        // Counter advanced modulo 3 on every call to updateChokedPeers();
        // the value 0 selects the random ("optimistic") unchoke while leeching.
        protected int round = 0;
        
        // Simulated clock value recorded after the last handled message
        // (also set right after the tracker answered).
        protected double beginReceiveTime;
        // Simulated time after which the leech/seed loops stop; read from args[1].
        protected double deadline;
        
        // Random number stream shared by all peers (static).
        protected static RngStream stream = new RngStream();
        
        // Identifier of this peer, parsed from args[0].
        protected int id;
        // Mailbox this peer listens on: the decimal string of its id.
        protected String mailbox;
        // Mailbox used for the tracker's answer: "tracker_" followed by the id.
        protected String mailboxTracker;
        // Name of the host this process runs on.
        protected String hostname;
        // Number of complete pieces this peer owns.
        protected int pieces = 0;
        // One char per piece: '1' when the piece is owned, '0' otherwise.
        protected char[] bitfield = new char[Common.FILE_PIECES];
        // One char per block of every piece: '1' when the block is owned, '0' otherwise.
        protected char[][] bitfieldBlocks = new char[Common.FILE_PIECES][Common.PIECES_BLOCKS];
        
        // Per piece, how many times it was advertised to us through BITFIELD or HAVE messages.
        protected short[] piecesCount = new short[Common.FILE_PIECES];
        
        // Incremented in sendInterestedToPeers(), decremented when a piece completes.
        protected int piecesRequested = 0;
        
        // Indexes of the pieces currently being downloaded.
        protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
        // Piece picked by updateCurrentPiece() and not yet announced with INTERESTED; -1 when none.
        protected int currentPiece = -1;

        // Peers registered as active: those that unchoked us and those we unchoked, keyed by peer id.
        protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();        
        // Every known peer, keyed by peer id.
        protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
        
        // Pending asynchronous receive, or null when a new one must be posted.
        protected Comm commReceived = null;
49
50         public Peer(Host host, String name, String[]args) {
51                 super(host,name,args);
52         }       
53         
54         @Override
55         public void main(String[] args) throws MsgException {
56                 //Check arguments
57                 if (args.length != 3 && args.length != 2) {
58                         Msg.info("Wrong number of arguments");
59                 }
60                 if (args.length == 3) {
61                         init(Integer.valueOf(args[0]),true);
62                 }
63                 else {
64                         init(Integer.valueOf(args[0]),false);
65                 }
66                 //Retrieve the deadline
67                 deadline = Double.valueOf(args[1]);
68                 if (deadline < 0) {
69                         Msg.info("Wrong deadline supplied");
70                         return;
71                 }
72                 Msg.info("Hi, I'm joining the network with id " + id);
73                 //Getting peer data from the tracker
74                 if (getPeersData()) {
75                         Msg.debug("Got " + peers.size() + " peers from the tracker");
76                         Msg.debug("Here is my current status: " + getStatus());
77                         beginReceiveTime = Msg.getClock();                      
78                         if (hasFinished()) {
79                                 pieces = Common.FILE_PIECES;
80                                 sendHandshakeAll();
81                                 seedLoop();
82                         }
83                         else {
84                                 leechLoop();
85                                 seedLoop();
86                         }
87                 }
88                 else {
89                         Msg.info("Couldn't contact the tracker.");
90                 }
91                 Msg.info("Here is my current status: " + getStatus());
92         }
93         /**
94          * Peer main loop when it is leeching.
95          */
96         private void leechLoop() {
97                 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
98                 Msg.debug("Start downloading.");
99                 /**
100                  * Send a "handshake" message to all the peers it got
101                  * (it couldn't have gotten more than 50 peers anyway)
102                  */
103                 sendHandshakeAll();
104                 //Wait for at least one "bitfield" message.
105                 waitForPieces();
106                 Msg.debug("Starting main leech loop");
107                 while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
108                         if (commReceived == null) {
109                                 commReceived = Task.irecv(mailbox);
110                         }
111                         try {
112                                 if (commReceived.test()) {
113                                         handleMessage(commReceived.getTask());
114                                         commReceived = null;
115                                 }
116                                 else {
117                                         //If the user has a pending interesting
118                                         if (currentPiece != -1) {
119                                                 sendInterestedToPeers();
120                                         }
121                                         else {
122                                                 if (currentPieces.size() < Common.MAX_PIECES) {
123                                                         updateCurrentPiece();
124                                                 }
125                                         }
126                                         //We don't execute the choke algorithm if we don't already have a piece
127                                         if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
128                                                 updateChokedPeers();
129                                                 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
130                                         }
131                                         else {
132                                                 waitFor(1);
133                                         }
134                                 }
135                         }
136                         catch (MsgException e) {
137                                 commReceived = null;                            
138                         }
139                 }
140         }
141         
142         /**
143          * Peer main loop when it is seeding
144          */
145         private void seedLoop() {
146                 double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
147                 Msg.debug("Start seeding.");
148                 //start the main seed loop
149                 while (Msg.getClock() < deadline) {
150                         if (commReceived == null) {
151                                 commReceived = Task.irecv(mailbox);
152                         }
153                         try {
154                                 if (commReceived.test()) {
155                                         handleMessage(commReceived.getTask());
156                                         commReceived = null;
157                                 }
158                                 else {
159                                         if (Msg.getClock() >= nextChokedUpdate) {
160                                                 updateChokedPeers();
161                                                 //TODO: Change the choked peer algorithm when seeding
162                                                 nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
163                                         }
164                                         else {
165                                                 waitFor(1);
166                                         }
167                                 }
168                         }
169                         catch (MsgException e) {
170                                 commReceived = null;                            
171                         }
172
173                 }
174         }
175         
176         /**
177          * Initialize the various peer data
178          * @param id id of the peer to take in the network
179          * @param seed indicates if the peer is a seed
180          */
181         private void init(int id, boolean seed) {
182                 this.id = id;
183                 this.mailbox = Integer.toString(id);
184                 this.mailboxTracker = "tracker_" + Integer.toString(id);
185                 if (seed) {
186                         for (int i = 0; i < bitfield.length; i++) {
187                                 bitfield[i] = '1';
188                                 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
189                                         bitfieldBlocks[i][j] = '1';
190                                 }
191                         }
192                 }
193                 else {
194                         for (int i = 0; i < bitfield.length; i++) {
195                                 bitfield[i] = '0';
196                                 for (int j = 0; j < bitfieldBlocks[i].length; j++) {
197                                         bitfieldBlocks[i][j] = '0'      ;
198                                 }
199                         }                       
200                 }
201                 this.hostname = host.getName();
202         }
203         /**
204          * Retrieves the peer list from the tracker
205          */
206         private boolean getPeersData() {
207                 
208                 boolean success = false, sendSuccess = false;
209                 double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
210                 //Build the task to send to the tracker
211                 TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
212                         
213                 while (!sendSuccess && Msg.getClock() < timeout) {
214                         try {
215                                 Msg.debug("Sending a peer request to the tracker.");
216                                 taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
217                                 sendSuccess = true;
218                         }
219                         catch (MsgException e) {
220                                 
221                         }
222                 }
223                 while (!success && Msg.getClock() < timeout) {
224                         commReceived = Task.irecv(this.mailboxTracker);
225                         try {
226                                 commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
227                                 if (commReceived.getTask() instanceof TrackerTask) {
228                                         TrackerTask task = (TrackerTask)commReceived.getTask();
229                                         for (Integer peerId: task.peers) {
230                                                 if (peerId != this.id) {
231                                                         peers.put(peerId, new Connection(peerId));
232                                                 }       
233                                         }
234                                         success = true;
235                                 }
236                         }
237                         catch (MsgException e) {
238                                 
239                         }
240                         commReceived = null;
241                 }
242                 commReceived = null;
243                 return success;
244         }
245         /**
246          * Handle a received message sent by another peer
247          * @param task task received.
248          */
249         void handleMessage(Task task) {
250                 MessageTask message = (MessageTask)task;
251                 Connection remotePeer = peers.get(message.peerId);
252                 switch (message.type) {
253                         case HANDSHAKE:
254                                 Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
255                                 //Check if the peer is in our connection list
256                                 if (remotePeer == null) {
257                                         peers.put(message.peerId, new Connection(message.peerId));
258                                         sendHandshake(message.mailbox);
259                                 }
260                                 //Send our bitfield to the pair
261                                 sendBitfield(message.mailbox);
262                         break;
263                         case BITFIELD:
264                                 Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
265                                 //update the pieces list
266                                 updatePiecesCountFromBitfield(message.bitfield);
267                                 //Update the current piece
268                                 if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
269                                         updateCurrentPiece();
270                                 }                               
271                                 remotePeer.bitfield  = message.bitfield.clone();
272                         break;
273                         case INTERESTED:
274                                 Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
275                                 assert remotePeer != null;
276                                 remotePeer.interested = true;
277                         break;
278                         case NOTINTERESTED:
279                                 Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
280                                 assert remotePeer != null;
281                                 remotePeer.interested = false;
282                         break;
283                         case UNCHOKE:
284                                 Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
285                                 assert remotePeer != null;
286                                 remotePeer.chokedDownload = false;
287                                 activePeers.put(remotePeer.id,remotePeer);
288                                 sendRequestsToPeer(remotePeer);
289                         break;
290                         case CHOKE:
291                                 Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
292                                 assert remotePeer != null;
293                                 remotePeer.chokedDownload = true;
294                                 activePeers.remove(remotePeer.id);
295                         break;
296                         case HAVE:
297                                 if (remotePeer.bitfield == null) {
298                                         return;
299                                 }
300                                 Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
301                                 assert message.index >= 0 && message.index < Common.FILE_PIECES;
302                                 assert remotePeer.bitfield != null;
303                                 remotePeer.bitfield[message.index] = '1';
304                                 piecesCount[message.index]++; 
305                                 //Send interested message to the peer if he has what we want
306                                 if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
307                                         remotePeer.amInterested = true;
308                                         sendInterested(remotePeer.mailbox);
309                                 }
310                                 
311                                 if (currentPieces.contains(message.index)) {
312                                         int blockIndex = getFirstBlock(message.index);                  
313                                         int blockLength = Common.PIECES_BLOCKS - blockIndex ;
314                                         blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;            
315                                         sendRequest(message.mailbox,message.index,blockIndex,blockLength);
316                                 }
317                         break;
318                         case REQUEST:
319                                 assert message.index >= 0 && message.index < Common.FILE_PIECES;
320                                 if (!remotePeer.chokedUpload) {
321                                         Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
322                                         if (bitfield[message.index] == '1') {
323                                                 sendPiece(message.mailbox,message.index,false,message.blockIndex,message.blockLength);  
324                                         }
325                                         else {
326                                                 Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
327                                         }
328                                 }
329                         break;
330                         case PIECE:
331                                 if (message.stalled) {
332                                         Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ") is stalled");
333                                 }
334                                 else {
335                                         Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
336                                         if (bitfield[message.index] == '0') {
337                                                 updateBitfieldBlocks(message.index,message.blockIndex,message.blockLength);
338                                                 if (pieceComplete(message.index)) {
339                                                         piecesRequested--;
340                                                         //Removing the piece from our piece list.
341                                                         if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
342                                                         }
343                                                         //Setting the fact that we have the piece
344                                                         bitfield[message.index] = '1';
345                                                         pieces++;
346                                                         Msg.debug("My status is now " + getStatus());
347                                                         //Sending the information to all the peers we are connected to
348                                                         sendHave(message.index);
349                                                         //sending UNINTERESTED to peers that doesn't have what we want.
350                                                         updateInterestedAfterReceive();
351                                                 }
352                                         }
353                                         else {
354                                                 Msg.debug("However, we already have it.");
355                                         }
356                                 }
357                         break;
358                 }
359                 if (remotePeer != null) {
360                         remotePeer.addSpeedValue(1 / (Msg.getClock() - beginReceiveTime));
361                 }
362                 beginReceiveTime = Msg.getClock();
363         }
364         /**
365          * Wait for the node to receive interesting bitfield messages (ie: non empty)
366          * to be received
367          */
368         void waitForPieces() {
369                 boolean finished = false;
370                 while (Msg.getClock() < deadline && !finished) {
371                         if (commReceived == null) {
372                                 commReceived = Task.irecv(mailbox);
373                         }
374                         try {
375                                 commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
376                                 handleMessage(commReceived.getTask());
377                                 if (currentPiece != -1) {
378                                         finished = true;
379                                 }
380                                 commReceived = null;
381                         }
382                         catch (MsgException e) {
383                                 commReceived = null;
384                         }
385                 }
386         }
387         
388         private boolean hasFinished() {
389                 for (int i = 0; i < bitfield.length; i++) {
390                         if (bitfield[i] == '1') {
391                                 return true;
392                         }
393                 }
394                 return false;
395         }
396         /**
397          * Updates the list of who has a piece from a bitfield
398          * @param bitfield bitfield
399          */
400         private void updatePiecesCountFromBitfield(char bitfield[]) {
401                 for (int i = 0; i < Common.FILE_PIECES; i++) {
402                         if (bitfield[i] == '1') {
403                                 piecesCount[i]++;
404                         }
405                 }
406         }
407         /**
408          * Update the piece the peer is currently interested in.
409          * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
410          * If the peer has less than 3 pieces, he chooses a piece at random.
411          * If the peer has more than pieces, he downloads the pieces that are the less
412          * replicated
413          */
414         void updateCurrentPiece() {
415                 if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
416                         return;
417                 }
418                 if (true || pieces < 3) {
419                         int i = 0, peerPiece;
420                         do {
421                                 currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
422                                 i++;
423                         } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
424                 }
425                 else {
426                         //trivial min algorithm.
427                         //TODO
428                 }
429                 currentPieces.add(currentPiece);
430                 Msg.debug("New interested piece: " + currentPiece);
431                 assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
432         }
433         /**
434          * Update the list of current choked and unchoked peers, using the
435          * choke algorithm
436          */
437         private void updateChokedPeers() {
438                 round = (round + 1) % 3;
439                 if (peers.size() == 0) {
440                         return;
441                 }
442                 //remove a peer from the list
443                 Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
444                 if (it.hasNext()) {
445                         Entry<Integer,Connection> e = it.next();
446                         Connection peerChoked = e.getValue();
447                         sendChoked(peerChoked.mailbox);
448                         peerChoked.chokedUpload = true;
449                         activePeers.remove(e.getKey());
450                 }
451                 Connection peerChoosed = null;
452                 //Separate the case from when the peer is seeding.
453                 if (pieces == Common.FILE_PIECES) {
454                         //Find the last unchoked peer.
455                         double unchokeTime = deadline + 1;
456                         for (Connection connection : peers.values()) {
457                                 if (connection.lastUnchoke < unchokeTime && connection.chokedUpload && connection.interested) {
458                                         peerChoosed = connection;
459                                         unchokeTime = connection.lastUnchoke;
460                                 }
461                         }
462                 }
463                 else {
464                         //Random optimistic unchoking
465                         if (round == 0) {
466                                 int j = 0, i;
467                                 do {
468                                         i = 0;
469                                         int idChosen = stream.randInt(0,peers.size() - 1);
470                                         for (Connection connection : peers.values()) {
471                                                 if (i == idChosen) {
472                                                         peerChoosed = connection;
473                                                         break;
474                                                 }
475                                                 i++;
476                                         } //TODO: Not really the best way ever
477                                         if (!peerChoosed.interested) {
478                                                 peerChoosed = null;
479                                         }
480                                         j++;
481                                 } while (peerChoosed == null && j < 
482         Common.MAXIMUM_PEERS);
483                         }
484                         else {
485                                 Connection fastest = null;
486                                 double fastestSpeed = 0;
487                                 for (Connection c : peers.values()) {
488                                         if (c.peerSpeed > fastestSpeed && c.chokedUpload && c.interested) {
489                                                 fastest = c;
490                                                 fastestSpeed = c.peerSpeed;
491                                         }
492                                 }
493                                 peerChoosed = fastest;
494                         }
495                 }
496                 if (peerChoosed != null) {
497                         activePeers.put(peerChoosed.id,peerChoosed);
498                         peerChoosed.chokedUpload = false;
499                         peerChoosed.lastUnchoke = Msg.getClock();
500                         sendUnchoked(peerChoosed.mailbox);
501                 }
502         }
503         /**     
504          * Updates our "interested" state about peers: send "not interested" to peers
505          * that don't have any more pieces we want.
506          */
507         private void updateInterestedAfterReceive() {
508                 boolean interested;
509                 for (Connection connection : peers.values()) {
510                         interested = false;
511                         if (connection.amInterested) {
512                                 for (Integer piece : currentPieces) {
513                                         if (connection.bitfield[piece] == '1') {
514                                                 interested = true;
515                                                 break;
516                                         }
517                                 }       
518                                 if (!interested) {
519                                         connection.amInterested = false;
520                                         sendNotInterested(connection.mailbox);
521                                 }
522                         }
523                 }
524         }
525         private void updateBitfieldBlocks(int index, int blockIndex, int blockLength) {
526                 for (int i = blockIndex; i < (blockIndex + blockLength); i++) {
527                         bitfieldBlocks[index][i] = '1';
528                 }
529         }
530         /**
531          * Returns if a piece is complete in the peer's bitfield.
532          * @param index the index of the piece.
533          */
534         private boolean pieceComplete(int index) {
535                 for (int i = 0; i < bitfieldBlocks[index].length; i++) {
536                         if (bitfieldBlocks[index][i] == '0') {
537                                 return false;
538                         }
539                 }
540                 return true;
541         }
542         /**
543          * Returns the first block of a piece that we don't have. 
544          */
545         private int getFirstBlock(int piece) {
546                 int blockIndex = -1;
547                 for (int i = 0; i < Common.PIECES_BLOCKS; i++) {
548                         if (bitfieldBlocks[piece][i] == '0') {
549                                 blockIndex = i;
550                                 break;
551                         }
552                 }       
553                 return blockIndex;
554         }
555         /**
556          * Send request messages to a peer that have unchoked us
557          * @param remotePeer peer data to the peer we want to send the request
558          */
559         private void sendRequestsToPeer(Connection remotePeer) {
560                 if (remotePeer.bitfield == null) {
561                         return;
562                 }
563                 for (Integer piece : currentPieces) {
564                         //Getting the block to send.    
565                         int blockIndex = -1, blockLength = 0;
566                         blockIndex = getFirstBlock(piece);                      
567                         blockLength = Common.PIECES_BLOCKS - blockIndex ;
568                         blockLength = blockLength > Common.BLOCKS_REQUESTED ? Common.BLOCKS_REQUESTED : blockLength;            
569                         if (remotePeer.bitfield[piece] == '1') {
570                                 sendRequest(remotePeer.mailbox, piece, blockIndex, blockLength);
571                         }                       
572                 }
573         }       
574         /**
575          * Find the peers that have the current interested piece and send them
576          * the "interested" message
577          */
578         private void sendInterestedToPeers() {
579                 if (currentPiece == -1) {
580                         return;
581                 }
582                 for (Connection connection : peers.values()) {
583                         if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
584                                 connection.amInterested = true;                         
585                                 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
586                                 task.dsend(connection.mailbox);                         
587                         }
588                 }
589                 currentPiece = -1;
590                 piecesRequested++;
591         }
592         /**
593          * Send a "interested" message to a peer.
594          */
595         private void sendInterested(String mailbox) {
596                 MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
597                 task.dsend(mailbox);                                            
598         }
599         /**
600          * Send a "not interested" message to a peer
601          * @param mailbox mailbox destination mailbox
602          */
603         private void sendNotInterested(String mailbox) {
604                 MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
605                 task.dsend(mailbox);                            
606         }
607         /**
608          * Send a handshake message to all the peers the peer has.
609          * @param peer peer data
610          */
611         private void sendHandshakeAll() {
612                 for (Connection remotePeer : peers.values()) {
613                         MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
614                         id);
615                         task.dsend(remotePeer.mailbox);
616                 }
617         }
618         /**
619          * Send a "handshake" message to an user
620          * @param mailbox mailbox where to we send the message
621          */
622         private void sendHandshake(String mailbox) {
623                 Msg.debug("Sending a HANDSHAKE to " + mailbox);
624                 MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
625                 task.dsend(mailbox);            
626         }
627         /**
628          * Send a "choked" message to a peer
629          */
630         private void sendChoked(String mailbox) {
631                 Msg.debug("Sending a CHOKE to " + mailbox);
632                 MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
633                 task.dsend(mailbox);
634         }
635         /**
636          * Send a "unchoked" message to a peer
637          */
638         private void sendUnchoked(String mailbox) {
639                 Msg.debug("Sending a UNCHOKE to " + mailbox);
640                 MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
641                 task.dsend(mailbox);
642         }
643         /**
644          * Send a "HAVE" message to all peers we are connected to
645          */
646         private void sendHave(int piece) {
647                 Msg.debug("Sending HAVE message to all my peers");
648                 for (Connection remotePeer : peers.values()) {
649                         MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
650                         task.dsend(remotePeer.mailbox);
651                 }
652         }
653         /**
654          * Send a bitfield message to all the peers the peer has.
655          * @param peer peer data
656          */
657         private void sendBitfield(String mailbox) {
658                 Msg.debug("Sending a BITFIELD to " + mailbox);
659                 MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
660                 task.dsend(mailbox);
661         }
662         /**
663          * Send a "request" message to a pair, containing a request for a piece
664          */
665         private void sendRequest(String mailbox, int piece, int blockIndex, int blockLength) {
666                 Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece + " and blocks " + blockIndex + "," + (blockIndex + blockLength));
667                 MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece, blockIndex, blockLength);
668                 task.dsend(mailbox);
669         }
670         /**
671          * Send a "piece" message to a pair, containing a piece of the file
672          */
673         private void sendPiece(String mailbox, int piece, boolean stalled, int blockIndex, int blockLength) {
674                 Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
675                 MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled, blockIndex, blockLength);
676                 task.dsend(mailbox);
677         }
678         
679         private String getStatus() {
680                 String s = "";
681                 for (int i = 0; i < Common.FILE_PIECES; i++) {
682                         s = s + bitfield[i];
683                 }
684                 return s;
685         }
686 }
687