Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines with new year.
[simgrid.git] / teshsuite / msg / app-bittorrent / bittorrent-peer.c
index 69818a5..4c2bdef 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2012-2018. The SimGrid Team. All rights reserved.          */
+/* Copyright (c) 2012-2020. The SimGrid Team. All rights reserved.          */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -8,7 +8,6 @@
 #include "connection.h"
 #include "tracker.h"
 #include <simgrid/msg.h>
-#include <xbt/RngStream.h>
 
 #include <limits.h>
 #include <stdio.h> /* snprintf */
@@ -27,7 +26,6 @@ static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_S
 /** Number of blocks asked by each request */
 #define BLOCKS_REQUESTED 2
 
-#define ENABLE_END_GAME_MODE 1
 #define SLEEP_DURATION 1
 
 int count_pieces(unsigned int bitfield)
@@ -41,13 +39,13 @@ int count_pieces(unsigned int bitfield)
   return count;
 }
 
-int peer_has_not_piece(peer_t peer, unsigned int piece)
+int peer_has_not_piece(const s_peer_t* peer, unsigned int piece)
 {
   return !(peer->bitfield & 1U << piece);
 }
 
 /** Check that a piece is not currently being download by the peer. */
-int peer_is_not_downloading_piece(peer_t peer, unsigned int piece)
+int peer_is_not_downloading_piece(const s_peer_t* peer, unsigned int piece)
 {
   return !(peer->current_pieces & 1U << piece);
 }
@@ -176,7 +174,7 @@ void seed_loop(peer_t peer, double deadline)
 /** @brief Retrieves the peer list from the tracker
  *  @param peer current peer data
  */
-int get_peers_data(peer_t peer)
+int get_peers_data(const s_peer_t* peer)
 {
   int success    = 0;
   double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
@@ -205,7 +203,7 @@ int get_peers_data(peer_t peer)
       // Add the peers the tracker gave us to our peer list.
       xbt_dynar_foreach (data->peers, i, peer_id) {
         if (peer_id != peer->id)
-          xbt_dict_set_ext(peer->peers, (char*)&peer_id, sizeof(int), connection_new(peer_id), NULL);
+          xbt_dict_set_ext(peer->peers, (char*)&peer_id, sizeof(int), connection_new(peer_id));
       }
       success = 1;
       // free the communication and the task
@@ -246,7 +244,6 @@ peer_t peer_init(int id, int seed)
 
   peer->pieces_count = xbt_new0(short, FILE_PIECES);
 
-  peer->stream        = (RngStream)MSG_host_get_data(MSG_host_self());
   peer->comm_received = NULL;
 
   peer->round = 0;
@@ -277,7 +274,7 @@ int has_finished(unsigned int bitfield)
   return bitfield == (1U << FILE_PIECES) - 1U;
 }
 
-int nb_interested_peers(peer_t peer)
+int nb_interested_peers(const s_peer_t* peer)
 {
   xbt_dict_cursor_t cursor = NULL;
   char* key;
@@ -290,11 +287,11 @@ int nb_interested_peers(peer_t peer)
   return nb;
 }
 
-void update_active_peers_set(peer_t peer, connection_t remote_peer)
+void update_active_peers_set(const s_peer_t* peer, connection_t remote_peer)
 {
   if ((remote_peer->interested != 0) && (remote_peer->choked_upload == 0)) {
     // add in the active peers set
-    xbt_dict_set_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int), remote_peer, NULL);
+    xbt_dict_set_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int), remote_peer);
   } else if (xbt_dict_get_or_null_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int))) {
     xbt_dict_remove_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int));
   }
@@ -319,7 +316,7 @@ void handle_message(peer_t peer, msg_task_t task)
     case MESSAGE_HANDSHAKE:
       // Check if the peer is in our connection list.
       if (remote_peer == 0) {
-        xbt_dict_set_ext(peer->peers, (char*)&message->peer_id, sizeof(int), connection_new(message->peer_id), NULL);
+        xbt_dict_set_ext(peer->peers, (char*)&message->peer_id, sizeof(int), connection_new(message->peer_id));
         send_handshake(peer, message->mailbox);
       }
       // Send our bitfield to the peer
