Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use Mailbox::get_unique<>(), and save a few delete.
[simgrid.git] / examples / s4u / app-bittorrent / s4u-peer.cpp
index 3225496..fa06d8e 100644 (file)
@@ -1,10 +1,11 @@
-/* Copyright (c) 2012-2017. The SimGrid Team. All rights reserved.          */
+/* Copyright (c) 2012-2020. The SimGrid Team. All rights reserved.          */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
+#include <algorithm>
+#include <array>
 #include <climits>
-#include <xbt/ex.hpp>
 
 #include "s4u-peer.hpp"
 #include "s4u-tracker.hpp"
@@ -15,61 +16,78 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
  * User parameters for transferred file data. For the test, the default values are :
  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
  */
-#define FILE_PIECES 10UL
-#define PIECES_BLOCKS 5UL
-#define BLOCK_SIZE 16384
+constexpr unsigned long FILE_PIECES   = 10UL;
+constexpr unsigned long PIECES_BLOCKS = 5UL;
+constexpr int BLOCK_SIZE              = 16384;
 
 /** Number of blocks asked by each request */
-#define BLOCKS_REQUESTED 2
+constexpr unsigned long BLOCKS_REQUESTED = 2UL;
 
-#define ENABLE_END_GAME_MODE 1
-#define SLEEP_DURATION 1
+constexpr double SLEEP_DURATION     = 1.0;
 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
 
+/** Message sizes
+ * Sizes based on report by A. Legout et al, Understanding BitTorrent: An Experimental Perspective
+ * http://hal.inria.fr/inria-00000156/en
+ */
+constexpr unsigned message_size(MessageType type)
+{
+  constexpr std::array<unsigned, 10> sizes{{/* HANDSHAKE     */ 68,
+                                            /* CHOKE         */ 5,
+                                            /* UNCHOKE       */ 5,
+                                            /* INTERESTED    */ 5,
+                                            /* NOTINTERESTED */ 5,
+                                            /* HAVE          */ 9,
+                                            /* BITFIELD      */ 5,
+                                            /* REQUEST       */ 17,
+                                            /* PIECE         */ 13,
+                                            /* CANCEL        */ 17}};
+  return sizes[static_cast<int>(type)];
+}
+
+constexpr const char* message_name(MessageType type)
+{
+  constexpr std::array<const char*, 10> names{{"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "HAVE",
+                                               "BITFIELD", "REQUEST", "PIECE", "CANCEL"}};
+  return names[static_cast<int>(type)];
+}
+
 Peer::Peer(std::vector<std::string> args)
 {
   // Check arguments
   xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
   try {
     id       = std::stoi(args[1]);
-    mailbox_ = simgrid::s4u::Mailbox::byName(std::to_string(id));
-  } catch (std::invalid_argument& ia) {
-    throw std::invalid_argument(std::string("Invalid ID:") + args[1].c_str());
+    mailbox_ = simgrid::s4u::Mailbox::by_name(std::to_string(id));
+  } catch (const std::invalid_argument&) {
+    throw std::invalid_argument("Invalid ID:" + args[1]);
   }
+  random.set_seed(id);
 
   try {
     deadline = std::stod(args[2]);
-  } catch (std::invalid_argument& ia) {
-    throw std::invalid_argument(std::string("Invalid deadline:") + args[2].c_str());
+  } catch (const std::invalid_argument&) {
+    throw std::invalid_argument("Invalid deadline:" + args[2]);
   }
   xbt_assert(deadline > 0, "Wrong deadline supplied");
 
-  stream = simgrid::s4u::this_actor::getHost()->extension<HostBittorrent>()->getStream();
-
   if (args.size() == 4 && args[3] == "1") {
     bitfield_       = (1U << FILE_PIECES) - 1U;
     bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
   }
-  pieces_count = new short[FILE_PIECES]{0};
+  pieces_count.resize(FILE_PIECES);
 
   XBT_INFO("Hi, I'm joining the network with id %d", id);
 }
 
