Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
few fixes and improvements in bittorrent
[simgrid.git] / examples / msg / app-bittorrent / peer.c
index 0ef1d35..4aaf7c0 100644 (file)
@@ -156,7 +156,8 @@ void seed_loop(peer_t peer, double deadline)
  */
 int get_peers_data(peer_t peer)
 {
  */
 int get_peers_data(peer_t peer)
 {
-  int success = 0, send_success = 0;
+  int success = 0;
+  int send_success = 0;
   double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
   //Build the task to send to the tracker
   tracker_task_data_t data = tracker_task_data_new(MSG_host_get_name(MSG_host_self()), peer->mailbox_tracker,
   double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
   //Build the task to send to the tracker
   tracker_task_data_t data = tracker_task_data_new(MSG_host_get_name(MSG_host_self()), peer->mailbox_tracker,
@@ -165,14 +166,14 @@ int get_peers_data(peer_t peer)
   msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
   msg_task_t task_received = NULL;
   msg_comm_t comm_received;
   msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
   msg_task_t task_received = NULL;
   msg_comm_t comm_received;
-  while (!send_success && MSG_get_clock() < timeout) {
+  while ((send_success == 0) && MSG_get_clock() < timeout) {
     XBT_DEBUG("Sending a peer request to the tracker.");
     msg_error_t status = MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX, GET_PEERS_TIMEOUT);
     if (status == MSG_OK) {
       send_success = 1;
     }
   }
     XBT_DEBUG("Sending a peer request to the tracker.");
     msg_error_t status = MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX, GET_PEERS_TIMEOUT);
     if (status == MSG_OK) {
       send_success = 1;
     }
   }
-  while (!success && MSG_get_clock() < timeout) {
+  while ((success ==0) && MSG_get_clock() < timeout) {
     comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
     msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
     if (status == MSG_OK) {
     comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
     msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
     if (status == MSG_OK) {
@@ -204,8 +205,8 @@ int get_peers_data(peer_t peer)
 void peer_init(peer_t peer, int id, int seed)
 {
   peer->id = id;
 void peer_init(peer_t peer, int id, int seed)
 {
   peer->id = id;
-  sprintf(peer->mailbox, "%d", id);
-  sprintf(peer->mailbox_tracker, "tracker_%d", id);
+  snprintf(peer->mailbox,MAILBOX_SIZE-1, "%d", id);
+  snprintf(peer->mailbox_tracker,MAILBOX_SIZE-1, "tracker_%d", id);
   peer->peers = xbt_dict_new();
   peer->active_peers = xbt_dict_new();
   peer->hostname = MSG_host_get_name(MSG_host_self());
   peer->peers = xbt_dict_new();
   peer->active_peers = xbt_dict_new();
   peer->hostname = MSG_host_get_name(MSG_host_self());
@@ -273,18 +274,11 @@ int nb_interested_peers(peer_t peer)
 
 void update_active_peers_set(peer_t peer, connection_t remote_peer)
 {
 
 void update_active_peers_set(peer_t peer, connection_t remote_peer)
 {
-  if (remote_peer->interested && !remote_peer->choked_upload) {
+  if ((remote_peer->interested != 0) && (remote_peer->choked_upload == 0)) {
     //add in the active peers set
     xbt_dict_set_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int), remote_peer, NULL);
     //add in the active peers set
     xbt_dict_set_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int), remote_peer, NULL);
-  } else {
-    //remove
-    xbt_ex_t e;
-    TRY {
-      xbt_dict_remove_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int));
-    }
-    CATCH(e) {
-      xbt_ex_free(e);
-    }
+  } else if (xbt_dict_get_or_null_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int))) {
+    xbt_dict_remove_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int));
   }
 }
 
   }
 }
 
@@ -301,7 +295,7 @@ void handle_message(peer_t peer, msg_task_t task)
   case MESSAGE_HANDSHAKE:
     XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox, message->issuer_host_name);
     //Check if the peer is in our connection list.
   case MESSAGE_HANDSHAKE:
     XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox, message->issuer_host_name);
     //Check if the peer is in our connection list.
-    if (!remote_peer) {
+    if (remote_peer == 0) {
       xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int), connection_new(message->peer_id), NULL);
       send_handshake(peer, message->mailbox);
     }
       xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int), connection_new(message->peer_id), NULL);
       send_handshake(peer, message->mailbox);
     }
@@ -357,10 +351,11 @@ void handle_message(peer_t peer, msg_task_t task)
     remote_peer->bitfield[message->index] = '1';
     peer->pieces_count[message->index]++;
     //If the piece is in our pieces, we tell the peer that we are interested.
     remote_peer->bitfield[message->index] = '1';
     peer->pieces_count[message->index]++;
     //If the piece is in our pieces, we tell the peer that we are interested.