@@ -392,10 +389,6 @@ void handle_message(peer_t peer, msg_task_t task)
       XBT_DEBUG(" \t for piece %d (%d,%d)", message->index, message->block_index,
                 message->block_index + message->block_length);
       xbt_assert(!remote_peer->choked_download);
-      xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
-                 "Can't received a piece if I'm not interested wihtout end-game mode!"
-                 "piece (%d) bitfield(%u) remote bitfield(%u)",
-                 message->index, peer->bitfield, remote_peer->bitfield);
       xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
       xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong piece received");
       // TODO: Execute à computation.
@@ -419,7 +412,6 @@ void handle_message(peer_t peer, msg_task_t task)
         }
       } else {
         XBT_DEBUG("However, we already have it");
-        xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
         request_new_piece_to_peer(peer, remote_peer);
       }
       break;
@@ -458,7 +450,7 @@ void remove_current_piece(peer_t peer, connection_t remote_peer, unsigned int cu
  *  @param peer peer we want to update the list
  *  @param bitfield bitfield
  */
-void update_pieces_count_from_bitfield(peer_t peer, unsigned int bitfield)
+void update_pieces_count_from_bitfield(const s_peer_t* peer, unsigned int bitfield)
 {
   for (int i = 0; i < FILE_PIECES; i++) {
     if (bitfield & (1U << i)) {
@@ -477,7 +469,7 @@ void update_pieces_count_from_bitfield(peer_t peer, unsigned int bitfield)
  * @param remote_peer: information about the connection
  * @return the piece to download if possible. -1 otherwise
  */
-int select_piece_to_download(peer_t peer, connection_t remote_peer)
+int select_piece_to_download(const s_peer_t* peer, const s_connection_t* remote_peer)
 {
   int piece = partially_downloaded_piece(peer, remote_peer);
   // strict priority policy
@@ -487,9 +479,6 @@ int select_piece_to_download(peer_t peer, connection_t remote_peer)
   // end game mode
   if (count_pieces(peer->current_pieces) >= (FILE_PIECES - count_pieces(peer->bitfield)) &&
       (is_interested(peer, remote_peer) != 0)) {
-#if ENABLE_END_GAME_MODE == 0
-    return -1;
-#endif
     int nb_interesting_pieces = 0;
     // compute the number of interesting pieces
     for (int i = 0; i < FILE_PIECES; i++) {
@@ -499,7 +488,7 @@ int select_piece_to_download(peer_t peer, connection_t remote_peer)
     }
     xbt_assert(nb_interesting_pieces != 0);
     // get a random interesting piece
-    int random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
+    int random_piece_index = rand() % nb_interesting_pieces;
     int current_index      = 0;
     for (int i = 0; i < FILE_PIECES; i++) {
       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
@@ -525,7 +514,7 @@ int select_piece_to_download(peer_t peer, connection_t remote_peer)
     }
     xbt_assert(nb_interesting_pieces != 0);
     // get a random interesting piece
-    int random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
+    int random_piece_index = rand() % nb_interesting_pieces;
     int current_index      = 0;
     for (int i = 0; i < FILE_PIECES; i++) {
       if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
@@ -558,7 +547,10 @@ int select_piece_to_download(peer_t peer, connection_t remote_peer)
     }
     xbt_assert(nb_min_pieces != 0 || (is_interested_and_free(peer, remote_peer) == 0));
     // get a random rarest piece
-    int random_rarest_index = RngStream_RandInt(peer->stream, 0, nb_min_pieces - 1);
+    int random_rarest_index = 0;
+    if (nb_min_pieces > 0) {
+      random_rarest_index = rand() % nb_min_pieces;
+    }
     for (int i = 0; i < FILE_PIECES; i++) {
       if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
           peer_is_not_downloading_piece(peer, i)) {
@@ -585,15 +577,15 @@ void update_choked_peers(peer_t peer)
   // update the current round
   peer->round = (peer->round + 1) % 3;
   char* key;
-  char* key_choked          = NULL;
-  connection_t peer_choosed = NULL;
-  connection_t peer_choked  = NULL;
+  char* choked_key         = NULL;
+  connection_t chosen_peer = NULL;
+  connection_t choked_peer = NULL;
   // remove a peer from the list
   xbt_dict_cursor_t cursor = NULL;
   xbt_dict_cursor_first(peer->active_peers, &cursor);
-  if (not xbt_dict_is_empty(peer->active_peers)) {
-    key_choked  = xbt_dict_cursor_get_key(cursor);
-    peer_choked = xbt_dict_cursor_get_data(cursor);
+  if (!xbt_dict_is_empty(peer->active_peers)) {
+    choked_key  = xbt_dict_cursor_get_key(cursor);
+    choked_peer = xbt_dict_cursor_get_data(cursor);
   }
   xbt_dict_cursor_free(&cursor);
 
@@ -606,7 +598,7 @@ void update_choked_peers(peer_t peer)
       if (connection->last_unchoke < unchoke_time && (connection->interested != 0) &&
           (connection->choked_upload != 0)) {
         unchoke_time = connection->last_unchoke;
-        peer_choosed = connection;
+        chosen_peer  = connection;
       }
     }
   } else {
@@ -615,25 +607,27 @@ void update_choked_peers(peer_t peer)
       int j = 0;
       do {
         // We choose a random peer to unchoke.
-        int id_chosen = RngStream_RandInt(peer->stream, 0, xbt_dict_length(peer->peers) - 1);
+        int id_chosen = 0;
+        if (xbt_dict_length(peer->peers) > 0) {
+          id_chosen = rand() % xbt_dict_length(peer->peers);
+        }
         int i         = 0;
         connection_t connection;
         xbt_dict_foreach (peer->peers, cursor, key, connection) {
           if (i == id_chosen) {
-            peer_choosed = connection;
+            chosen_peer = connection;
             break;
           }
           i++;
         }
         xbt_dict_cursor_free(&cursor);
-        if (peer_choosed == NULL)
-          THROWF(unknown_error, 0, "A peer should have be selected at this point");
-        else if ((peer_choosed->interested == 0) || (peer_choosed->choked_upload == 0))
-          peer_choosed = NULL;
+        xbt_assert(chosen_peer != NULL, "A peer should have been selected at this point");
+        if ((chosen_peer->interested == 0) || (chosen_peer->choked_upload == 0))
+          chosen_peer = NULL;
         else
           XBT_DEBUG("Nothing to do, keep going");
         j++;
-      } while (peer_choosed == NULL && j < MAXIMUM_PEERS);
+      } while (chosen_peer == NULL && j < MAXIMUM_PEERS);
     } else {
       // Use the "fastest download" policy.
       connection_t connection;
@@ -641,34 +635,34 @@ void update_choked_peers(peer_t peer)
       xbt_dict_foreach (peer->peers, cursor, key, connection) {
         if (connection->peer_speed > fastest_speed && (connection->choked_upload != 0) &&
             (connection->interested != 0)) {
-          peer_choosed  = connection;
+          chosen_peer   = connection;
           fastest_speed = connection->peer_speed;
         }
       }
     }
   }
 
-  if (peer_choosed != NULL)
-    XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", peer->id, peer_choosed->id,
-              peer_choosed->interested, peer_choosed->choked_upload);
-
-  if (peer_choked != peer_choosed) {
-    if (peer_choked != NULL) {
-      xbt_assert((!peer_choked->choked_upload), "Tries to choked a choked peer");
-      peer_choked->choked_upload = 1;
-      xbt_assert((*((int*)key_choked) == peer_choked->id));
-      update_active_peers_set(peer, peer_choked);
-      XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, peer_choked->id);
-      send_choked(peer, peer_choked->mailbox);
+  if (chosen_peer != NULL)
+    XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", peer->id, chosen_peer->id,
+              chosen_peer->interested, chosen_peer->choked_upload);
+
+  if (choked_peer != chosen_peer) {
+    if (choked_peer != NULL) {
+      xbt_assert((!choked_peer->choked_upload), "Tries to choked a choked peer");
+      choked_peer->choked_upload = 1;
+      xbt_assert((*((int*)choked_key) == choked_peer->id));
+      update_active_peers_set(peer, choked_peer);
+      XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, choked_peer->id);
+      send_choked(peer, choked_peer->mailbox);
     }
-    if (peer_choosed != NULL) {
-      xbt_assert((peer_choosed->choked_upload), "Tries to unchoked an unchoked peer");
-      peer_choosed->choked_upload = 0;
-      xbt_dict_set_ext(peer->active_peers, (char*)&peer_choosed->id, sizeof(int), peer_choosed, NULL);
-      peer_choosed->last_unchoke = MSG_get_clock();
-      XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, peer_choosed->id);
-      update_active_peers_set(peer, peer_choosed);
-      send_unchoked(peer, peer_choosed->mailbox);
+    if (chosen_peer != NULL) {
+      xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
+      chosen_peer->choked_upload = 0;
+      xbt_dict_set_ext(peer->active_peers, (char*)&chosen_peer->id, sizeof(int), chosen_peer);
+      chosen_peer->last_unchoke = MSG_get_clock();
+      XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, chosen_peer->id);
+      update_active_peers_set(peer, chosen_peer);
+      send_unchoked(peer, chosen_peer->mailbox);
     }
   }
 }