-Peer::~Peer()
-{
-  for (auto const& peer : connected_peers)
-    delete peer.second;
-  delete[] pieces_count;
-}
-
 /** Peer main function */
 void Peer::operator()()
 {
   // Getting peer data from the tracker.
   if (getPeersFromTracker()) {
     XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
-    begin_receive_time = simgrid::s4u::Engine::getClock();
-    mailbox_->setReceiver(simgrid::s4u::Actor::self());
+    begin_receive_time = simgrid::s4u::Engine::get_clock();
+    mailbox_->set_receiver(simgrid::s4u::Actor::self());
     if (hasFinished()) {
       sendHandshakeToAllPeers();
     } else {
@@ -85,32 +103,27 @@ void Peer::operator()()
 
 bool Peer::getPeersFromTracker()
 {
-  simgrid::s4u::MailboxPtr tracker_mailbox = simgrid::s4u::Mailbox::byName(TRACKER_MAILBOX);
+  simgrid::s4u::Mailbox* tracker_mailbox = simgrid::s4u::Mailbox::by_name(TRACKER_MAILBOX);
   // Build the task to send to the tracker
-  TrackerQuery* peer_request = new TrackerQuery(id, mailbox_);
+  auto* peer_request = new TrackerQuery(id, mailbox_);
   try {
     XBT_DEBUG("Sending a peer request to the tracker.");
     tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
-  } catch (xbt_ex& e) {
-    if (e.category == timeout_error) {
-      XBT_DEBUG("Timeout expired when requesting peers to tracker");
-      delete peer_request;
-      return false;
-    }
+  } catch (const simgrid::TimeoutException&) {
+    XBT_DEBUG("Timeout expired when requesting peers to tracker");
+    delete peer_request;
+    return false;
   }
 
   try {
-    TrackerAnswer* answer = static_cast<TrackerAnswer*>(mailbox_->get(GET_PEERS_TIMEOUT));
+    auto answer = mailbox_->get_unique<TrackerAnswer>(GET_PEERS_TIMEOUT);
     // Add the peers the tracker gave us to our peer list.
-    for (auto const& peer_id : *answer->getPeers())
+    for (auto const& peer_id : answer->getPeers())
       if (id != peer_id)
-        connected_peers[peer_id] = new Connection(peer_id);
-    delete answer;
-  } catch (xbt_ex& e) {
-    if (e.category == timeout_error) {
-      XBT_DEBUG("Timeout expired when requesting peers to tracker");
-      return false;
-    }
+        connected_peers.emplace(peer_id, Connection(peer_id));
+  } catch (const simgrid::TimeoutException&) {
+    XBT_DEBUG("Timeout expired when requesting peers to tracker");
+    return false;
   }
   return true;
 }
@@ -118,42 +131,43 @@ bool Peer::getPeersFromTracker()
 void Peer::sendHandshakeToAllPeers()
 {
   for (auto const& kv : connected_peers) {
-    Connection* remote_peer = kv.second;
-    Message* handshake      = new Message(MESSAGE_HANDSHAKE, id, mailbox_);
-    remote_peer->mailbox_->put_init(handshake, MESSAGE_HANDSHAKE_SIZE)->detach();
-    XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer->id);
+    const Connection& remote_peer = kv.second;
+    auto* handshake               = new Message(MessageType::HANDSHAKE, id, mailbox_);
+    remote_peer.mailbox_->put_init(handshake, message_size(MessageType::HANDSHAKE))->detach();
+    XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer.id);
   }
 }
 
-void Peer::sendMessage(simgrid::s4u::MailboxPtr mailbox, e_message_type type, uint64_t size)
+void Peer::sendMessage(simgrid::s4u::Mailbox* mailbox, MessageType type, uint64_t size)
 {
-  const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"};
-  XBT_DEBUG("Sending %s to %s", type_names[type], mailbox->getCname());
+  XBT_DEBUG("Sending %s to %s", message_name(type), mailbox->get_cname());
   mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
 }
 
-void Peer::sendBitfield(simgrid::s4u::MailboxPtr mailbox)
+void Peer::sendBitfield(simgrid::s4u::Mailbox* mailbox)
 {
-  XBT_DEBUG("Sending a BITFIELD to %s", mailbox->getCname());
+  XBT_DEBUG("Sending a BITFIELD to %s", mailbox->get_cname());
   mailbox
-      ->put_init(new Message(MESSAGE_BITFIELD, id, bitfield_, mailbox_),
-                 MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES))
+      ->put_init(new Message(MessageType::BITFIELD, id, bitfield_, mailbox_),
+                 message_size(MessageType::BITFIELD) + BITS_TO_BYTES(FILE_PIECES))
       ->detach();
 }
 
