Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Remove unused notion of stalled messages for "piece" messages
[simgrid.git] / examples / msg / bittorrent / peer.c
index 06fabc5..6f5481c 100644 (file)
@@ -23,6 +23,7 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
 #define FILE_PIECES  10
 #define PIECES_BLOCKS 5
 #define BLOCK_SIZE  16384
+#define ENABLE_END_GAME_MODE 1
 
 /**
  *  Number of blocks asked by each request
@@ -30,13 +31,10 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
 #define BLOCKS_REQUESTED 2
 
 
-static const int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
+static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
 
-void request_new_piece_to_peer(peer_t peer, connection_t remote_peer);
-void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece);
-void remove_current_piece(peer_t peer, connection_t remote_peer,
-                          int current_piece);
 
+#define SLEEP_DURATION 1
 
   /**
  * Peer main function
@@ -119,12 +117,12 @@ void leech_loop(peer_t peer, double deadline)
         update_choked_peers(peer);
         next_choked_update += UPDATE_CHOKED_INTERVAL;
       } else {
-        MSG_process_sleep(1);
+        MSG_process_sleep(SLEEP_DURATION);
       }
     }
   }
-//  if (peer->pieces == FILE_PIECES)
-//    XBT_INFO("%d becomes a seeder", peer->id);
+  if (peer->pieces == FILE_PIECES)
+    XBT_DEBUG("%d becomes a seeder", peer->id);
 
 }
 
@@ -156,7 +154,7 @@ void seed_loop(peer_t peer, double deadline)
         //TODO: Change the choked peer algorithm when seeding.
         next_choked_update += UPDATE_CHOKED_INTERVAL;
       } else {
-        MSG_process_sleep(1);
+        MSG_process_sleep(SLEEP_DURATION);
       }
     }
   }
@@ -379,6 +377,7 @@ void handle_message(peer_t peer, msg_task_t task)
                "A non-in-our-list peer has sent us a message. WTH ?");
     XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
               message->issuer_host_name);
+    xbt_assert(remote_peer->choked_download, "WTF !!!");
     remote_peer->choked_download = 0;
     //Send requests to the peer, since it has unchoked us
     if (remote_peer->am_interested)
@@ -389,6 +388,7 @@ void handle_message(peer_t peer, msg_task_t task)
                "A non-in-our-list peer has sent us a message. WTH ?");
     XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
               message->issuer_host_name);
+    xbt_assert(!remote_peer->choked_download, "WTF !!!");
     remote_peer->choked_download = 1;
     remove_current_piece(peer, remote_peer, remote_peer->current_piece);
     break;
@@ -410,15 +410,17 @@ void handle_message(peer_t peer, msg_task_t task)
     }
     break;
   case MESSAGE_REQUEST:
+    xbt_assert(remote_peer->interested, "WTF !!!");
+
     xbt_assert((message->index >= 0
                 && message->index < FILE_PIECES), "Wrong request received");
     if (!remote_peer->choked_upload) {
       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
-                message->mailbox, message->issuer_host_name, message->peer_id,
+                message->mailbox, message->issuer_host_name, message->index,
                 message->block_index,
                 message->block_index + message->block_length);
       if (peer->bitfield[message->index] == '1') {
-        send_piece(peer, message->mailbox, message->index, 0,
+        send_piece(peer, message->mailbox, message->index,
                    message->block_index, message->block_length);
       }
     } else {
@@ -427,17 +429,16 @@ void handle_message(peer_t peer, msg_task_t task)
     }
     break;
   case MESSAGE_PIECE:
+    XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
+              message->block_index,
+              message->block_index + message->block_length,
+              message->mailbox, message->issuer_host_name);
+    xbt_assert(!remote_peer->choked_download, "WTF !!!");
+    xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE, "Can't received a piece if I'm not interested wihtout end-game mode! piece (%d) bitfield(%s) remote bitfield(%s)", message->index, peer->bitfield, remote_peer->bitfield);
+    xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
     xbt_assert((message->index >= 0
                 && message->index < FILE_PIECES), "Wrong piece received");
     //TODO: Execute à computation.
-    if (message->stalled) {
-      XBT_DEBUG("The received piece %d from %s (%s) is STALLED",
-                message->index, message->mailbox, message->issuer_host_name);
-    } else {
-      XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
-                message->block_index,
-                message->block_index + message->block_length,
-                message->mailbox, message->issuer_host_name);
       if (peer->bitfield[message->index] == '0') {
         update_bitfield_blocks(peer, message->index, message->block_index,
                                message->block_length);
@@ -457,9 +458,9 @@ void handle_message(peer_t peer, msg_task_t task)
         }
       } else {
         XBT_DEBUG("However, we already have it");
+        xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
         request_new_piece_to_peer(peer, remote_peer);
       }
-    }
     break;
   case MESSAGE_CANCEL:
     XBT_DEBUG("The received CANCEL from %s (%s)",
@@ -477,6 +478,9 @@ void handle_message(peer_t peer, msg_task_t task)
   task_message_free(task);
 }
 
+/**
+ * Selects the appropriate piece to download and requests it to the remote_peer
+ */
 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
 {
   int piece = select_piece_to_download(peer, remote_peer);
@@ -486,10 +490,14 @@ void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
   }
 }
 
+/**
+ * remove current_piece from the list of currently downloaded pieces.
+ */
 void remove_current_piece(peer_t peer, connection_t remote_peer,
                           int current_piece)
 {
-  int piece_index = -1, piece, i;
+  int piece_index = -1, piece;
+  unsigned int i;
   xbt_dynar_foreach(peer->current_pieces, i, piece) {
     if (piece == current_piece) {
       piece_index = i;
@@ -542,6 +550,8 @@ int select_piece_to_download(peer_t peer, connection_t remote_peer)
   // end game mode
   if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces)
       && is_interested(peer, remote_peer)) {
+    if(!ENABLE_END_GAME_MODE)
+      return -1;
     int i;
     int nb_interesting_pieces = 0;
     int random_piece_index, current_index = 0;
@@ -765,8 +775,7 @@ void update_interested_after_receive(peer_t peer)
   char *key;
   xbt_dict_cursor_t cursor;
   connection_t connection;
-  unsigned cpt;
-  int interested, piece;
+  int interested;
   xbt_dict_foreach(peer->peers, cursor, key, connection) {
     interested = 0;
     if (connection->am_interested) {
@@ -888,7 +897,6 @@ int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
 {
   remote_peer->current_piece = piece;
-  unsigned i;
   int block_index, block_length;
   xbt_assert(remote_peer->bitfield, "bitfield not received");
   xbt_assert(remote_peer->bitfield[piece] == '1', "WTF !!!");
@@ -1061,7 +1069,7 @@ void send_request(peer_t peer, const char *mailbox, int piece,
 /**
  * Send a "piece" message to a pair, containing a piece of the file
  */
-void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
+void send_piece(peer_t peer, const char *mailbox, int piece,
                 int block_index, int block_length)
 {
   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index,
@@ -1071,6 +1079,6 @@ void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
              "Tried to send a piece that we doesn't have.");
   msg_task_t task =
       task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
-                             stalled, block_index, block_length, BLOCK_SIZE);
+                             block_index, block_length, BLOCK_SIZE);
   MSG_task_dsend(task, mailbox, task_message_free);
 }