@@ -676,7 +670,7 @@ void update_choked_peers(peer_t peer)
 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.
  *  @param peer our peer data
  */
-void update_interested_after_receive(peer_t peer)
+void update_interested_after_receive(const s_peer_t* peer)
 {
   char* key;
   xbt_dict_cursor_t cursor;
@@ -709,7 +703,7 @@ void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_l
 }
 
 /** Returns if a peer has completed the download of a piece */
-int piece_complete(peer_t peer, int index)
+int piece_complete(const s_peer_t* peer, int index)
 {
   for (int i = 0; i < PIECES_BLOCKS; i++) {
     if (!(peer->bitfield_blocks & 1ULL << (index * PIECES_BLOCKS + i))) {
@@ -720,7 +714,7 @@ int piece_complete(peer_t peer, int index)
 }
 
 /** Returns the first block that a peer doesn't have in a piece. If the peer has all blocks of the piece, returns -1. */
-int get_first_block(peer_t peer, int piece)
+int get_first_block(const s_peer_t* peer, int piece)
 {
   for (int i = 0; i < PIECES_BLOCKS; i++) {
     if (!(peer->bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) {
@@ -731,13 +725,13 @@ int get_first_block(peer_t peer, int piece)
 }
 
 /** Indicates if the remote peer has a piece not stored by the local peer */
-int is_interested(peer_t peer, connection_t remote_peer)
+int is_interested(const s_peer_t* peer, const s_connection_t* remote_peer)
 {
   return remote_peer->bitfield & (peer->bitfield ^ ((1 << FILE_PIECES) - 1));
 }
 
 /** Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer */
-int is_interested_and_free(peer_t peer, connection_t remote_peer)
+int is_interested_and_free(const s_peer_t* peer, const s_connection_t* remote_peer)
 {
   for (int i = 0; i < FILE_PIECES; i++) {
     if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i)) {
@@ -748,7 +742,7 @@ int is_interested_and_free(peer_t peer, connection_t remote_peer)
 }
 
 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
-int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
+int partially_downloaded_piece(const s_peer_t* peer, const s_connection_t* remote_peer)
 {
   for (int i = 0; i < FILE_PIECES; i++) {
     if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i) &&
@@ -762,7 +756,7 @@ int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
  *  @param peer peer
  *  @param remote_peer peer data to the peer we want to send the request
  */
-void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
+void send_request_to_peer(const s_peer_t* peer, connection_t remote_peer, int piece)
 {
   remote_peer->current_piece = piece;
   xbt_assert(connection_has_piece(remote_peer, piece));
@@ -783,7 +777,7 @@ void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
  *  @param peer peer data
  *  @param mailbox destination mailbox
  */
-void send_interested(peer_t peer, const char* mailbox)
+void send_interested(const s_peer_t* peer, const char* mailbox)
 {
   msg_task_t task = task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, peer->id,
                                      task_message_size(MESSAGE_INTERESTED));
@@ -795,7 +789,7 @@ void send_interested(peer_t peer, const char* mailbox)
  *  @param peer peer data
  *  @param mailbox destination mailbox
  */
-void send_notinterested(peer_t peer, const char* mailbox)
+void send_notinterested(const s_peer_t* peer, const char* mailbox)
 {
   msg_task_t task = task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox, peer->id,
                                      task_message_size(MESSAGE_NOTINTERESTED));
@@ -806,7 +800,7 @@ void send_notinterested(peer_t peer, const char* mailbox)
 /** @brief Send a handshake message to all the peers the peer has.
  *  @param peer peer data
  */
-void send_handshake_all(peer_t peer)
+void send_handshake_all(const s_peer_t* peer)
 {
   connection_t remote_peer;
   xbt_dict_cursor_t cursor = NULL;
@@ -823,7 +817,7 @@ void send_handshake_all(peer_t peer)
  *  @param peer peer data
  *  @param mailbox mailbox where to we send the message
  */
-void send_handshake(peer_t peer, const char* mailbox)
+void send_handshake(const s_peer_t* peer, const char* mailbox)
 {
   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
   msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, peer->id,
@@ -832,7 +826,7 @@ void send_handshake(peer_t peer, const char* mailbox)
 }
 
 /** Send a "choked" message to a peer. */
-void send_choked(peer_t peer, const char* mailbox)
+void send_choked(const s_peer_t* peer, const char* mailbox)
 {
   XBT_DEBUG("Sending a CHOKE to %s", mailbox);
   msg_task_t task =
@@ -841,7 +835,7 @@ void send_choked(peer_t peer, const char* mailbox)
 }
 
 /** Send a "unchoked" message to a peer */
-void send_unchoked(peer_t peer, const char* mailbox)
+void send_unchoked(const s_peer_t* peer, const char* mailbox)
 {
   XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
   msg_task_t task =
@@ -850,7 +844,7 @@ void send_unchoked(peer_t peer, const char* mailbox)
 }
 
 /** Send a "HAVE" message to all peers we are connected to */
-void send_have(peer_t peer, int piece)
+void send_have(const s_peer_t* peer, int piece)
 {
   XBT_DEBUG("Sending HAVE message to all my peers");
   connection_t remote_peer;
@@ -866,7 +860,7 @@ void send_have(peer_t peer, int piece)
 /** @brief Send a bitfield message to all the peers the peer has.
  *  @param peer peer data
  */
-void send_bitfield(peer_t peer, const char* mailbox)
+void send_bitfield(const s_peer_t* peer, const char* mailbox)
 {
   XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
   msg_task_t task = task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id, peer->bitfield, FILE_PIECES);
@@ -874,7 +868,7 @@ void send_bitfield(peer_t peer, const char* mailbox)
 }
 
 /** Send a "request" message to a pair, containing a request for a piece */
-void send_request(peer_t peer, const char* mailbox, int piece, int block_index, int block_length)
+void send_request(const s_peer_t* peer, const char* mailbox, int piece, int block_index, int block_length)
 {
   XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece, block_index, block_length);
   msg_task_t task = task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece, block_index, block_length);
@@ -882,7 +876,7 @@ void send_request(peer_t peer, const char* mailbox, int piece, int block_index,
 }
 
 /** Send a "piece" message to a pair, containing a piece of the file */
-void send_piece(peer_t peer, const char* mailbox, int piece, int block_index, int block_length)
+void send_piece(const s_peer_t* peer, const char* mailbox, int piece, int block_index, int block_length)
 {
   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, block_length, mailbox);
   xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");