-void Peer::sendPiece(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length)
+void Peer::sendPiece(simgrid::s4u::Mailbox* mailbox, unsigned int piece, int block_index, int block_length)
 {
   xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
-  XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->getCname());
-  mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach();
+  XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->get_cname());
+  mailbox->put_init(new Message(MessageType::PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)
+      ->detach();
 }
 
 void Peer::sendHaveToAllPeers(unsigned int piece)
 {
   XBT_DEBUG("Sending HAVE message to all my peers");
   for (auto const& kv : connected_peers) {
-    Connection* remote_peer = kv.second;
-    remote_peer->mailbox_->put_init(new Message(MESSAGE_HAVE, id, mailbox_, piece), MESSAGE_HAVE_SIZE)->detach();
+    const Connection& remote_peer = kv.second;
+    remote_peer.mailbox_->put_init(new Message(MessageType::HAVE, id, mailbox_, piece), message_size(MessageType::HAVE))
+        ->detach();
   }
 }
 
@@ -163,35 +177,36 @@ void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
   xbt_assert(remote_peer->hasPiece(piece));
   int block_index = getFirstMissingBlockFrom(piece);
   if (block_index != -1) {
-    int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
-    XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->getCname(), piece, block_index,
+    int block_length = static_cast<int>(std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index));
+    XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->get_cname(), piece, block_index,
               block_length);
     remote_peer->mailbox_
-        ->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE)
+        ->put_init(new Message(MessageType::REQUEST, id, mailbox_, piece, block_index, block_length),
+                   message_size(MessageType::REQUEST))
         ->detach();
   }
 }
 
-std::string Peer::getStatus()
+std::string Peer::getStatus() const
 {
-  std::string res = std::string("");
-  for (int i = FILE_PIECES - 1; i >= 0; i--)
-    res = std::string((bitfield_ & (1U << i)) ? "1" : "0") + res;
+  std::string res;
+  for (unsigned i = 0; i < FILE_PIECES; i++)
+    res += (bitfield_ & (1U << i)) ? '1' : '0';
   return res;
 }
 
-bool Peer::hasFinished()
+bool Peer::hasFinished() const
 {
   return bitfield_ == (1U << FILE_PIECES) - 1U;
 }
 
 /** Indicates if the remote peer has a piece not stored by the local peer */
-bool Peer::isInterestedBy(Connection* remote_peer)
+bool Peer::isInterestedBy(const Connection* remote_peer) const
 {
   return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
 }
 
-bool Peer::isInterestedByFree(Connection* remote_peer)
+bool Peer::isInterestedByFree(const Connection* remote_peer) const
 {
   for (unsigned int i = 0; i < FILE_PIECES; i++)
     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
@@ -206,7 +221,7 @@ void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
       pieces_count[i]++;
 }
 
-unsigned int Peer::countPieces(unsigned int bitfield)
+unsigned int Peer::countPieces(unsigned int bitfield) const
 {
   unsigned int count = 0U;
   unsigned int n     = bitfield;
@@ -217,37 +232,35 @@ unsigned int Peer::countPieces(unsigned int bitfield)
   return count;
 }
 
-int Peer::nbInterestedPeers()
+int Peer::nbInterestedPeers() const
 {
   int nb = 0;
   for (auto const& kv : connected_peers)
-    if (kv.second->interested)
+    if (kv.second.interested)
       nb++;
   return nb;
 }
 
 void Peer::leech()
 {
-  double next_choked_update = simgrid::s4u::Engine::getClock() + UPDATE_CHOKED_INTERVAL;
+  double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
   XBT_DEBUG("Start downloading.");
 
   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
   sendHandshakeToAllPeers();
-  XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->getCname());
+  XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname());
 