-    if (!remote_peer->am_interested && peer->bitfield[message->index] == '0') {
+    if ((remote_peer->am_interested == 0) &&
+         peer->bitfield[message->index] == '0') {
       remote_peer->am_interested = 1;
       send_interested(peer, message->mailbox);
       remote_peer->am_interested = 1;
       send_interested(peer, message->mailbox);
-      if (!remote_peer->choked_download)
+      if (remote_peer->choked_download == 0)
         request_new_piece_to_peer(peer, remote_peer);
     }
     break;
         request_new_piece_to_peer(peer, remote_peer);
     }
     break;
@@ -368,7 +363,7 @@ void handle_message(peer_t peer, msg_task_t task)
     xbt_assert(remote_peer->interested, "WTF !!!");
 
     xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong request received");
     xbt_assert(remote_peer->interested, "WTF !!!");
 
     xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong request received");
-    if (!remote_peer->choked_upload) {
+    if (remote_peer->choked_upload == 0) {
       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)", message->mailbox, message->issuer_host_name,
                 message->index, message->block_index, message->block_index + message->block_length);
       if (peer->bitfield[message->index] == '1') {
       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)", message->mailbox, message->issuer_host_name,
                 message->index, message->block_index, message->block_index + message->block_length);
       if (peer->bitfield[message->index] == '1') {
@@ -414,6 +409,8 @@ void handle_message(peer_t peer, msg_task_t task)
   case MESSAGE_CANCEL:
     XBT_DEBUG("The received CANCEL from %s (%s)", message->mailbox, message->issuer_host_name);
     break;
   case MESSAGE_CANCEL:
     XBT_DEBUG("The received CANCEL from %s (%s)", message->mailbox, message->issuer_host_name);
     break;
+  default:
+    THROW_IMPOSSIBLE;
   }
   //Update the peer speed.
   if (remote_peer) {
   }
   //Update the peer speed.
   if (remote_peer) {
@@ -437,14 +434,7 @@ void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
 /** remove current_piece from the list of currently downloaded pieces. */
 void remove_current_piece(peer_t peer, connection_t remote_peer, int current_piece)
 {
 /** remove current_piece from the list of currently downloaded pieces. */
 void remove_current_piece(peer_t peer, connection_t remote_peer, int current_piece)
 {
-  int piece_index = -1, piece;
-  unsigned int i;
-  xbt_dynar_foreach(peer->current_pieces, i, piece) {
-    if (piece == current_piece) {
-      piece_index = i;
-      break;
-    }
-  }
+  int piece_index = xbt_dynar_search_or_negative(peer->current_pieces, &current_piece);
   if (piece_index != -1)
     xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
   remote_peer->current_piece = -1;
   if (piece_index != -1)
     xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
   remote_peer->current_piece = -1;
@@ -484,8 +474,9 @@ int select_piece_to_download(peer_t peer, connection_t remote_peer)
     return piece;
 
   // end game mode
     return piece;
 
   // end game mode
-  if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces) && is_interested(peer, remote_peer)) {
-    if(!ENABLE_END_GAME_MODE)
+  if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces) &&
+      (is_interested(peer, remote_peer) != 0)) {
+    if(ENABLE_END_GAME_MODE == 0)
       return -1;
     int i;
     int nb_interesting_pieces = 0;
       return -1;
     int i;
     int nb_interesting_pieces = 0;
@@ -512,13 +503,14 @@ int select_piece_to_download(peer_t peer, connection_t remote_peer)
     return piece;
   }
   // Random first policy
     return piece;
   }
   // Random first policy
-  if (peer->pieces < 4 && is_interested_and_free(peer, remote_peer)) {
+  if (peer->pieces < 4 && (is_interested_and_free(peer, remote_peer) != 0)) {
     int i;
     int nb_interesting_pieces = 0;
     int random_piece_index, current_index = 0;
     // compute the number of interesting pieces
     for (i = 0; i < FILE_PIECES; i++) {
     int i;
     int nb_interesting_pieces = 0;
     int random_piece_index, current_index = 0;
     // compute the number of interesting pieces
     for (i = 0; i < FILE_PIECES; i++) {
-      if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i)) {
+      if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1' &&
+          (in_current_pieces(peer, i) == 0)) {
         nb_interesting_pieces++;
       }
     }
         nb_interesting_pieces++;
       }
     }
