X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/149c63f36e15b8500b1e826bda5138318ff7ba2b..6ade1c748396ae71562fd718e8409de61ab00148:/examples/s4u/app-bittorrent/s4u-peer.cpp diff --git a/examples/s4u/app-bittorrent/s4u-peer.cpp b/examples/s4u/app-bittorrent/s4u-peer.cpp index 2962df6e18..fa06d8e0c1 100644 --- a/examples/s4u/app-bittorrent/s4u-peer.cpp +++ b/examples/s4u/app-bittorrent/s4u-peer.cpp @@ -4,6 +4,7 @@ * under the terms of the license (GNU LGPL) which comes with this package. */ #include +#include #include #include "s4u-peer.hpp" @@ -25,6 +26,32 @@ constexpr unsigned long BLOCKS_REQUESTED = 2UL; constexpr double SLEEP_DURATION = 1.0; #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0) +/** Message sizes + * Sizes based on report by A. Legout et al, Understanding BitTorrent: An Experimental Perspective + * http://hal.inria.fr/inria-00000156/en + */ +constexpr unsigned message_size(MessageType type) +{ + constexpr std::array sizes{{/* HANDSHAKE */ 68, + /* CHOKE */ 5, + /* UNCHOKE */ 5, + /* INTERESTED */ 5, + /* NOTINTERESTED */ 5, + /* HAVE */ 9, + /* BITFIELD */ 5, + /* REQUEST */ 17, + /* PIECE */ 13, + /* CANCEL */ 17}}; + return sizes[static_cast(type)]; +} + +constexpr const char* message_name(MessageType type) +{ + constexpr std::array names{{"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "HAVE", + "BITFIELD", "REQUEST", "PIECE", "CANCEL"}}; + return names[static_cast(type)]; +} + Peer::Peer(std::vector args) { // Check arguments @@ -35,6 +62,7 @@ Peer::Peer(std::vector args) } catch (const std::invalid_argument&) { throw std::invalid_argument("Invalid ID:" + args[1]); } + random.set_seed(id); try { deadline = std::stod(args[2]); @@ -77,7 +105,7 @@ bool Peer::getPeersFromTracker() { simgrid::s4u::Mailbox* tracker_mailbox = simgrid::s4u::Mailbox::by_name(TRACKER_MAILBOX); // Build the task to send to the tracker - TrackerQuery* peer_request = new TrackerQuery(id, mailbox_); + auto* peer_request = new TrackerQuery(id, mailbox_); try { XBT_DEBUG("Sending a peer request to the tracker."); tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT); @@ -88,12 +116,11 @@ bool Peer::getPeersFromTracker() } try { - TrackerAnswer* answer = static_cast(mailbox_->get(GET_PEERS_TIMEOUT)); + auto answer = mailbox_->get_unique(GET_PEERS_TIMEOUT); // Add the peers the tracker gave us to our peer list. for (auto const& peer_id : answer->getPeers()) if (id != peer_id) connected_peers.emplace(peer_id, Connection(peer_id)); - delete answer; } catch (const simgrid::TimeoutException&) { XBT_DEBUG("Timeout expired when requesting peers to tracker"); return false; @@ -105,16 +132,15 @@ void Peer::sendHandshakeToAllPeers() { for (auto const& kv : connected_peers) { const Connection& remote_peer = kv.second; - Message* handshake = new Message(MESSAGE_HANDSHAKE, id, mailbox_); - remote_peer.mailbox_->put_init(handshake, MESSAGE_HANDSHAKE_SIZE)->detach(); + auto* handshake = new Message(MessageType::HANDSHAKE, id, mailbox_); + remote_peer.mailbox_->put_init(handshake, message_size(MessageType::HANDSHAKE))->detach(); XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer.id); } } -void Peer::sendMessage(simgrid::s4u::Mailbox* mailbox, e_message_type type, uint64_t size) +void Peer::sendMessage(simgrid::s4u::Mailbox* mailbox, MessageType type, uint64_t size) { - const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"}; - XBT_DEBUG("Sending %s to %s", type_names[type], mailbox->get_cname()); + XBT_DEBUG("Sending %s to %s", message_name(type), mailbox->get_cname()); mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach(); } @@ -122,8 +148,8 @@ void Peer::sendBitfield(simgrid::s4u::Mailbox* mailbox) { XBT_DEBUG("Sending a BITFIELD to %s", mailbox->get_cname()); mailbox - ->put_init(new Message(MESSAGE_BITFIELD, id, bitfield_, mailbox_), - MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES)) + ->put_init(new Message(MessageType::BITFIELD, id, bitfield_, mailbox_), + message_size(MessageType::BITFIELD) + BITS_TO_BYTES(FILE_PIECES)) ->detach(); } @@ -131,7 +157,8 @@ void Peer::sendPiece(simgrid::s4u::Mailbox* mailbox, unsigned int piece, int blo { xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece."); XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->get_cname()); - mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach(); + mailbox->put_init(new Message(MessageType::PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE) + ->detach(); } void Peer::sendHaveToAllPeers(unsigned int piece) @@ -139,7 +166,8 @@ void Peer::sendHaveToAllPeers(unsigned int piece) XBT_DEBUG("Sending HAVE message to all my peers"); for (auto const& kv : connected_peers) { const Connection& remote_peer = kv.second; - remote_peer.mailbox_->put_init(new Message(MESSAGE_HAVE, id, mailbox_, piece), MESSAGE_HAVE_SIZE)->detach(); + remote_peer.mailbox_->put_init(new Message(MessageType::HAVE, id, mailbox_, piece), message_size(MessageType::HAVE)) + ->detach(); } } @@ -149,16 +177,17 @@ void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece) xbt_assert(remote_peer->hasPiece(piece)); int block_index = getFirstMissingBlockFrom(piece); if (block_index != -1) { - int block_length = std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index); + int block_length = static_cast(std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index)); XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->get_cname(), piece, block_index, block_length); remote_peer->mailbox_ - ->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE) + ->put_init(new Message(MessageType::REQUEST, id, mailbox_, piece, block_index, block_length), + message_size(MessageType::REQUEST)) ->detach(); } } -std::string Peer::getStatus() +std::string Peer::getStatus() const { std::string res; for (unsigned i = 0; i < FILE_PIECES; i++) @@ -166,7 +195,7 @@ std::string Peer::getStatus() return res; } -bool Peer::hasFinished() +bool Peer::hasFinished() const { return bitfield_ == (1U << FILE_PIECES) - 1U; } @@ -192,7 +221,7 @@ void Peer::updatePiecesCountFromBitfield(unsigned int bitfield) pieces_count[i]++; } -unsigned int Peer::countPieces(unsigned int bitfield) +unsigned int Peer::countPieces(unsigned int bitfield) const { unsigned int count = 0U; unsigned int n = bitfield; @@ -203,7 +232,7 @@ unsigned int Peer::countPieces(unsigned int bitfield) return count; } -int Peer::nbInterestedPeers() +int Peer::nbInterestedPeers() const { int nb = 0; for (auto const& kv : connected_peers) @@ -221,13 +250,11 @@ void Peer::leech() sendHandshakeToAllPeers(); XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname()); - void* data = nullptr; while (simgrid::s4u::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) { if (comm_received == nullptr) { - comm_received = mailbox_->get_async(&data); + comm_received = mailbox_->get_async(&message); } if (comm_received->test()) { - message = static_cast(data); handleMessage(); delete message; comm_received = nullptr; @@ -250,13 +277,11 @@ void Peer::seed() double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL; XBT_DEBUG("Start seeding."); // start the main seed loop - void* data = nullptr; while (simgrid::s4u::Engine::get_clock() < deadline) { if (comm_received == nullptr) { - comm_received = mailbox_->get_async(&data); + comm_received = mailbox_->get_async(&message); } if (comm_received->test()) { - message = static_cast(data); handleMessage(); delete message; comm_received = nullptr; @@ -282,28 +307,25 @@ void Peer::updateActivePeersSet(Connection* remote_peer) void Peer::handleMessage() { - const char* type_names[10] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", - "HAVE", "BITFIELD", "REQUEST", "PIECE", "CANCEL"}; - - XBT_DEBUG("Received a %s message from %s", type_names[message->type], message->return_mailbox->get_cname()); + XBT_DEBUG("Received a %s message from %s", message_name(message->type), message->return_mailbox->get_cname()); auto known_peer = connected_peers.find(message->peer_id); Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : &known_peer->second; - xbt_assert(remote_peer != nullptr || message->type == MESSAGE_HANDSHAKE, + xbt_assert(remote_peer != nullptr || message->type == MessageType::HANDSHAKE, "The impossible did happened: A not-in-our-list peer sent us a message."); switch (message->type) { - case MESSAGE_HANDSHAKE: + case MessageType::HANDSHAKE: // Check if the peer is in our connection list. if (remote_peer == nullptr) { XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id); connected_peers.emplace(message->peer_id, Connection(message->peer_id)); - sendMessage(message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE); + sendMessage(message->return_mailbox, MessageType::HANDSHAKE, message_size(MessageType::HANDSHAKE)); } // Send our bitfield to the peer sendBitfield(message->return_mailbox); break; - case MESSAGE_BITFIELD: + case MessageType::BITFIELD: // Update the pieces list updatePiecesCountFromBitfield(message->bitfield); // Store the bitfield @@ -311,32 +333,32 @@ void Peer::handleMessage() xbt_assert(not remote_peer->am_interested, "Should not be interested at first"); if (isInterestedBy(remote_peer)) { remote_peer->am_interested = true; - sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE); + sendMessage(message->return_mailbox, MessageType::INTERESTED, message_size(MessageType::INTERESTED)); } break; - case MESSAGE_INTERESTED: + case MessageType::INTERESTED: // Update the interested state of the peer. remote_peer->interested = true; updateActivePeersSet(remote_peer); break; - case MESSAGE_NOTINTERESTED: + case MessageType::NOTINTERESTED: remote_peer->interested = false; updateActivePeersSet(remote_peer); break; - case MESSAGE_UNCHOKE: + case MessageType::UNCHOKE: xbt_assert(remote_peer->choked_download); remote_peer->choked_download = false; // Send requests to the peer, since it has unchoked us if (remote_peer->am_interested) requestNewPieceTo(remote_peer); break; - case MESSAGE_CHOKE: + case MessageType::CHOKE: xbt_assert(not remote_peer->choked_download); remote_peer->choked_download = true; if (remote_peer->current_piece != -1) removeCurrentPiece(remote_peer, remote_peer->current_piece); break; - case MESSAGE_HAVE: + case MessageType::HAVE: XBT_DEBUG("\t for piece %d", message->piece); xbt_assert((message->piece >= 0 && static_cast(message->piece) < FILE_PIECES), "Wrong HAVE message received"); @@ -345,12 +367,12 @@ void Peer::handleMessage() // If the piece is in our pieces, we tell the peer that we are interested. if (not remote_peer->am_interested && hasNotPiece(message->piece)) { remote_peer->am_interested = true; - sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE); + sendMessage(message->return_mailbox, MessageType::INTERESTED, message_size(MessageType::INTERESTED)); if (not remote_peer->choked_download) requestNewPieceTo(remote_peer); } break; - case MESSAGE_REQUEST: + case MessageType::REQUEST: xbt_assert(remote_peer->interested); xbt_assert((message->piece >= 0 && static_cast(message->piece) < FILE_PIECES), "Wrong HAVE message received"); @@ -364,7 +386,7 @@ void Peer::handleMessage() XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id); } break; - case MESSAGE_PIECE: + case MessageType::PIECE: XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index, message->block_index + message->block_length); xbt_assert(not remote_peer->choked_download); @@ -392,7 +414,7 @@ void Peer::handleMessage() requestNewPieceTo(remote_peer); } break; - case MESSAGE_CANCEL: + case MessageType::CANCEL: break; default: THROW_IMPOSSIBLE; @@ -446,7 +468,7 @@ int Peer::selectPieceToDownload(const Connection* remote_peer) xbt_assert(nb_interesting_pieces != 0); // get a random interesting piece - int random_piece_index = simgrid::xbt::random::uniform_int(0, nb_interesting_pieces - 1); + int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1); int current_index = 0; for (unsigned int i = 0; i < FILE_PIECES; i++) { if (remotePeerHasMissingPiece(remote_peer, i)) { @@ -469,7 +491,7 @@ int Peer::selectPieceToDownload(const Connection* remote_peer) nb_interesting_pieces++; xbt_assert(nb_interesting_pieces != 0); // get a random interesting piece - int random_piece_index = simgrid::xbt::random::uniform_int(0, nb_interesting_pieces - 1); + int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1); int current_index = 0; for (unsigned int i = 0; i < FILE_PIECES; i++) { if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) { @@ -502,7 +524,7 @@ int Peer::selectPieceToDownload(const Connection* remote_peer) // get a random rarest piece int random_rarest_index = 0; if (nb_min_pieces > 0) { - random_rarest_index = simgrid::xbt::random::uniform_int(0, nb_min_pieces - 1); + random_rarest_index = random.uniform_int(0, nb_min_pieces - 1); } for (unsigned int i = 0; i < FILE_PIECES; i++) if (pieces_count[i] == min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) { @@ -551,8 +573,8 @@ void Peer::updateChokedPeers() int j = 0; do { // We choose a random peer to unchoke. - std::unordered_map::iterator chosen_peer_it = connected_peers.begin(); - std::advance(chosen_peer_it, simgrid::xbt::random::uniform_int(0, connected_peers.size() - 1)); + auto chosen_peer_it = connected_peers.begin(); + std::advance(chosen_peer_it, random.uniform_int(0, static_cast(connected_peers.size() - 1))); chosen_peer = &chosen_peer_it->second; if (not chosen_peer->interested || not chosen_peer->choked_upload) chosen_peer = nullptr; @@ -583,7 +605,7 @@ void Peer::updateChokedPeers() choked_peer->choked_upload = true; updateActivePeersSet(choked_peer); XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id); - sendMessage(choked_peer->mailbox_, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE); + sendMessage(choked_peer->mailbox_, MessageType::CHOKE, message_size(MessageType::CHOKE)); } if (chosen_peer != nullptr) { xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer"); @@ -592,7 +614,7 @@ void Peer::updateChokedPeers() chosen_peer->last_unchoke = simgrid::s4u::Engine::get_clock(); XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id); updateActivePeersSet(chosen_peer); - sendMessage(chosen_peer->mailbox_, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE); + sendMessage(chosen_peer->mailbox_, MessageType::UNCHOKE, message_size(MessageType::UNCHOKE)); } } } @@ -613,7 +635,7 @@ void Peer::updateInterestedAfterReceive() if (not interested) { // no more piece to download from connection remote_peer.am_interested = false; - sendMessage(remote_peer.mailbox_, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE); + sendMessage(remote_peer.mailbox_, MessageType::NOTINTERESTED, message_size(MessageType::NOTINTERESTED)); } } } @@ -628,7 +650,7 @@ void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length) bitfield_blocks |= (1ULL << static_cast(piece * PIECES_BLOCKS + i)); } -bool Peer::hasCompletedPiece(unsigned int piece) +bool Peer::hasCompletedPiece(unsigned int piece) const { for (unsigned int i = 0; i < PIECES_BLOCKS; i++) if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) @@ -636,7 +658,7 @@ bool Peer::hasCompletedPiece(unsigned int piece) return true; } -int Peer::getFirstMissingBlockFrom(int piece) +int Peer::getFirstMissingBlockFrom(int piece) const { for (unsigned int i = 0; i < PIECES_BLOCKS; i++) if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) @@ -645,7 +667,7 @@ int Peer::getFirstMissingBlockFrom(int piece) } /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */ -int Peer::partiallyDownloadedPiece(const Connection* remote_peer) +int Peer::partiallyDownloadedPiece(const Connection* remote_peer) const { for (unsigned int i = 0; i < FILE_PIECES; i++) if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)