-  void* data = nullptr;
-  while (simgrid::s4u::Engine::getClock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
+  while (simgrid::s4u::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
     if (comm_received == nullptr) {
-      comm_received = mailbox_->get_async(&data);
+      comm_received = mailbox_->get_async<Message>(&message);
     }
     if (comm_received->test()) {
-      message = static_cast<Message*>(data);
       handleMessage();
       delete message;
       comm_received = nullptr;
     } else {
       // We don't execute the choke algorithm if we don't already have a piece
-      if (simgrid::s4u::Engine::getClock() >= next_choked_update && countPieces(bitfield_) > 0) {
+      if (simgrid::s4u::Engine::get_clock() >= next_choked_update && countPieces(bitfield_) > 0) {
         updateChokedPeers();
         next_choked_update += UPDATE_CHOKED_INTERVAL;
       } else {
@@ -261,21 +274,19 @@ void Peer::leech()
 
 void Peer::seed()
 {
-  double next_choked_update = simgrid::s4u::Engine::getClock() + UPDATE_CHOKED_INTERVAL;
+  double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
   XBT_DEBUG("Start seeding.");
   // start the main seed loop
-  void* data = nullptr;
-  while (simgrid::s4u::Engine::getClock() < deadline) {
+  while (simgrid::s4u::Engine::get_clock() < deadline) {
     if (comm_received == nullptr) {
-      comm_received = mailbox_->get_async(&data);
+      comm_received = mailbox_->get_async<Message>(&message);
     }
     if (comm_received->test()) {
-      message = static_cast<Message*>(data);
       handleMessage();
       delete message;
       comm_received = nullptr;
     } else {
-      if (simgrid::s4u::Engine::getClock() >= next_choked_update) {
+      if (simgrid::s4u::Engine::get_clock() >= next_choked_update) {
         updateChokedPeers();
         // TODO: Change the choked peer algorithm when seeding.
         next_choked_update += UPDATE_CHOKED_INTERVAL;
@@ -296,28 +307,25 @@ void Peer::updateActivePeersSet(Connection* remote_peer)
 
 void Peer::handleMessage()
 {
-  const char* type_names[10] = {"HANDSHAKE", "CHOKE",    "UNCHOKE", "INTERESTED", "NOTINTERESTED",
-                                "HAVE",      "BITFIELD", "REQUEST", "PIECE",      "CANCEL"};
-
-  XBT_DEBUG("Received a %s message from %s", type_names[message->type], message->return_mailbox->getCname());
+  XBT_DEBUG("Received a %s message from %s", message_name(message->type), message->return_mailbox->get_cname());
 
   auto known_peer         = connected_peers.find(message->peer_id);
-  Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : known_peer->second;
-  xbt_assert(remote_peer != nullptr || message->type == MESSAGE_HANDSHAKE,
+  Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : &known_peer->second;
+  xbt_assert(remote_peer != nullptr || message->type == MessageType::HANDSHAKE,
              "The impossible did happened: A not-in-our-list peer sent us a message.");
 
   switch (message->type) {
-    case MESSAGE_HANDSHAKE:
+    case MessageType::HANDSHAKE:
       // Check if the peer is in our connection list.
       if (remote_peer == nullptr) {
         XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
-        connected_peers[message->peer_id] = new Connection(message->peer_id);
-        sendMessage(message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
+        connected_peers.emplace(message->peer_id, Connection(message->peer_id));
+        sendMessage(message->return_mailbox, MessageType::HANDSHAKE, message_size(MessageType::HANDSHAKE));
       }
       // Send our bitfield to the peer
       sendBitfield(message->return_mailbox);
       break;
-    case MESSAGE_BITFIELD:
+    case MessageType::BITFIELD:
       // Update the pieces list
       updatePiecesCountFromBitfield(message->bitfield);
       // Store the bitfield
@@ -325,32 +333,32 @@ void Peer::handleMessage()
       xbt_assert(not remote_peer->am_interested, "Should not be interested at first");
       if (isInterestedBy(remote_peer)) {
         remote_peer->am_interested = true;
-        sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
+        sendMessage(message->return_mailbox, MessageType::INTERESTED, message_size(MessageType::INTERESTED));
       }
       break;
-    case MESSAGE_INTERESTED:
+    case MessageType::INTERESTED:
       // Update the interested state of the peer.
       remote_peer->interested = true;
       updateActivePeersSet(remote_peer);
       break;
-    case MESSAGE_NOTINTERESTED:
+    case MessageType::NOTINTERESTED:
       remote_peer->interested = false;
       updateActivePeersSet(remote_peer);
       break;
-    case MESSAGE_UNCHOKE:
+    case MessageType::UNCHOKE:
       xbt_assert(remote_peer->choked_download);
       remote_peer->choked_download = false;
       // Send requests to the peer, since it has unchoked us
       if (remote_peer->am_interested)
         requestNewPieceTo(remote_peer);
       break;
-    case MESSAGE_CHOKE:
+    case MessageType::CHOKE:
       xbt_assert(not remote_peer->choked_download);
       remote_peer->choked_download = true;
       if (remote_peer->current_piece != -1)
         removeCurrentPiece(remote_peer, remote_peer->current_piece);
       break;
-    case MESSAGE_HAVE:
+    case MessageType::HAVE:
       XBT_DEBUG("\t for piece %d", message->piece);
       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
                  "Wrong HAVE message received");
@@ -359,12 +367,12 @@ void Peer::handleMessage()
       // If the piece is in our pieces, we tell the peer that we are interested.
       if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
         remote_peer->am_interested = true;
-        sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
+        sendMessage(message->return_mailbox, MessageType::INTERESTED, message_size(MessageType::INTERESTED));
         if (not remote_peer->choked_download)
           requestNewPieceTo(remote_peer);
       }
       break;
-    case MESSAGE_REQUEST:
+    case MessageType::REQUEST:
       xbt_assert(remote_peer->interested);
       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
                  "Wrong HAVE message received");
@@ -378,14 +386,10 @@ void Peer::handleMessage()
         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
       }
       break;
-    case MESSAGE_PIECE:
+    case MessageType::PIECE:
       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
                 message->block_index + message->block_length);
       xbt_assert(not remote_peer->choked_download);
-      xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
-                 "Can't received a piece if I'm not interested without end-game mode!"
-                 "piece (%d) bitfield (%u) remote bitfield (%u)",
-                 message->piece, bitfield_, remote_peer->bitfield);
       xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
                  "Wrong piece received");
@@ -407,20 +411,19 @@ void Peer::handleMessage()
         }
       } else {
         XBT_DEBUG("However, we already have it");
-        xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
         requestNewPieceTo(remote_peer);
       }
       break;
-    case MESSAGE_CANCEL:
+    case MessageType::CANCEL:
       break;
     default:
       THROW_IMPOSSIBLE;
   }
   // Update the peer speed.
   if (remote_peer) {
-    remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::getClock() - begin_receive_time));
+    remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::get_clock() - begin_receive_time));
   }
-  begin_receive_time = simgrid::s4u::Engine::getClock();
+  begin_receive_time = simgrid::s4u::Engine::get_clock();
 }
 
 /** Selects the appropriate piece to download and requests it to the remote_peer */
