Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
BitTorrent changes:
[simgrid.git] / examples / msg / bittorrent / peer.c
index 70e2a98..f770a85 100644 (file)
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
 
+//TODO: Let users change this
+/*
+ * Transferred file data
+ *
+ * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
+ */
+static int FILE_SIZE = 10 * 5 * 16384;
+static int FILE_PIECES = 10;
 
+static int PIECES_BLOCKS = 5;
+static int BLOCK_SIZE = 16384;
+static int BLOCKS_REQUESTED = 2;
 
 /**
  * Peer main function
@@ -171,7 +182,8 @@ int get_peers_data(peer_t peer)
     msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
     if (status == MSG_OK) {
       tracker_task_data_t data = MSG_task_get_data(task_received);
-      int i, peer_id;
+      unsigned i;
+      int peer_id;
       //Add the peers the tracker gave us to our peer list.
       xbt_dynar_foreach(data->peers, i, peer_id) {
         if (peer_id != peer->id)
@@ -392,7 +404,8 @@ void handle_message(peer_t peer, msg_task_t task)
         if (piece_complete(peer, message->index)) {
           peer->pieces_requested--;
           //Removing the piece from our piece list
-          int piece_index = -1, i, piece;
+          unsigned i;
+          int piece_index = -1, piece;
           xbt_dynar_foreach(peer->current_pieces, i, piece) {
             if (piece == message->index) {
               piece_index = i;
@@ -415,6 +428,8 @@ void handle_message(peer_t peer, msg_task_t task)
       }
     }
     break;
+  case MESSAGE_CANCEL:
+    break;
   }
   //Update the peer speed.
   if (remote_peer) {
@@ -446,7 +461,7 @@ void wait_for_pieces(peer_t peer, double deadline)
     MSG_comm_destroy(peer->comm_received);
     peer->comm_received = NULL;
     if (status == MSG_OK) {
-      message_t message = MSG_task_get_data(peer->task_received);
+      MSG_task_get_data(peer->task_received);
       handle_message(peer, peer->task_received);
       if (peer->current_piece != -1) {
         finished = 1;
@@ -513,9 +528,8 @@ void update_current_piece(peer_t peer)
   }
   xbt_dynar_push_as(peer->current_pieces, int, peer->current_piece);
   XBT_DEBUG("New interested piece: %d", peer->current_piece);
-  xbt_assert((peer->current_piece >= 0
-              && peer->current_piece < FILE_PIECES,
-              "Peer want to retrieve a piece that doesn't exist."));
+  xbt_assert((peer->current_piece >= 0 && peer->current_piece < FILE_PIECES),
+             "Peer want to retrieve a piece that doesn't exist.");
 }
 
 /**
@@ -527,7 +541,6 @@ void update_choked_peers(peer_t peer)
 {
   //update the current round
   peer->round = (peer->round + 1) % 3;
-  int i;
   char *key;
   connection_t peer_choosed = NULL;
   //remove a peer from the list
@@ -616,7 +629,8 @@ void update_interested_after_receive(peer_t peer)
   char *key;
   xbt_dict_cursor_t cursor;
   connection_t connection;
-  int interested, cpt, piece;
+  unsigned cpt;
+  int interested, piece;
   xbt_dict_foreach(peer->peers, cursor, key, connection) {
     interested = 0;
     if (connection->am_interested) {
@@ -684,7 +698,8 @@ int get_first_block(peer_t peer, int piece)
  */
 void send_requests_to_peer(peer_t peer, connection_t remote_peer)
 {
-  int i, piece, block_index, block_length;
+  unsigned i;
+  int piece, block_index, block_length;
   xbt_dynar_foreach(peer->current_pieces, i, piece) {
     if (remote_peer->bitfield && remote_peer->bitfield[piece] == '1') {
       block_index = get_first_block(peer, piece);
@@ -716,7 +731,7 @@ void send_interested_to_peers(peer_t peer)
       connection->am_interested = 1;
       msg_task_t task =
           task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
-                           peer->id);
+                           peer->id, task_message_size(MESSAGE_INTERESTED));
       MSG_task_dsend(task, connection->mailbox, task_message_free);
       XBT_DEBUG("Send INTERESTED to %s", connection->mailbox);
     }
@@ -734,7 +749,7 @@ void send_interested(peer_t peer, const char *mailbox)
 {
   msg_task_t task =
       task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
-                       peer->id);
+                       peer->id, task_message_size(MESSAGE_INTERESTED));
   MSG_task_dsend(task, mailbox, task_message_free);
   XBT_DEBUG("Sending INTERESTED to %s", mailbox);
 
@@ -749,7 +764,7 @@ void send_notinterested(peer_t peer, const char *mailbox)
 {
   msg_task_t task =
       task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox,
-                       peer->id);
+                       peer->id, task_message_size(MESSAGE_NOTINTERESTED));
   MSG_task_dsend(task, mailbox, task_message_free);
   XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
 
@@ -767,7 +782,7 @@ void send_handshake_all(peer_t peer)
   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
     msg_task_t task =
         task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
-                         peer->id);
+                         peer->id, task_message_size(MESSAGE_HANDSHAKE));
     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
     XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
   }
@@ -782,7 +797,7 @@ void send_handshake(peer_t peer, const char *mailbox)
 {
   msg_task_t task =
       task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
-                       peer->id);
+                       peer->id, task_message_size(MESSAGE_HANDSHAKE));
   MSG_task_dsend(task, mailbox, task_message_free);
   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
 }
@@ -794,7 +809,8 @@ void send_choked(peer_t peer, const char *mailbox)
 {
   XBT_DEBUG("Sending a CHOKE to %s", mailbox);
   msg_task_t task =
-      task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, peer->id);
+      task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, 
+                       peer->id, task_message_size(MESSAGE_CHOKE));
   MSG_task_dsend(task, mailbox, task_message_free);
 }
 
@@ -806,7 +822,7 @@ void send_unchoked(peer_t peer, const char *mailbox)
   XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
   msg_task_t task =
       task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox,
-                       peer->id);
+                       peer->id, task_message_size(MESSAGE_UNCHOKE));
   MSG_task_dsend(task, mailbox, task_message_free);
 }
 
@@ -822,7 +838,7 @@ void send_have(peer_t peer, int piece)
   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
     msg_task_t task =
         task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,
-                               peer->id, piece);
+                               peer->id, piece, task_message_size(MESSAGE_HAVE));
     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
   }
 }
@@ -836,7 +852,7 @@ void send_bitfield(peer_t peer, const char *mailbox)
   XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
   msg_task_t task =
       task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
-                                peer->bitfield);
+                                peer->bitfield, FILE_PIECES);
   MSG_task_dsend(task, mailbox, task_message_free);
 }
 
@@ -867,13 +883,14 @@ void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
              "Tried to send a piece that we doesn't have.");
   msg_task_t task =
       task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
-                             stalled, block_index, block_length);
+                             stalled, block_index, block_length, BLOCK_SIZE);
   MSG_task_dsend(task, mailbox, task_message_free);
 }
 
 int in_current_pieces(peer_t peer, int piece)
 {
-  int is_in = 0, i, peer_piece;
+  unsigned i;
+  int is_in = 0, peer_piece;
   xbt_dynar_foreach(peer->current_pieces, i, peer_piece) {
     if (peer_piece == piece) {
       is_in = 1;