@@ -526,7 +518,8 @@ int select_piece_to_download(peer_t peer, connection_t remote_peer)
     // get a random interesting piece
     random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
     for (i = 0; i < FILE_PIECES; i++) {
     // get a random interesting piece
     random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
     for (i = 0; i < FILE_PIECES; i++) {
-      if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1' && !in_current_pieces(peer, i)) {
+      if (peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1' &&
+          (in_current_pieces(peer, i) == 0)) {
         if (random_piece_index == current_index) {
           piece = i;
           break;
         if (random_piece_index == current_index) {
           piece = i;
           break;
@@ -543,23 +536,25 @@ int select_piece_to_download(peer_t peer, connection_t remote_peer)
     int random_rarest_index, current_index = 0;
     // compute the smallest number of copies of available pieces
     for (i = 0; i < FILE_PIECES; i++) {
     int random_rarest_index, current_index = 0;
     // compute the smallest number of copies of available pieces
     for (i = 0; i < FILE_PIECES; i++) {
-      if (peer->pieces_count[i] < min && peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1'
-          && !in_current_pieces(peer, i))
+      if (peer->pieces_count[i] < min && peer->bitfield[i] == '0' &&
+          remote_peer->bitfield[i] == '1' && (in_current_pieces(peer, i) == 0))
         min = peer->pieces_count[i];
     }
         min = peer->pieces_count[i];
     }
-    xbt_assert(min != SHRT_MAX || !is_interested_and_free(peer, remote_peer), "WTF !!!");
+    xbt_assert(min != SHRT_MAX ||
+               (is_interested_and_free(peer, remote_peer) ==0), "WTF !!!");
     // compute the number of rarest pieces
     for (i = 0; i < FILE_PIECES; i++) {
     // compute the number of rarest pieces
     for (i = 0; i < FILE_PIECES; i++) {
-      if (peer->pieces_count[i] == min && peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1'
-          && !in_current_pieces(peer, i))
+      if (peer->pieces_count[i] == min && peer->bitfield[i] == '0' &&
+          remote_peer->bitfield[i] == '1' && (in_current_pieces(peer, i) ==0))
         nb_min_pieces++;
     }
         nb_min_pieces++;
     }
-    xbt_assert(nb_min_pieces != 0 || !is_interested_and_free(peer, remote_peer), "WTF !!!");
+    xbt_assert(nb_min_pieces != 0 ||
+               (is_interested_and_free(peer, remote_peer)==0), "WTF !!!");
     // get a random rarest piece
     random_rarest_index = RngStream_RandInt(peer->stream, 0, nb_min_pieces - 1);
     for (i = 0; i < FILE_PIECES; i++) {
     // get a random rarest piece
     random_rarest_index = RngStream_RandInt(peer->stream, 0, nb_min_pieces - 1);
     for (i = 0; i < FILE_PIECES; i++) {
-      if (peer->pieces_count[i] == min && peer->bitfield[i] == '0' && remote_peer->bitfield[i] == '1'
-          && !in_current_pieces(peer, i)) {
+      if (peer->pieces_count[i] == min && peer->bitfield[i] == '0' &&
+          remote_peer->bitfield[i] == '1' && (in_current_pieces(peer, i)==0)) {
         if (random_rarest_index == current_index) {
           piece = i;
           break;
         if (random_rarest_index == current_index) {
           piece = i;
           break;
@@ -567,7 +562,8 @@ int select_piece_to_download(peer_t peer, connection_t remote_peer)
         current_index++;
       }
     }
         current_index++;
       }
     }
-    xbt_assert(piece != -1 || !is_interested_and_free(peer, remote_peer), "WTF !!!");
+    xbt_assert(piece != -1 ||
+               (is_interested_and_free(peer, remote_peer) == 0), "WTF !!!");
     return piece;
   }
 }
     return piece;
   }
 }
@@ -600,7 +596,8 @@ void update_choked_peers(peer_t peer)
     double unchoke_time = MSG_get_clock() + 1;
 
     xbt_dict_foreach(peer->peers, cursor, key, connection) {
     double unchoke_time = MSG_get_clock() + 1;
 
     xbt_dict_foreach(peer->peers, cursor, key, connection) {
-      if (connection->last_unchoke < unchoke_time && connection->interested && connection->choked_upload) {
+      if (connection->last_unchoke < unchoke_time &&
+          (connection->interested != 0) && (connection->choked_upload != 0)) {
         unchoke_time = connection->last_unchoke;
         peer_choosed = connection;
       }
         unchoke_time = connection->last_unchoke;
         peer_choosed = connection;
       }
@@ -622,7 +619,8 @@ void update_choked_peers(peer_t peer)
           i++;
         }
         xbt_dict_cursor_free(&cursor);
           i++;
         }
         xbt_dict_cursor_free(&cursor);
-        if (!peer_choosed->interested || !peer_choosed->choked_upload) {
+        if ((peer_choosed->interested == 0) ||
+            (peer_choosed->choked_upload == 0)) {
           peer_choosed = NULL;
         }
         j++;
           peer_choosed = NULL;
         }
         j++;
@@ -632,7 +630,8 @@ void update_choked_peers(peer_t peer)
       connection_t connection;
       double fastest_speed = 0.0;
       xbt_dict_foreach(peer->peers, cursor, key, connection) {
       connection_t connection;
       double fastest_speed = 0.0;
       xbt_dict_foreach(peer->peers, cursor, key, connection) {
-        if (connection->peer_speed > fastest_speed && connection->choked_upload && connection->interested) {
+        if (connection->peer_speed > fastest_speed &&
+            (connection->choked_upload != 0) && (connection->interested != 0)) {
           peer_choosed = connection;
           fastest_speed = connection->peer_speed;
         }
           peer_choosed = connection;
           fastest_speed = connection->peer_speed;
         }
@@ -676,7 +675,7 @@ void update_interested_after_receive(peer_t peer)
   int interested;
   xbt_dict_foreach(peer->peers, cursor, key, connection) {
     interested = 0;
   int interested;
   xbt_dict_foreach(peer->peers, cursor, key, connection) {
     interested = 0;
-    if (connection->am_interested) {
+    if (connection->am_interested != 0) {
       xbt_assert(connection->bitfield, "Bitfield not received");
       //Check if the peer still has a piece we want.
       int i;
       xbt_assert(connection->bitfield, "Bitfield not received");
       //Check if the peer still has a piece we want.
       int i;
@@ -732,8 +731,7 @@ int get_first_block(peer_t peer, int piece)
 int is_interested(peer_t peer, connection_t remote_peer)
 {
   xbt_assert(remote_peer->bitfield, "Bitfield not received");
 int is_interested(peer_t peer, connection_t remote_peer)
 {
   xbt_assert(remote_peer->bitfield, "Bitfield not received");
-  int i;
-  for (i = 0; i < FILE_PIECES; i++) {
+  for (int i = 0; i < FILE_PIECES; i++) {
     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0') {
       return 1;
     }
     if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0') {
       return 1;
     }
@@ -745,9 +743,9 @@ int is_interested(peer_t peer, connection_t remote_peer)
 int is_interested_and_free(peer_t peer, connection_t remote_peer)
 {
   xbt_assert(remote_peer->bitfield, "Bitfield not received");
 int is_interested_and_free(peer_t peer, connection_t remote_peer)
 {
   xbt_assert(remote_peer->bitfield, "Bitfield not received");
-  int i;
-  for (i = 0; i < FILE_PIECES; i++) {
-    if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0' && !in_current_pieces(peer, i)) {
+  for (int i = 0; i < FILE_PIECES; i++) {
+    if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0' &&
+        (in_current_pieces(peer, i) == 0)) {
       return 1;
     }
   }
       return 1;
     }
   }
@@ -758,9 +756,9 @@ int is_interested_and_free(peer_t peer, connection_t remote_peer)
 int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
 {
   xbt_assert(remote_peer->bitfield, "Bitfield not received");
 int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
 {
   xbt_assert(remote_peer->bitfield, "Bitfield not received");
-  int i;
-  for (i = 0; i < FILE_PIECES; i++) {
-    if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0' && !in_current_pieces(peer, i)) {
+  for (int i = 0; i < FILE_PIECES; i++) {
+    if (remote_peer->bitfield[i] == '1' && peer->bitfield[i] == '0' &&
+        (in_current_pieces(peer, i) == 0)) {
       if (get_first_block(peer, i) > 0)
         return i;
     }
       if (get_first_block(peer, i) > 0)
         return i;
     }
@@ -789,14 +787,7 @@ void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
 /** Indicates if a piece is currently being downloaded by the peer. */
 int in_current_pieces(peer_t peer, int piece)
 {
 /** Indicates if a piece is currently being downloaded by the peer. */
 int in_current_pieces(peer_t peer, int piece)
 {
-  unsigned i;
-  int peer_piece;
-  xbt_dynar_foreach(peer->current_pieces, i, peer_piece) {
-    if (peer_piece == piece) {
-      return 1;
-    }
-  }
-  return 0;
+  return xbt_dynar_member(peer->current_pieces, &piece);
 }
 
 /***********************************************************
 }
 
 /***********************************************************
@@ -805,8 +796,6 @@ int in_current_pieces(peer_t peer, int piece)
  *
  ***********************************************************/
 
  *
  ***********************************************************/
 
-
-
 /** @brief Send a "interested" message to a peer
  *  @param peer peer data
  *  @param mailbox destination mailbox
 /** @brief Send a "interested" message to a peer
  *  @param peer peer data
  *  @param mailbox destination mailbox