Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Remove unused notion of stalled messages for "piece" messages
authorNicolas Bonichon <bonichon@labri.fr>
Fri, 26 Apr 2013 15:30:19 +0000 (17:30 +0200)
committerNicolas Bonichon <bonichon@labri.fr>
Fri, 26 Apr 2013 17:30:45 +0000 (19:30 +0200)
Add some xtb_assert and correct a debug message

examples/msg/bittorrent/messages.c
examples/msg/bittorrent/messages.h
examples/msg/bittorrent/peer.c
examples/msg/bittorrent/peer.h

index 374b686..fae73a7 100644 (file)
@@ -77,14 +77,13 @@ msg_task_t task_message_request_new(const char *issuer_host_name,
 
 msg_task_t task_message_piece_new(const char *issuer_host_name,
                                   const char *mailbox, int peer_id, int index,
-                                  int stalled, int block_index,
+                                  int block_index,
                                   int block_length, int block_size)
 {
   msg_task_t task =
       task_message_index_new(MESSAGE_PIECE, issuer_host_name, mailbox, peer_id,
                              index, block_length * block_size);
   message_t message = MSG_task_get_data(task);
-  message->stalled = stalled;
   message->block_index = block_index;
   message->block_length = block_length;
   return task;
index 46e166b..6f191a4 100644 (file)
@@ -52,7 +52,6 @@ typedef struct s_message {
   int index;
   int block_index;
   int block_length;
-  int stalled:1;
 } s_message_t, *message_t;
 /**
  * Builds a new value-less message
@@ -86,7 +85,7 @@ msg_task_t task_message_request_new(const char *issuer_host_name,
  */
 msg_task_t task_message_piece_new(const char *issuer_host_name,
                                   const char *mailbox, int peer_id, int index,
-                                  int stalled, int block_index,
+                                  int block_index,
                                   int block_length, int block_size);
 /**
  * Free a message task
index 06fabc5..6f5481c 100644 (file)
@@ -23,6 +23,7 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
 #define FILE_PIECES  10
 #define PIECES_BLOCKS 5
 #define BLOCK_SIZE  16384
+#define ENABLE_END_GAME_MODE 1
 
 /**
  *  Number of blocks asked by each request
@@ -30,13 +31,10 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
 #define BLOCKS_REQUESTED 2
 
 
-static const int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
+static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
 
-void request_new_piece_to_peer(peer_t peer, connection_t remote_peer);
-void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece);
-void remove_current_piece(peer_t peer, connection_t remote_peer,
-                          int current_piece);
 
+#define SLEEP_DURATION 1
 
   /**
  * Peer main function
@@ -119,12 +117,12 @@ void leech_loop(peer_t peer, double deadline)
         update_choked_peers(peer);
         next_choked_update += UPDATE_CHOKED_INTERVAL;
       } else {
-        MSG_process_sleep(1);
+        MSG_process_sleep(SLEEP_DURATION);
       }
     }
   }
-//  if (peer->pieces == FILE_PIECES)
-//    XBT_INFO("%d becomes a seeder", peer->id);
+  if (peer->pieces == FILE_PIECES)
+    XBT_DEBUG("%d becomes a seeder", peer->id);
 
 }
 
@@ -156,7 +154,7 @@ void seed_loop(peer_t peer, double deadline)
         //TODO: Change the choked peer algorithm when seeding.
         next_choked_update += UPDATE_CHOKED_INTERVAL;
       } else {
-        MSG_process_sleep(1);
+        MSG_process_sleep(SLEEP_DURATION);
       }
     }
   }
@@ -379,6 +377,7 @@ void handle_message(peer_t peer, msg_task_t task)
                "A non-in-our-list peer has sent us a message. WTH ?");
     XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
               message->issuer_host_name);
+    xbt_assert(remote_peer->choked_download, "WTF !!!");
     remote_peer->choked_download = 0;
     //Send requests to the peer, since it has unchoked us
     if (remote_peer->am_interested)
@@ -389,6 +388,7 @@ void handle_message(peer_t peer, msg_task_t task)
                "A non-in-our-list peer has sent us a message. WTH ?");
     XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
               message->issuer_host_name);
+    xbt_assert(!remote_peer->choked_download, "WTF !!!");
     remote_peer->choked_download = 1;
     remove_current_piece(peer, remote_peer, remote_peer->current_piece);
     break;
@@ -410,15 +410,17 @@ void handle_message(peer_t peer, msg_task_t task)
     }
     break;
   case MESSAGE_REQUEST:
+    xbt_assert(remote_peer->interested, "WTF !!!");
+
     xbt_assert((message->index >= 0
                 && message->index < FILE_PIECES), "Wrong request received");
     if (!remote_peer->choked_upload) {
       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
-                message->mailbox, message->issuer_host_name, message->peer_id,
+                message->mailbox, message->issuer_host_name, message->index,
                 message->block_index,
                 message->block_index + message->block_length);
       if (peer->bitfield[message->index] == '1') {
-        send_piece(peer, message->mailbox, message->index, 0,
+        send_piece(peer, message->mailbox, message->index,
                    message->block_index, message->block_length);
       }
     } else {
@@ -427,17 +429,16 @@ void handle_message(peer_t peer, msg_task_t task)
     }
     break;
   case MESSAGE_PIECE:
+    XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
+              message->block_index,
+              message->block_index + message->block_length,
+              message->mailbox, message->issuer_host_name);
+    xbt_assert(!remote_peer->choked_download, "WTF !!!");
+    xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE, "Can't received a piece if I'm not interested wihtout end-game mode! piece (%d) bitfield(%s) remote bitfield(%s)", message->index, peer->bitfield, remote_peer->bitfield);
+    xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
     xbt_assert((message->index >= 0
                 && message->index < FILE_PIECES), "Wrong piece received");
     //TODO: Execute à computation.
-    if (message->stalled) {
-      XBT_DEBUG("The received piece %d from %s (%s) is STALLED",
-                message->index, message->mailbox, message->issuer_host_name);
-    } else {
-      XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
-                message->block_index,
-                message->block_index + message->block_length,
-                message->mailbox, message->issuer_host_name);
       if (peer->bitfield[message->index] == '0') {
         update_bitfield_blocks(peer, message->index, message->block_index,
                                message->block_length);
@@ -457,9 +458,9 @@ void handle_message(peer_t peer, msg_task_t task)
         }
       } else {
         XBT_DEBUG("However, we already have it");
+        xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
         request_new_piece_to_peer(peer, remote_peer);
       }
-    }
     break;
   case MESSAGE_CANCEL:
     XBT_DEBUG("The received CANCEL from %s (%s)",
@@ -477,6 +478,9 @@ void handle_message(peer_t peer, msg_task_t task)
   task_message_free(task);
 }
 
+/**
+ * Selects the appropriate piece to download and requests it to the remote_peer
+ */
 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
 {
   int piece = select_piece_to_download(peer, remote_peer);
@@ -486,10 +490,14 @@ void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
   }
 }
 
+/**
+ * remove current_piece from the list of currently downloaded pieces.
+ */
 void remove_current_piece(peer_t peer, connection_t remote_peer,
                           int current_piece)
 {
-  int piece_index = -1, piece, i;
+  int piece_index = -1, piece;
+  unsigned int i;
   xbt_dynar_foreach(peer->current_pieces, i, piece) {
     if (piece == current_piece) {
       piece_index = i;
@@ -542,6 +550,8 @@ int select_piece_to_download(peer_t peer, connection_t remote_peer)
   // end game mode
   if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces)
       && is_interested(peer, remote_peer)) {
+    if(!ENABLE_END_GAME_MODE)
+      return -1;
     int i;
     int nb_interesting_pieces = 0;
     int random_piece_index, current_index = 0;
@@ -765,8 +775,7 @@ void update_interested_after_receive(peer_t peer)
   char *key;
   xbt_dict_cursor_t cursor;
   connection_t connection;
-  unsigned cpt;
-  int interested, piece;
+  int interested;
   xbt_dict_foreach(peer->peers, cursor, key, connection) {
     interested = 0;
     if (connection->am_interested) {
@@ -888,7 +897,6 @@ int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
 {
   remote_peer->current_piece = piece;
-  unsigned i;
   int block_index, block_length;
   xbt_assert(remote_peer->bitfield, "bitfield not received");
   xbt_assert(remote_peer->bitfield[piece] == '1', "WTF !!!");
@@ -1061,7 +1069,7 @@ void send_request(peer_t peer, const char *mailbox, int piece,
 /**
  * Send a "piece" message to a pair, containing a piece of the file
  */
-void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
+void send_piece(peer_t peer, const char *mailbox, int piece,
                 int block_index, int block_length)
 {
   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index,
@@ -1071,6 +1079,6 @@ void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
              "Tried to send a piece that we doesn't have.");
   msg_task_t task =
       task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
-                             stalled, block_index, block_length, BLOCK_SIZE);
+                             block_index, block_length, BLOCK_SIZE);
   MSG_task_dsend(task, mailbox, task_message_free);
 }
index 31f21fc..07491a4 100644 (file)
@@ -75,7 +75,21 @@ int piece_complete(peer_t peer, int index);
 int get_first_block(peer_t peer, int piece);
 
 
-void send_requests_to_peer(peer_t peer, connection_t remote_peer);
+int nb_interested_peers(peer_t peer);
+int is_interested(peer_t peer, connection_t remote_peer);
+int is_interested_and_free(peer_t peer, connection_t remote_peer);
+int in_current_pieces(peer_t peer, int piece);
+int partially_downloaded_piece(peer_t peer, connection_t remote_peer);
+
+void request_new_piece_to_peer(peer_t peer, connection_t remote_peer);
+void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece);
+void remove_current_piece(peer_t peer, connection_t remote_peer,
+                          int current_piece);
+
+void update_active_peers_set(peer_t peer, connection_t remote_peer);
+int select_piece_to_download(peer_t peer, connection_t remote_peer);
+
+
 
 void send_interested_to_peers(peer_t peer);
 void send_handshake_all(peer_t peer);
@@ -91,8 +105,7 @@ void send_have(peer_t peer, int piece);
 
 void send_request(peer_t peer, const char *mailbox, int piece,
                   int block_index, int block_length);
-void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
+void send_piece(peer_t peer, const char *mailbox, int piece,
                 int block_index, int block_length);
 
-int in_current_pieces(peer_t peer, int piece);
 #endif                          /* BITTORRENT_PEER_H */