* under the terms of the license (GNU LGPL) which comes with this package. */
#include <algorithm>
+#include <array>
#include <climits>
#include "s4u-peer.hpp"
constexpr double SLEEP_DURATION = 1.0;
#define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
+/** Message sizes
+ * Sizes based on report by A. Legout et al, Understanding BitTorrent: An Experimental Perspective
+ * http://hal.inria.fr/inria-00000156/en
+ */
+constexpr unsigned message_size(MessageType type)
+{
+ constexpr std::array<unsigned, 10> sizes{{/* HANDSHAKE */ 68,
+ /* CHOKE */ 5,
+ /* UNCHOKE */ 5,
+ /* INTERESTED */ 5,
+ /* NOTINTERESTED */ 5,
+ /* HAVE */ 9,
+ /* BITFIELD */ 5,
+ /* REQUEST */ 17,
+ /* PIECE */ 13,
+ /* CANCEL */ 17}};
+ return sizes[static_cast<int>(type)];
+}
+
+constexpr const char* message_name(MessageType type)
+{
+ constexpr std::array<const char*, 10> names{{"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "HAVE",
+ "BITFIELD", "REQUEST", "PIECE", "CANCEL"}};
+ return names[static_cast<int>(type)];
+}
+
Peer::Peer(std::vector<std::string> args)
{
// Check arguments
} catch (const std::invalid_argument&) {
throw std::invalid_argument("Invalid ID:" + args[1]);
}
+ random.set_seed(id);
try {
deadline = std::stod(args[2]);
{
simgrid::s4u::Mailbox* tracker_mailbox = simgrid::s4u::Mailbox::by_name(TRACKER_MAILBOX);
// Build the task to send to the tracker
- TrackerQuery* peer_request = new TrackerQuery(id, mailbox_);
+ auto* peer_request = new TrackerQuery(id, mailbox_);
try {
XBT_DEBUG("Sending a peer request to the tracker.");
tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
}
try {
- TrackerAnswer* answer = static_cast<TrackerAnswer*>(mailbox_->get(GET_PEERS_TIMEOUT));
+ auto answer = mailbox_->get_unique<TrackerAnswer>(GET_PEERS_TIMEOUT);
// Add the peers the tracker gave us to our peer list.
for (auto const& peer_id : answer->getPeers())
if (id != peer_id)
connected_peers.emplace(peer_id, Connection(peer_id));
- delete answer;
} catch (const simgrid::TimeoutException&) {
XBT_DEBUG("Timeout expired when requesting peers to tracker");
return false;
{
for (auto const& kv : connected_peers) {
const Connection& remote_peer = kv.second;
- Message* handshake = new Message(MESSAGE_HANDSHAKE, id, mailbox_);
- remote_peer.mailbox_->put_init(handshake, MESSAGE_HANDSHAKE_SIZE)->detach();
+ auto* handshake = new Message(MessageType::HANDSHAKE, id, mailbox_);
+ remote_peer.mailbox_->put_init(handshake, message_size(MessageType::HANDSHAKE))->detach();
XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer.id);
}
}
-void Peer::sendMessage(simgrid::s4u::Mailbox* mailbox, e_message_type type, uint64_t size)
+void Peer::sendMessage(simgrid::s4u::Mailbox* mailbox, MessageType type, uint64_t size)
{
- const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"};
- XBT_DEBUG("Sending %s to %s", type_names[type], mailbox->get_cname());
+ XBT_DEBUG("Sending %s to %s", message_name(type), mailbox->get_cname());
mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
}
{
XBT_DEBUG("Sending a BITFIELD to %s", mailbox->get_cname());
mailbox
- ->put_init(new Message(MESSAGE_BITFIELD, id, bitfield_, mailbox_),
- MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES))
+ ->put_init(new Message(MessageType::BITFIELD, id, bitfield_, mailbox_),
+ message_size(MessageType::BITFIELD) + BITS_TO_BYTES(FILE_PIECES))
->detach();
}
{
xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->get_cname());
- mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach();
+ mailbox->put_init(new Message(MessageType::PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)
+ ->detach();
}
void Peer::sendHaveToAllPeers(unsigned int piece)
XBT_DEBUG("Sending HAVE message to all my peers");
for (auto const& kv : connected_peers) {
const Connection& remote_peer = kv.second;
- remote_peer.mailbox_->put_init(new Message(MESSAGE_HAVE, id, mailbox_, piece), MESSAGE_HAVE_SIZE)->detach();
+ remote_peer.mailbox_->put_init(new Message(MessageType::HAVE, id, mailbox_, piece), message_size(MessageType::HAVE))
+ ->detach();
}
}
xbt_assert(remote_peer->hasPiece(piece));
int block_index = getFirstMissingBlockFrom(piece);
if (block_index != -1) {
- int block_length = std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
+ int block_length = static_cast<int>(std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index));
XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->get_cname(), piece, block_index,
block_length);
remote_peer->mailbox_
- ->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE)
+ ->put_init(new Message(MessageType::REQUEST, id, mailbox_, piece, block_index, block_length),
+ message_size(MessageType::REQUEST))
->detach();
}
}
-std::string Peer::getStatus()
+std::string Peer::getStatus() const
{
std::string res;
for (unsigned i = 0; i < FILE_PIECES; i++)
return res;
}
-bool Peer::hasFinished()
+bool Peer::hasFinished() const
{
return bitfield_ == (1U << FILE_PIECES) - 1U;
}
pieces_count[i]++;
}
-unsigned int Peer::countPieces(unsigned int bitfield)
+unsigned int Peer::countPieces(unsigned int bitfield) const
{
unsigned int count = 0U;
unsigned int n = bitfield;
return count;
}
-int Peer::nbInterestedPeers()
+int Peer::nbInterestedPeers() const
{
int nb = 0;
for (auto const& kv : connected_peers)
sendHandshakeToAllPeers();
XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname());
- void* data = nullptr;
while (simgrid::s4u::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
if (comm_received == nullptr) {
- comm_received = mailbox_->get_async(&data);
+ comm_received = mailbox_->get_async<Message>(&message);
}
if (comm_received->test()) {
- message = static_cast<Message*>(data);
handleMessage();
delete message;
comm_received = nullptr;
double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
XBT_DEBUG("Start seeding.");
// start the main seed loop
- void* data = nullptr;
while (simgrid::s4u::Engine::get_clock() < deadline) {
if (comm_received == nullptr) {
- comm_received = mailbox_->get_async(&data);
+ comm_received = mailbox_->get_async<Message>(&message);
}
if (comm_received->test()) {
- message = static_cast<Message*>(data);
handleMessage();
delete message;
comm_received = nullptr;
void Peer::handleMessage()
{
- const char* type_names[10] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED",
- "HAVE", "BITFIELD", "REQUEST", "PIECE", "CANCEL"};
-
- XBT_DEBUG("Received a %s message from %s", type_names[message->type], message->return_mailbox->get_cname());
+ XBT_DEBUG("Received a %s message from %s", message_name(message->type), message->return_mailbox->get_cname());
auto known_peer = connected_peers.find(message->peer_id);
Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : &known_peer->second;
- xbt_assert(remote_peer != nullptr || message->type == MESSAGE_HANDSHAKE,
+ xbt_assert(remote_peer != nullptr || message->type == MessageType::HANDSHAKE,
"The impossible did happened: A not-in-our-list peer sent us a message.");
switch (message->type) {
- case MESSAGE_HANDSHAKE:
+ case MessageType::HANDSHAKE:
// Check if the peer is in our connection list.
if (remote_peer == nullptr) {
XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
connected_peers.emplace(message->peer_id, Connection(message->peer_id));
- sendMessage(message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
+ sendMessage(message->return_mailbox, MessageType::HANDSHAKE, message_size(MessageType::HANDSHAKE));
}
// Send our bitfield to the peer
sendBitfield(message->return_mailbox);
break;
- case MESSAGE_BITFIELD:
+ case MessageType::BITFIELD:
// Update the pieces list
updatePiecesCountFromBitfield(message->bitfield);
// Store the bitfield
xbt_assert(not remote_peer->am_interested, "Should not be interested at first");
if (isInterestedBy(remote_peer)) {
remote_peer->am_interested = true;
- sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
+ sendMessage(message->return_mailbox, MessageType::INTERESTED, message_size(MessageType::INTERESTED));
}
break;
- case MESSAGE_INTERESTED:
+ case MessageType::INTERESTED:
// Update the interested state of the peer.
remote_peer->interested = true;
updateActivePeersSet(remote_peer);
break;
- case MESSAGE_NOTINTERESTED:
+ case MessageType::NOTINTERESTED:
remote_peer->interested = false;
updateActivePeersSet(remote_peer);
break;
- case MESSAGE_UNCHOKE:
+ case MessageType::UNCHOKE:
xbt_assert(remote_peer->choked_download);
remote_peer->choked_download = false;
// Send requests to the peer, since it has unchoked us
if (remote_peer->am_interested)
requestNewPieceTo(remote_peer);
break;
- case MESSAGE_CHOKE:
+ case MessageType::CHOKE:
xbt_assert(not remote_peer->choked_download);
remote_peer->choked_download = true;
if (remote_peer->current_piece != -1)
removeCurrentPiece(remote_peer, remote_peer->current_piece);
break;
- case MESSAGE_HAVE:
+ case MessageType::HAVE:
XBT_DEBUG("\t for piece %d", message->piece);
xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
"Wrong HAVE message received");
// If the piece is in our pieces, we tell the peer that we are interested.
if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
remote_peer->am_interested = true;
- sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
+ sendMessage(message->return_mailbox, MessageType::INTERESTED, message_size(MessageType::INTERESTED));
if (not remote_peer->choked_download)
requestNewPieceTo(remote_peer);
}
break;
- case MESSAGE_REQUEST:
+ case MessageType::REQUEST:
xbt_assert(remote_peer->interested);
xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
"Wrong HAVE message received");
XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
}
break;
- case MESSAGE_PIECE:
+ case MessageType::PIECE:
XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
message->block_index + message->block_length);
xbt_assert(not remote_peer->choked_download);
requestNewPieceTo(remote_peer);
}
break;
- case MESSAGE_CANCEL:
+ case MessageType::CANCEL:
break;
default:
THROW_IMPOSSIBLE;
xbt_assert(nb_interesting_pieces != 0);
// get a random interesting piece
- int random_piece_index = simgrid::xbt::random::uniform_int(0, nb_interesting_pieces - 1);
+ int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1);
int current_index = 0;
for (unsigned int i = 0; i < FILE_PIECES; i++) {
if (remotePeerHasMissingPiece(remote_peer, i)) {
nb_interesting_pieces++;
xbt_assert(nb_interesting_pieces != 0);
// get a random interesting piece
- int random_piece_index = simgrid::xbt::random::uniform_int(0, nb_interesting_pieces - 1);
+ int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1);
int current_index = 0;
for (unsigned int i = 0; i < FILE_PIECES; i++) {
if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) {
// get a random rarest piece
int random_rarest_index = 0;
if (nb_min_pieces > 0) {
- random_rarest_index = simgrid::xbt::random::uniform_int(0, nb_min_pieces - 1);
+ random_rarest_index = random.uniform_int(0, nb_min_pieces - 1);
}
for (unsigned int i = 0; i < FILE_PIECES; i++)
if (pieces_count[i] == min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) {
int j = 0;
do {
// We choose a random peer to unchoke.
- std::unordered_map<int, Connection>::iterator chosen_peer_it = connected_peers.begin();
- std::advance(chosen_peer_it, simgrid::xbt::random::uniform_int(0, connected_peers.size() - 1));
+ auto chosen_peer_it = connected_peers.begin();
+ std::advance(chosen_peer_it, random.uniform_int(0, static_cast<int>(connected_peers.size() - 1)));
chosen_peer = &chosen_peer_it->second;
if (not chosen_peer->interested || not chosen_peer->choked_upload)
chosen_peer = nullptr;
choked_peer->choked_upload = true;
updateActivePeersSet(choked_peer);
XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
- sendMessage(choked_peer->mailbox_, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
+ sendMessage(choked_peer->mailbox_, MessageType::CHOKE, message_size(MessageType::CHOKE));
}
if (chosen_peer != nullptr) {
xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
chosen_peer->last_unchoke = simgrid::s4u::Engine::get_clock();
XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
updateActivePeersSet(chosen_peer);
- sendMessage(chosen_peer->mailbox_, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
+ sendMessage(chosen_peer->mailbox_, MessageType::UNCHOKE, message_size(MessageType::UNCHOKE));
}
}
}
if (not interested) { // no more piece to download from connection
remote_peer.am_interested = false;
- sendMessage(remote_peer.mailbox_, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
+ sendMessage(remote_peer.mailbox_, MessageType::NOTINTERESTED, message_size(MessageType::NOTINTERESTED));
}
}
}
bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
}
-bool Peer::hasCompletedPiece(unsigned int piece)
+bool Peer::hasCompletedPiece(unsigned int piece) const
{
for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
return true;
}
-int Peer::getFirstMissingBlockFrom(int piece)
+int Peer::getFirstMissingBlockFrom(int piece) const
{
for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
}
/** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
-int Peer::partiallyDownloadedPiece(const Connection* remote_peer)
+int Peer::partiallyDownloadedPiece(const Connection* remote_peer) const
{
for (unsigned int i = 0; i < FILE_PIECES; i++)
if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)