@@ -448,7 +451,7 @@ void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piec
  * @param remote_peer: information about the connection
  * @return the piece to download if possible. -1 otherwise
  */
-int Peer::selectPieceToDownload(Connection* remote_peer)
+int Peer::selectPieceToDownload(const Connection* remote_peer)
 {
   int piece = partiallyDownloadedPiece(remote_peer);
   // strict priority policy
@@ -457,21 +460,18 @@ int Peer::selectPieceToDownload(Connection* remote_peer)
 
   // end game mode
   if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) {
-#if ENABLE_END_GAME_MODE == 0
-    return -1;
-#endif
     int nb_interesting_pieces = 0;
     // compute the number of interesting pieces
     for (unsigned int i = 0; i < FILE_PIECES; i++)
-      if (hasNotPiece(i) && remote_peer->hasPiece(i))
+      if (remotePeerHasMissingPiece(remote_peer, i))
         nb_interesting_pieces++;
 
     xbt_assert(nb_interesting_pieces != 0);
     // get a random interesting piece
-    int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
+    int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1);
     int current_index      = 0;
     for (unsigned int i = 0; i < FILE_PIECES; i++) {
-      if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
+      if (remotePeerHasMissingPiece(remote_peer, i)) {
         if (random_piece_index == current_index) {
           piece = i;
           break;
@@ -487,14 +487,14 @@ int Peer::selectPieceToDownload(Connection* remote_peer)
     int nb_interesting_pieces = 0;
     // compute the number of interesting pieces
     for (unsigned int i = 0; i < FILE_PIECES; i++)
-      if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
+      if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
         nb_interesting_pieces++;
     xbt_assert(nb_interesting_pieces != 0);
     // get a random interesting piece
-    int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
+    int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1);
     int current_index      = 0;
     for (unsigned int i = 0; i < FILE_PIECES; i++) {
-      if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
+      if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) {
         if (random_piece_index == current_index) {
           piece = i;
           break;
@@ -510,21 +510,24 @@ int Peer::selectPieceToDownload(Connection* remote_peer)
     int current_index = 0;
     // compute the smallest number of copies of available pieces
     for (unsigned int i = 0; i < FILE_PIECES; i++) {
-      if (pieces_count[i] < min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
+      if (pieces_count[i] < min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
         min = pieces_count[i];
     }
 
     xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
     // compute the number of rarest pieces
     for (unsigned int i = 0; i < FILE_PIECES; i++)
-      if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
+      if (pieces_count[i] == min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
         nb_min_pieces++;
 
     xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer));
     // get a random rarest piece
-    int random_rarest_index = RngStream_RandInt(stream, 0, nb_min_pieces - 1);
+    int random_rarest_index = 0;
+    if (nb_min_pieces > 0) {
+      random_rarest_index = random.uniform_int(0, nb_min_pieces - 1);
+    }
     for (unsigned int i = 0; i < FILE_PIECES; i++)
-      if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
+      if (pieces_count[i] == min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) {
         if (random_rarest_index == current_index) {
           piece = i;
           break;
@@ -556,13 +559,12 @@ void Peer::updateChokedPeers()
 
   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
   if (hasFinished()) {
-    Connection* remote_peer;
-    double unchoke_time = simgrid::s4u::Engine::getClock() + 1;
-    for (auto const& kv : connected_peers) {
-      remote_peer = kv.second;
-      if (remote_peer->last_unchoke < unchoke_time && remote_peer->interested && remote_peer->choked_upload) {
-        unchoke_time = remote_peer->last_unchoke;
-        chosen_peer  = remote_peer;
+    double unchoke_time = simgrid::s4u::Engine::get_clock() + 1;
+    for (auto& kv : connected_peers) {
+      Connection& remote_peer = kv.second;
+      if (remote_peer.last_unchoke < unchoke_time && remote_peer.interested && remote_peer.choked_upload) {
+        unchoke_time = remote_peer.last_unchoke;
+        chosen_peer  = &remote_peer;
       }
     }
   } else {
@@ -571,12 +573,10 @@ void Peer::updateChokedPeers()
       int j = 0;
       do {
         // We choose a random peer to unchoke.
-        std::unordered_map<int, Connection*>::iterator chosen_peer_it = connected_peers.begin();
-        std::advance(chosen_peer_it, RngStream_RandInt(stream, 0, connected_peers.size() - 1));
-        chosen_peer = chosen_peer_it->second;
-        if (chosen_peer == nullptr)
-          THROWF(unknown_error, 0, "A peer should have be selected at this point");
-        else if (not chosen_peer->interested || not chosen_peer->choked_upload)
+        auto chosen_peer_it = connected_peers.begin();
+        std::advance(chosen_peer_it, random.uniform_int(0, static_cast<int>(connected_peers.size() - 1)));
+        chosen_peer = &chosen_peer_it->second;
+        if (not chosen_peer->interested || not chosen_peer->choked_upload)
           chosen_peer = nullptr;
         else
           XBT_DEBUG("Nothing to do, keep going");
@@ -585,11 +585,11 @@ void Peer::updateChokedPeers()
     } else {
       // Use the "fastest download" policy.
       double fastest_speed = 0.0;
-      for (auto const& kv : connected_peers) {
-        Connection* remote_peer = kv.second;
-        if (remote_peer->peer_speed > fastest_speed && remote_peer->choked_upload && remote_peer->interested) {
-          chosen_peer   = remote_peer;
-          fastest_speed = remote_peer->peer_speed;
+      for (auto& kv : connected_peers) {
+        Connection& remote_peer = kv.second;
+        if (remote_peer.peer_speed > fastest_speed && remote_peer.choked_upload && remote_peer.interested) {
+          fastest_speed = remote_peer.peer_speed;
+          chosen_peer   = &remote_peer;
         }
       }
     }
@@ -605,16 +605,16 @@ void Peer::updateChokedPeers()
       choked_peer->choked_upload = true;
       updateActivePeersSet(choked_peer);
       XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
-      sendMessage(choked_peer->mailbox_, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
+      sendMessage(choked_peer->mailbox_, MessageType::CHOKE, message_size(MessageType::CHOKE));
     }
     if (chosen_peer != nullptr) {
       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
       chosen_peer->choked_upload = false;
       active_peers.insert(chosen_peer);
-      chosen_peer->last_unchoke = simgrid::s4u::Engine::getClock();
+      chosen_peer->last_unchoke = simgrid::s4u::Engine::get_clock();
       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
       updateActivePeersSet(chosen_peer);
-      sendMessage(chosen_peer->mailbox_, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
+      sendMessage(chosen_peer->mailbox_, MessageType::UNCHOKE, message_size(MessageType::UNCHOKE));
     }
   }
 }
@@ -622,20 +622,20 @@ void Peer::updateChokedPeers()
 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
 void Peer::updateInterestedAfterReceive()
 {
-  for (auto const& kv : connected_peers) {
-    Connection* remote_peer = kv.second;
-    if (remote_peer->am_interested) {
+  for (auto& kv : connected_peers) {
+    Connection& remote_peer = kv.second;
+    if (remote_peer.am_interested) {
       bool interested = false;
       // Check if the peer still has a piece we want.
       for (unsigned int i = 0; i < FILE_PIECES; i++)
-        if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
+        if (remotePeerHasMissingPiece(&remote_peer, i)) {
           interested = true;
           break;
         }
 
       if (not interested) { // no more piece to download from connection
-        remote_peer->am_interested = false;
-        sendMessage(remote_peer->mailbox_, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
+        remote_peer.am_interested = false;
+        sendMessage(remote_peer.mailbox_, MessageType::NOTINTERESTED, message_size(MessageType::NOTINTERESTED));
       }
     }
   }
@@ -650,7 +650,7 @@ void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
     bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
 }
 
-bool Peer::hasCompletedPiece(unsigned int piece)
+bool Peer::hasCompletedPiece(unsigned int piece) const
 {
   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
@@ -658,7 +658,7 @@ bool Peer::hasCompletedPiece(unsigned int piece)
   return true;
 }
 
-int Peer::getFirstMissingBlockFrom(int piece)
+int Peer::getFirstMissingBlockFrom(int piece) const
 {
   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
@@ -667,10 +667,10 @@ int Peer::getFirstMissingBlockFrom(int piece)
 }
 
 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
-int Peer::partiallyDownloadedPiece(Connection* remote_peer)
+int Peer::partiallyDownloadedPiece(const Connection* remote_peer) const
 {
   for (unsigned int i = 0; i < FILE_PIECES; i++)
-    if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
+    if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
       return i;
   return -1;
 }