From f2d416e568c637bec46247643265b22045e6eed3 Mon Sep 17 00:00:00 2001 From: Frederic Suter Date: Sun, 6 Aug 2017 15:57:31 +0200 Subject: [PATCH] rewrite bittorrent example in s4u --- examples/msg/app-bittorrent/bittorrent.h | 2 +- examples/msg/app-bittorrent/peer.c | 2 +- examples/msg/app-bittorrent/tracker.c | 2 +- examples/s4u/CMakeLists.txt | 15 +- .../app-bittorrent/s4u_app-bittorrent.tesh | 23 + .../app-bittorrent/s4u_app-bittorrent_d.xml | 39 + .../s4u/app-bittorrent/s4u_bittorrent.cpp | 37 + .../s4u/app-bittorrent/s4u_bittorrent.hpp | 104 +++ examples/s4u/app-bittorrent/s4u_peer.cpp | 707 ++++++++++++++++++ examples/s4u/app-bittorrent/s4u_peer.hpp | 95 +++ examples/s4u/app-bittorrent/s4u_tracker.cpp | 71 ++ examples/s4u/app-bittorrent/s4u_tracker.hpp | 48 ++ 12 files changed, 1140 insertions(+), 5 deletions(-) create mode 100644 examples/s4u/app-bittorrent/s4u_app-bittorrent.tesh create mode 100644 examples/s4u/app-bittorrent/s4u_app-bittorrent_d.xml create mode 100644 examples/s4u/app-bittorrent/s4u_bittorrent.cpp create mode 100644 examples/s4u/app-bittorrent/s4u_bittorrent.hpp create mode 100644 examples/s4u/app-bittorrent/s4u_peer.cpp create mode 100644 examples/s4u/app-bittorrent/s4u_peer.hpp create mode 100644 examples/s4u/app-bittorrent/s4u_tracker.cpp create mode 100644 examples/s4u/app-bittorrent/s4u_tracker.hpp diff --git a/examples/msg/app-bittorrent/bittorrent.h b/examples/msg/app-bittorrent/bittorrent.h index b45d688a4c..62d4172e9e 100644 --- a/examples/msg/app-bittorrent/bittorrent.h +++ b/examples/msg/app-bittorrent/bittorrent.h @@ -10,7 +10,7 @@ #define MAILBOX_SIZE 40 #define TRACKER_MAILBOX "tracker_mailbox" /** Max number of pairs sent by the tracker to clients */ -#define MAXIMUM_PAIRS 50 +#define MAXIMUM_PEERS 50 /** Interval of time where the peer should send a request to the tracker */ #define TRACKER_QUERY_INTERVAL 1000 /** Communication size for a task to the tracker */ diff --git a/examples/msg/app-bittorrent/peer.c b/examples/msg/app-bittorrent/peer.c index 7c248b39ee..23b78ef9e5 100644 --- a/examples/msg/app-bittorrent/peer.c +++ b/examples/msg/app-bittorrent/peer.c @@ -624,7 +624,7 @@ void update_choked_peers(peer_t peer) else XBT_DEBUG("Nothing to do, keep going"); j++; - } while (peer_choosed == NULL && j < MAXIMUM_PAIRS); + } while (peer_choosed == NULL && j < MAXIMUM_PEERS); } else { //Use the "fastest download" policy. connection_t connection; diff --git a/examples/msg/app-bittorrent/tracker.c b/examples/msg/app-bittorrent/tracker.c index bdcea7f858..255ed261b9 100644 --- a/examples/msg/app-bittorrent/tracker.c +++ b/examples/msg/app-bittorrent/tracker.c @@ -49,7 +49,7 @@ int tracker(int argc, char *argv[]) //Sending peers to the peer int next_peer; int peers_length = xbt_dynar_length(peers_list); - for (int i = 0; i < MAXIMUM_PAIRS && i < peers_length; i++) { + for (int i = 0; i < MAXIMUM_PEERS && i < peers_length; i++) { do { next_peer = xbt_dynar_get_as(peers_list, RngStream_RandInt(stream, 0, peers_length - 1), int); } while (is_in_list(data->peers, next_peer)); diff --git a/examples/s4u/CMakeLists.txt b/examples/s4u/CMakeLists.txt index 72a43da51f..6064c0caa5 100644 --- a/examples/s4u/CMakeLists.txt +++ b/examples/s4u/CMakeLists.txt @@ -17,12 +17,23 @@ foreach (file s4u_dht-chord node) endforeach() set(examples_src ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/dht-chord/s4u_dht-chord.hpp) +add_executable (s4u_bittorrent app-bittorrent/s4u_bittorrent.cpp app-bittorrent/s4u_peer.cpp + app-bittorrent/s4u_tracker.cpp) +target_link_libraries(s4u_bittorrent simgrid) +set_target_properties(s4u_bittorrent PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/app-bittorrent) +foreach (file s4u_bittorrent s4u_peer s4u_tracker) + set(examples_src ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/${file}.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/${file}.hpp) +endforeach() + set(examples_src ${examples_src} PARENT_SCOPE) -set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/dht-chord/s4u_dht-chord.tesh PARENT_SCOPE) +set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/s4u_app-bittorrent.tesh + ${CMAKE_CURRENT_SOURCE_DIR}/dht-chord/s4u_dht-chord.tesh PARENT_SCOPE) set(xml_files ${xml_files} ${CMAKE_CURRENT_SOURCE_DIR}/actions-comm/s4u_actions-comm_split_d.xml ${CMAKE_CURRENT_SOURCE_DIR}/actions-comm/s4u_actions-comm_d.xml ${CMAKE_CURRENT_SOURCE_DIR}/actions-storage/s4u_actions-storage_d.xml ${CMAKE_CURRENT_SOURCE_DIR}/actor-create/s4u_actor-create_d.xml + ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/s4u_app-bittorrent_d.xml ${CMAKE_CURRENT_SOURCE_DIR}/app-masterworker/s4u_app-masterworker_d.xml ${CMAKE_CURRENT_SOURCE_DIR}/dht-chord/s4u_dht-chord_d.xml PARENT_SCOPE) set(txt_files ${txt_files} ${CMAKE_CURRENT_SOURCE_DIR}/actions-comm/s4u_actions-comm_split_p0.txt @@ -32,6 +43,6 @@ set(txt_files ${txt_files} ${CMAKE_CURRENT_SOURCE_DIR}/actions-comm/s4u_a ${CMAKE_CURRENT_SOURCE_DIR}/README.doc PARENT_SCOPE) foreach(example actions-comm actions-storage actor-create actor-daemon actor-kill actor-migration actor-suspend - app-masterworker app-pingpong app-token-ring dht-chord plugin-hostload io mutex ) + app-bittorrent app-masterworker app-pingpong app-token-ring dht-chord plugin-hostload io mutex ) ADD_TESH_FACTORIES(s4u-${example} "thread;ucontext;raw;boost" --setenv bindir=${CMAKE_CURRENT_BINARY_DIR}/${example} --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/platforms --cd ${CMAKE_HOME_DIRECTORY}/examples/s4u/${example} s4u_${example}.tesh) endforeach() diff --git a/examples/s4u/app-bittorrent/s4u_app-bittorrent.tesh b/examples/s4u/app-bittorrent/s4u_app-bittorrent.tesh new file mode 100644 index 0000000000..d309e01b18 --- /dev/null +++ b/examples/s4u/app-bittorrent/s4u_app-bittorrent.tesh @@ -0,0 +1,23 @@ +#! ./tesh + +p Testing the Bittorrent implementation with MSG + +! timeout 10 +! output sort 19 +$ $SG_TEST_EXENV ${bindir:=.}/s4u_bittorrent ${srcdir:=.}/cluster.xml ${srcdir:=.}/../s4u/app-bittorrent/s4u_app-bittorrent_d.xml "--log=root.fmt:[%12.6r]%e(%i:%P@%h)%e%m%n" +> [ 0.000000] (1:tracker@node-0.acme.org) Tracker launched. +> [ 0.000000] (2:peer@node-1.acme.org) Hi, I'm joining the network with id 2 +> [ 0.000000] (3:peer@node-2.acme.org) Hi, I'm joining the network with id 3 +> [ 0.000000] (4:peer@node-3.acme.org) Hi, I'm joining the network with id 4 +> [ 0.000000] (5:peer@node-4.acme.org) Hi, I'm joining the network with id 5 +> [ 0.000000] (6:peer@node-5.acme.org) Hi, I'm joining the network with id 6 +> [ 0.000000] (7:peer@node-6.acme.org) Hi, I'm joining the network with id 7 +> [ 0.000000] (8:peer@node-7.acme.org) Hi, I'm joining the network with id 8 +> [ 3000.000000] (1:tracker@node-0.acme.org) Tracker is leaving +> [ 5000.007806] (2:peer@node-1.acme.org) Here is my current status: 1111111111 +> [ 5000.007806] (3:peer@node-2.acme.org) Here is my current status: 1111111111 +> [ 5000.007806] (4:peer@node-3.acme.org) Here is my current status: 1111111111 +> [ 5000.007806] (5:peer@node-4.acme.org) Here is my current status: 1111111111 +> [ 5000.007806] (6:peer@node-5.acme.org) Here is my current status: 1111111111 +> [ 5000.007806] (7:peer@node-6.acme.org) Here is my current status: 1111111111 +> [ 5000.007806] (8:peer@node-7.acme.org) Here is my current status: 1111111111 diff --git a/examples/s4u/app-bittorrent/s4u_app-bittorrent_d.xml b/examples/s4u/app-bittorrent/s4u_app-bittorrent_d.xml new file mode 100644 index 0000000000..5460ab1a9d --- /dev/null +++ b/examples/s4u/app-bittorrent/s4u_app-bittorrent_d.xml @@ -0,0 +1,39 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/s4u/app-bittorrent/s4u_bittorrent.cpp b/examples/s4u/app-bittorrent/s4u_bittorrent.cpp new file mode 100644 index 0000000000..cbc093b293 --- /dev/null +++ b/examples/s4u/app-bittorrent/s4u_bittorrent.cpp @@ -0,0 +1,37 @@ +/* Copyright (c) 2012-2017. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include "s4u_bittorrent.hpp" +#include "s4u_peer.hpp" +#include "s4u_tracker.hpp" + +simgrid::xbt::Extension HostBittorrent::EXTENSION_ID; + +int main(int argc, char* argv[]) +{ + simgrid::s4u::Engine* e = new simgrid::s4u::Engine(&argc, argv); + + /* Check the arguments */ + xbt_assert(argc > 2, "Usage: %s platform_file deployment_file", argv[0]); + + e->loadPlatform(argv[1]); + + HostBittorrent::EXTENSION_ID = simgrid::s4u::Host::extension_create(); + + std::vector list; + simgrid::s4u::Engine::getInstance()->getHostList(&list); + for (auto host : list) + host->extension_set(new HostBittorrent(host)); + + e->registerFunction("tracker"); + e->registerFunction("peer"); + e->loadDeployment(argv[2]); + + e->run(); + + delete e; + return 0; +} diff --git a/examples/s4u/app-bittorrent/s4u_bittorrent.hpp b/examples/s4u/app-bittorrent/s4u_bittorrent.hpp new file mode 100644 index 0000000000..4b02ab2f16 --- /dev/null +++ b/examples/s4u/app-bittorrent/s4u_bittorrent.hpp @@ -0,0 +1,104 @@ +/* Copyright (c) 2012-2014, 2016-2017. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef BITTORRENT_BITTORRENT_HPP_ +#define BITTORRENT_BITTORRENT_HPP_ + +#include +#include + +#define MAILBOX_SIZE 40 +#define TRACKER_MAILBOX "tracker_mailbox" +/** Max number of peers sent by the tracker to clients */ +#define MAXIMUM_PEERS 50 +/** Interval of time where the peer should send a request to the tracker */ +#define TRACKER_QUERY_INTERVAL 1000 +/** Communication size for a task to the tracker */ +#define TRACKER_COMM_SIZE 0.01 +#define GET_PEERS_TIMEOUT 10000 +#define TIMEOUT_MESSAGE 10 +#define TRACKER_RECEIVE_TIMEOUT 10 +/** Number of peers that can be unchocked at a given time */ +#define MAX_UNCHOKED_PEERS 4 +/** Interval between each update of the choked peers */ +#define UPDATE_CHOKED_INTERVAL 30 +/** Number of pieces the peer asks for simultaneously */ +#define MAX_PIECES 1 + +/** Message sizes + * Sizes based on report by A. Legout et al, Understanding BitTorrent: An Experimental Perspective + * http://hal.inria.fr/inria-00000156/en + */ +#define MESSAGE_HANDSHAKE_SIZE 68 +#define MESSAGE_CHOKE_SIZE 5 +#define MESSAGE_UNCHOKE_SIZE 5 +#define MESSAGE_INTERESTED_SIZE 5 +#define MESSAGE_NOTINTERESTED_SIZE 5 +#define MESSAGE_HAVE_SIZE 9 +#define MESSAGE_BITFIELD_SIZE 5 +#define MESSAGE_REQUEST_SIZE 17 +#define MESSAGE_PIECE_SIZE 13 +#define MESSAGE_CANCEL_SIZE 17 + +/** Types of messages exchanged between two peers. */ +typedef enum { + MESSAGE_HANDSHAKE, + MESSAGE_CHOKE, + MESSAGE_UNCHOKE, + MESSAGE_INTERESTED, + MESSAGE_NOTINTERESTED, + MESSAGE_HAVE, + MESSAGE_BITFIELD, + MESSAGE_REQUEST, + MESSAGE_PIECE, + MESSAGE_CANCEL +} e_message_type; + +class Message { +public: + e_message_type type; + int peer_id; + simgrid::s4u::MailboxPtr return_mailbox; + unsigned int bitfield = 0U; + int piece = 0; + int block_index = 0; + int block_length = 0; + Message(e_message_type type, int peer_id, simgrid::s4u::MailboxPtr return_mailbox) + : type(type), peer_id(peer_id), return_mailbox(return_mailbox){}; + Message(e_message_type type, int peer_id, unsigned int bitfield, simgrid::s4u::MailboxPtr return_mailbox) + : type(type), peer_id(peer_id), return_mailbox(return_mailbox), bitfield(bitfield){}; + Message(e_message_type type, int peer_id, simgrid::s4u::MailboxPtr return_mailbox, int piece, int block_index, + int block_length) + : type(type) + , peer_id(peer_id) + , return_mailbox(return_mailbox) + , piece(piece) + , block_index(block_index) + , block_length(block_length){}; + Message(e_message_type type, int peer_id, simgrid::s4u::MailboxPtr return_mailbox, int piece) + : type(type), peer_id(peer_id), return_mailbox(return_mailbox), piece(piece){}; + ~Message() = default; +}; + +class HostBittorrent { + RngStream stream_; + simgrid::s4u::Host* host = nullptr; + +public: + static simgrid::xbt::Extension EXTENSION_ID; + + explicit HostBittorrent(simgrid::s4u::Host* ptr) : host(ptr) + { + std::string descr = std::string("RngSream<") + host->getCname() + ">"; + stream_ = RngStream_CreateStream(descr.c_str()); + } + + ~HostBittorrent() { RngStream_DeleteStream(&stream_); }; + + RngStream getStream() { return stream_; }; +}; + +#endif /* BITTORRENT_BITTORRENT_HPP_ */ diff --git a/examples/s4u/app-bittorrent/s4u_peer.cpp b/examples/s4u/app-bittorrent/s4u_peer.cpp new file mode 100644 index 0000000000..ba68a23e65 --- /dev/null +++ b/examples/s4u/app-bittorrent/s4u_peer.cpp @@ -0,0 +1,707 @@ +/* Copyright (c) 2012-2017. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include +#include + +#include "s4u_peer.hpp" +#include "s4u_tracker.hpp" + +XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers"); + +/* + * User parameters for transferred file data. For the test, the default values are : + * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes + */ +#define FILE_PIECES 10UL +#define PIECES_BLOCKS 5UL +#define BLOCK_SIZE 16384 +static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE; + +/** Number of blocks asked by each request */ +#define BLOCKS_REQUESTED 2 + +#define ENABLE_END_GAME_MODE 1 +#define SLEEP_DURATION 1 +#define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0) + +Peer::Peer(std::vector args) +{ + // Check arguments + xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments"); + try { + id = std::stoi(args[1]); + mailbox_ = simgrid::s4u::Mailbox::byName(std::to_string(id)); + } catch (std::invalid_argument& ia) { + throw std::invalid_argument(std::string("Invalid ID:") + args[1].c_str()); + } + + try { + deadline = std::stod(args[2]); + } catch (std::invalid_argument& ia) { + throw std::invalid_argument(std::string("Invalid deadline:") + args[2].c_str()); + } + xbt_assert(deadline > 0, "Wrong deadline supplied"); + + stream = simgrid::s4u::this_actor::getHost()->extension()->getStream(); + + if (args.size() == 4 && args[3] == "1") { + bitfield_ = (1U << FILE_PIECES) - 1U; + bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL; + } + pieces_count = new short[FILE_PIECES]{0}; + + XBT_INFO("Hi, I'm joining the network with id %d", id); +} + +Peer::~Peer() +{ + for (auto peer : connected_peers) + delete peer.second; + delete[] pieces_count; +} + +/** Peer main function */ +void Peer::operator()() +{ + // Getting peer data from the tracker. + if (getPeersFromTracker()) { + XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str()); + begin_receive_time = simgrid::s4u::Engine::getClock(); + mailbox_->setReceiver(simgrid::s4u::Actor::self()); + if (hasFinished()) { + sendHandshakeToAllPeers(); + } else { + leech(); + } + seed(); + } else { + XBT_INFO("Couldn't contact the tracker."); + } + + XBT_INFO("Here is my current status: %s", getStatus().c_str()); +} + +bool Peer::getPeersFromTracker() +{ + simgrid::s4u::MailboxPtr tracker_mailbox = simgrid::s4u::Mailbox::byName(TRACKER_MAILBOX); + // Build the task to send to the tracker + TrackerQuery* peer_request = new TrackerQuery(id, mailbox_, 0, 0, FILE_SIZE); + try { + XBT_DEBUG("Sending a peer request to the tracker."); + tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT); + } catch (xbt_ex& e) { + if (e.category == timeout_error) { + XBT_DEBUG("Timeout expired when requesting peers to tracker"); + delete peer_request; + return false; + } + } + + try { + TrackerAnswer* answer = static_cast(mailbox_->get(GET_PEERS_TIMEOUT)); + // Add the peers the tracker gave us to our peer list. + for (auto peer_id : *answer->getPeers()) + if (id != peer_id) + connected_peers[peer_id] = new Connection(peer_id); + delete answer; + } catch (xbt_ex& e) { + if (e.category == timeout_error) { + XBT_DEBUG("Timeout expired when requesting peers to tracker"); + return false; + } + } + return true; +} + +void Peer::sendHandshakeToAllPeers() +{ + for (auto kv : connected_peers) { + Connection* remote_peer = kv.second; + Message* handshake = new Message(MESSAGE_HANDSHAKE, id, mailbox_); + remote_peer->mailbox_->put_init(handshake, MESSAGE_HANDSHAKE_SIZE)->detach(); + XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer->id); + } +} + +void Peer::sendHandshake(simgrid::s4u::MailboxPtr mailbox) +{ + XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox->getName()); + mailbox->put_init(new Message(MESSAGE_HANDSHAKE, id, mailbox_), MESSAGE_HANDSHAKE_SIZE)->detach(); +} + +void Peer::sendBitfield(simgrid::s4u::MailboxPtr mailbox) +{ + XBT_DEBUG("Sending a BITFIELD to %s", mailbox->getName()); + mailbox + ->put_init(new Message(MESSAGE_BITFIELD, id, bitfield_, mailbox_), + MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES)) + ->detach(); +} + +void Peer::sendInterested(simgrid::s4u::MailboxPtr mailbox) +{ + XBT_DEBUG("Sending INTERESTED to %s", mailbox->getName()); + mailbox->put_init(new Message(MESSAGE_INTERESTED, id, bitfield_, mailbox_), MESSAGE_INTERESTED_SIZE)->detach(); +} + +void Peer::sendNotInterested(simgrid::s4u::MailboxPtr mailbox) +{ + XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox->getName()); + mailbox->put_init(new Message(MESSAGE_NOTINTERESTED, id, bitfield_, mailbox_), MESSAGE_NOTINTERESTED_SIZE)->detach(); +} + +void Peer::sendChoked(simgrid::s4u::MailboxPtr mailbox) +{ + XBT_DEBUG("Sending CHOKE to %s", mailbox->getName()); + mailbox->put_init(new Message(MESSAGE_CHOKE, id, mailbox_), MESSAGE_CHOKE_SIZE)->detach(); +} + +/** Send a "unchoked" message to a peer */ +void Peer::sendUnchoked(simgrid::s4u::MailboxPtr mailbox) +{ + XBT_DEBUG("Sending UNCHOKE to %s", mailbox->getName()); + mailbox->put_init(new Message(MESSAGE_UNCHOKE, id, mailbox_), MESSAGE_UNCHOKE_SIZE)->detach(); +} + +void Peer::sendPiece(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length) +{ + xbt_assert(!hasNotPiece(piece), "Tried to send a unavailable piece."); + XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->getName()); + mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach(); +} + +void Peer::sendRequest(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length) +{ + XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", mailbox->getName(), piece, block_index, block_length); + mailbox->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE) + ->detach(); +} + +void Peer::sendHaveToAllPeers(unsigned int piece) +{ + XBT_DEBUG("Sending HAVE message to all my peers"); + for (auto kv : connected_peers) { + Connection* remote_peer = kv.second; + remote_peer->mailbox_->put_init(new Message(MESSAGE_HAVE, id, mailbox_, piece), MESSAGE_HAVE_SIZE)->detach(); + } +} + +void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece) +{ + remote_peer->current_piece = piece; + xbt_assert(remote_peer->hasPiece(piece)); + int block_index = getFirstMissingBlockFrom(piece); + if (block_index != -1) { + int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index); + sendRequest(remote_peer->mailbox_, piece, block_index, block_length); + } +} + +std::string Peer::getStatus() +{ + std::string res = std::string(""); + for (int i = FILE_PIECES - 1; i >= 0; i--) + res = std::string((bitfield_ & (1U << i)) ? "1" : "0") + res; + return res; +} + +bool Peer::hasFinished() +{ + return bitfield_ == (1U << FILE_PIECES) - 1U; +} + +/** Indicates if the remote peer has a piece not stored by the local peer */ +bool Peer::isInterestedBy(Connection* remote_peer) +{ + return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1)); +} + +void Peer::updatePiecesCountFromBitfield(unsigned int bitfield) +{ + for (unsigned int i = 0; i < FILE_PIECES; i++) + if (bitfield & (1U << i)) + pieces_count[i]++; +} + +unsigned int Peer::countPieces(unsigned int bitfield) +{ + unsigned int count = 0U; + unsigned int n = bitfield; + while (n) { + count += n & 1U; + n >>= 1U; + } + return count; +} + +int Peer::nbInterestedPeers() +{ + int nb = 0; + for (auto kv : connected_peers) + if (kv.second->interested) + nb++; + return nb; +} + +void Peer::leech() +{ + double next_choked_update = simgrid::s4u::Engine::getClock() + UPDATE_CHOKED_INTERVAL; + XBT_DEBUG("Start downloading."); + + /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */ + sendHandshakeToAllPeers(); + XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->getName()); + + void* data = nullptr; + while (simgrid::s4u::Engine::getClock() < deadline && countPieces(bitfield_) < FILE_PIECES) { + if (comm_received == nullptr) { + comm_received = mailbox_->get_async(&data); + } + if (comm_received->test()) { + message = static_cast(data); + handleMessage(); + delete message; + comm_received = nullptr; + } else { + // We don't execute the choke algorithm if we don't already have a piece + if (simgrid::s4u::Engine::getClock() >= next_choked_update && countPieces(bitfield_) > 0) { + updateChokedPeers(); + next_choked_update += UPDATE_CHOKED_INTERVAL; + } else { + simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION); + } + } + } + if (hasFinished()) + XBT_DEBUG("%d becomes a seeder", id); +} + +void Peer::seed() +{ + double next_choked_update = simgrid::s4u::Engine::getClock() + UPDATE_CHOKED_INTERVAL; + XBT_DEBUG("Start seeding."); + // start the main seed loop + void* data = nullptr; + while (simgrid::s4u::Engine::getClock() < deadline) { + if (comm_received == nullptr) { + comm_received = mailbox_->get_async(&data); + } + if (comm_received->test()) { + message = static_cast(data); + handleMessage(); + delete message; + comm_received = nullptr; + } else { + if (simgrid::s4u::Engine::getClock() >= next_choked_update) { + updateChokedPeers(); + // TODO: Change the choked peer algorithm when seeding. + next_choked_update += UPDATE_CHOKED_INTERVAL; + } else { + simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION); + } + } + } +} + +void Peer::updateActivePeersSet(Connection* remote_peer) +{ + if (remote_peer->interested && not remote_peer->choked_upload) { + // add in the active peers set + active_peers.insert(remote_peer); + } else if (active_peers.find(remote_peer) != active_peers.end()) { + active_peers.erase(remote_peer); + } +} + +void Peer::handleMessage() +{ + const char* type_names[10] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", + "HAVE", "BITFIELD", "REQUEST", "PIECE", "CANCEL"}; + + XBT_DEBUG("Received a %s message from %s", type_names[message->type], message->return_mailbox->getName()); + + auto known_peer = connected_peers.find(message->peer_id); + Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : known_peer->second; + switch (message->type) { + case MESSAGE_HANDSHAKE: + // Check if the peer is in our connection list. + if (remote_peer == nullptr) { + XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id); + connected_peers[message->peer_id] = new Connection(message->peer_id); + sendHandshake(message->return_mailbox); + } + // Send our bitfield to the peer + sendBitfield(message->return_mailbox); + break; + case MESSAGE_BITFIELD: + // Update the pieces list + updatePiecesCountFromBitfield(message->bitfield); + // Store the bitfield + remote_peer->bitfield = message->bitfield; + xbt_assert(!remote_peer->am_interested, "Should not be interested at first"); + if (isInterestedBy(remote_peer)) { + remote_peer->am_interested = true; + sendInterested(message->return_mailbox); + } + break; + case MESSAGE_INTERESTED: + xbt_assert((remote_peer != nullptr), + "The impossible did happened: A non-in-our-list peer has sent us a message."); + // Update the interested state of the peer. + remote_peer->interested = true; + updateActivePeersSet(remote_peer); + break; + case MESSAGE_NOTINTERESTED: + xbt_assert((remote_peer != nullptr), + "The impossible did happened: A non-in-our-list peer has sent us a message."); + remote_peer->interested = false; + updateActivePeersSet(remote_peer); + break; + case MESSAGE_UNCHOKE: + xbt_assert((remote_peer != nullptr), + "The impossible did happened: A non-in-our-list peer has sent us a message."); + xbt_assert(remote_peer->choked_download); + remote_peer->choked_download = false; + // Send requests to the peer, since it has unchoked us + // if (remote_peer->am_interested) + requestNewPieceTo(remote_peer); + break; + case MESSAGE_CHOKE: + xbt_assert((remote_peer != nullptr), + "The impossible did happened: A non-in-our-list peer has sent us a message."); + xbt_assert(not remote_peer->choked_download); + remote_peer->choked_download = true; + if (remote_peer->current_piece != -1) + removeCurrentPiece(remote_peer, remote_peer->current_piece); + break; + case MESSAGE_HAVE: + XBT_DEBUG("\t for piece %d", message->piece); + xbt_assert((message->piece >= 0 && static_cast(message->piece) < FILE_PIECES), + "Wrong HAVE message received"); + remote_peer->bitfield = remote_peer->bitfield | (1U << static_cast(message->piece)); + pieces_count[message->piece]++; + // If the piece is in our pieces, we tell the peer that we are interested. + if (not remote_peer->am_interested && hasNotPiece(message->piece)) { + remote_peer->am_interested = true; + sendInterested(message->return_mailbox); + if (not remote_peer->choked_download) + requestNewPieceTo(remote_peer); + } + break; + case MESSAGE_REQUEST: + xbt_assert(remote_peer->interested); + xbt_assert((message->piece >= 0 && static_cast(message->piece) < FILE_PIECES), + "Wrong HAVE message received"); + if (not remote_peer->choked_upload) { + XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index, + message->block_index + message->block_length); + if (not hasNotPiece(message->piece)) { + sendPiece(message->return_mailbox, message->piece, message->block_index, message->block_length); + } + } else { + XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id); + } + break; + case MESSAGE_PIECE: + XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index, + message->block_index + message->block_length); + xbt_assert(not remote_peer->choked_download); + xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE, + "Can't received a piece if I'm not interested wihtout end-game mode!" + "piece (%d) bitfield (%u) remote bitfield (%u)", + message->piece, bitfield_, remote_peer->bitfield); + xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !"); + xbt_assert((message->piece >= 0 && static_cast(message->piece) < FILE_PIECES), + "Wrong piece received"); + // TODO: Execute a computation. + if (hasNotPiece(static_cast(message->piece))) { + updateBitfieldBlocks(message->piece, message->block_index, message->block_length); + if (hasCompletedPiece(static_cast(message->piece))) { + // Removing the piece from our piece list + removeCurrentPiece(remote_peer, message->piece); + // Setting the fact that we have the piece + bitfield_ = bitfield_ | (1U << static_cast(message->piece)); + XBT_DEBUG("My status is now %s", getStatus().c_str()); + // Sending the information to all the peers we are connected to + sendHaveToAllPeers(message->piece); + // sending UNINTERESTED to peers that do not have what we want. + updateInterestedAfterReceive(); + } else { // piece not completed + sendRequestTo(remote_peer, message->piece); // ask for the next block + } + } else { + XBT_DEBUG("However, we already have it"); + xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !"); + requestNewPieceTo(remote_peer); + } + break; + case MESSAGE_CANCEL: + break; + default: + THROW_IMPOSSIBLE; + } + // Update the peer speed. + if (remote_peer) { + remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::getClock() - begin_receive_time)); + } + begin_receive_time = simgrid::s4u::Engine::getClock(); +} + +/** Selects the appropriate piece to download and requests it to the remote_peer */ +void Peer::requestNewPieceTo(Connection* remote_peer) +{ + int piece = selectPieceToDownload(remote_peer); + if (piece != -1) { + current_pieces |= (1U << (unsigned int)piece); + sendRequestTo(remote_peer, piece); + } +} + +void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piece) +{ + current_pieces &= ~(1U << current_piece); + remote_peer->current_piece = -1; +} + +/** @brief Return the piece to be downloaded + * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole : + * If a piece is partially downloaded, this piece will be selected prioritarily + * If the peer has strictly less than 4 pieces, he chooses a piece at random. + * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy). + * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode). + * @param remote_peer: information about the connection + * @return the piece to download if possible. -1 otherwise + */ +int Peer::selectPieceToDownload(Connection* remote_peer) +{ + int piece = partiallyDownloadedPiece(remote_peer); + // strict priority policy + if (piece != -1) + return piece; + + // end game mode + if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) { +#if ENABLE_END_GAME_MODE == 0 + return -1; +#endif + int nb_interesting_pieces = 0; + // compute the number of interesting pieces + for (unsigned int i = 0; i < FILE_PIECES; i++) + if (hasNotPiece(i) && remote_peer->hasPiece(i)) + nb_interesting_pieces++; + + xbt_assert(nb_interesting_pieces != 0); + // get a random interesting piece + int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1); + int current_index = 0; + for (unsigned int i = 0; i < FILE_PIECES; i++) { + if (hasNotPiece(i) && remote_peer->hasPiece(i)) { + if (random_piece_index == current_index) { + piece = i; + break; + } + current_index++; + } + } + xbt_assert(piece != -1); + return piece; + } + // Random first policy + if (countPieces(bitfield_) < 4 && isInterestedByFree(remote_peer)) { + int nb_interesting_pieces = 0; + // compute the number of interesting pieces + for (unsigned int i = 0; i < FILE_PIECES; i++) + if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) + nb_interesting_pieces++; + xbt_assert(nb_interesting_pieces != 0); + // get a random interesting piece + int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1); + int current_index = 0; + for (unsigned int i = 0; i < FILE_PIECES; i++) { + if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) { + if (random_piece_index == current_index) { + piece = i; + break; + } + current_index++; + } + } + xbt_assert(piece != -1); + return piece; + } else { // Rarest first policy + short min = SHRT_MAX; + int nb_min_pieces = 0; + int current_index = 0; + // compute the smallest number of copies of available pieces + for (unsigned int i = 0; i < FILE_PIECES; i++) { + if (pieces_count[i] < min) + if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) + min = pieces_count[i]; + } + + xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer)); + // compute the number of rarest pieces + for (unsigned int i = 0; i < FILE_PIECES; i++) + if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) + nb_min_pieces++; + + xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer)); + // get a random rarest piece + int random_rarest_index = RngStream_RandInt(stream, 0, nb_min_pieces - 1); + for (unsigned int i = 0; i < FILE_PIECES; i++) + if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) { + if (random_rarest_index == current_index) { + piece = i; + break; + } + current_index++; + } + + xbt_assert(piece != -1 || not isInterestedByFree(remote_peer)); + return piece; + } +} + +void Peer::updateChokedPeers() +{ + if (nbInterestedPeers() == 0) + return; + XBT_DEBUG("(%d) update_choked peers %zu active peers", id, active_peers.size()); + // update the current round + round_ = (round_ + 1) % 3; + Connection* chosen_peer = nullptr; + // select first active peer and remove it from the set + Connection* choked_peer = *(active_peers.begin()); + active_peers.erase(choked_peer); + + /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/ + if (hasFinished()) { + Connection* remote_peer; + double unchoke_time = simgrid::s4u::Engine::getClock() + 1; + for (auto kv : connected_peers) { + remote_peer = kv.second; + if (remote_peer->last_unchoke < unchoke_time && remote_peer->interested && remote_peer->choked_upload) { + unchoke_time = remote_peer->last_unchoke; + chosen_peer = remote_peer; + } + } + } else { + // Random optimistic unchoking + if (round_ == 0) { + int j = 0; + do { + // We choose a random peer to unchoke. + std::unordered_map::iterator chosen_peer_it = connected_peers.begin(); + std::advance(chosen_peer_it, RngStream_RandInt(stream, 0, connected_peers.size() - 1)); + chosen_peer = chosen_peer_it->second; + if (chosen_peer == nullptr) + THROWF(unknown_error, 0, "A peer should have be selected at this point"); + else if (not chosen_peer->interested || not chosen_peer->choked_upload) + chosen_peer = nullptr; + else + XBT_DEBUG("Nothing to do, keep going"); + j++; + } while (chosen_peer == nullptr && j < MAXIMUM_PEERS); + } else { + // Use the "fastest download" policy. + double fastest_speed = 0.0; + for (auto kv : connected_peers) { + Connection* remote_peer = kv.second; + if (remote_peer->peer_speed > fastest_speed && remote_peer->choked_upload && remote_peer->interested) { + chosen_peer = remote_peer; + fastest_speed = remote_peer->peer_speed; + } + } + } + } + + if (chosen_peer != nullptr) + XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", id, chosen_peer->id, + chosen_peer->interested, chosen_peer->choked_upload); + + if (choked_peer != chosen_peer) { + if (choked_peer != nullptr) { + xbt_assert((!choked_peer->choked_upload), "Tries to choked a choked peer"); + choked_peer->choked_upload = true; + updateActivePeersSet(choked_peer); + XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id); + sendChoked(choked_peer->mailbox_); + } + if (chosen_peer != nullptr) { + xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer"); + chosen_peer->choked_upload = false; + active_peers.insert(chosen_peer); + chosen_peer->last_unchoke = simgrid::s4u::Engine::getClock(); + XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id); + updateActivePeersSet(chosen_peer); + sendUnchoked(chosen_peer->mailbox_); + } + } +} + +/** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/ +void Peer::updateInterestedAfterReceive() +{ + for (auto kv : connected_peers) { + Connection* remote_peer = kv.second; + if (remote_peer->am_interested) { + bool interested = false; + // Check if the peer still has a piece we want. + for (unsigned int i = 0; i < FILE_PIECES; i++) + if (hasNotPiece(i) && remote_peer->hasPiece(i)) { + interested = true; + break; + } + + if (not interested) { // no more piece to download from connection + remote_peer->am_interested = false; + sendNotInterested(remote_peer->mailbox_); + } + } + } +} + +void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length) +{ + xbt_assert((piece >= 0 && static_cast(piece) <= FILE_PIECES), "Wrong piece."); + xbt_assert((block_index >= 0 && static_cast(block_index) <= PIECES_BLOCKS), "Wrong block : %d.", + block_index); + for (int i = block_index; i < (block_index + block_length); i++) + bitfield_blocks |= (1ULL << static_cast(piece * PIECES_BLOCKS + i)); +} + +bool Peer::hasCompletedPiece(unsigned int piece) +{ + for (unsigned int i = 0; i < PIECES_BLOCKS; i++) + if (!(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) + return false; + return true; +} + +int Peer::getFirstMissingBlockFrom(int piece) +{ + for (unsigned int i = 0; i < PIECES_BLOCKS; i++) + if (!(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) + return i; + return -1; +} + +bool Peer::isInterestedByFree(Connection* remote_peer) +{ + for (unsigned int i = 0; i < FILE_PIECES; i++) + if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) + return true; + return false; +} + +/** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */ +int Peer::partiallyDownloadedPiece(Connection* remote_peer) +{ + for (unsigned int i = 0; i < FILE_PIECES; i++) + if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0) + return i; + return -1; +} diff --git a/examples/s4u/app-bittorrent/s4u_peer.hpp b/examples/s4u/app-bittorrent/s4u_peer.hpp new file mode 100644 index 0000000000..97630d9d86 --- /dev/null +++ b/examples/s4u/app-bittorrent/s4u_peer.hpp @@ -0,0 +1,95 @@ +/* Copyright (c) 2012-2017. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef BITTORRENT_PEER_HPP +#define BITTORRENT_PEER_HPP +#include "s4u_bittorrent.hpp" +#include +#include + +class Connection { +public: + int id; // Peer id + simgrid::s4u::MailboxPtr mailbox_; + unsigned int bitfield = 0U; // Fields + // int messages_count; + double peer_speed = 0; + double last_unchoke = 0; + int current_piece = -1; + bool am_interested = false; // Indicates if we are interested in something the peer has + bool interested = false; // Indicates if the peer is interested in one of our pieces + bool choked_upload = true; // Indicates if the peer is choked for the current peer + bool choked_download = true; // Indicates if the peer has choked the current peer + + Connection(int id) : id(id), mailbox_(simgrid::s4u::Mailbox::byName(std::to_string(id))){}; + ~Connection() = default; + void addSpeedValue(double speed) { peer_speed = peer_speed * 0.6 + speed * 0.4; } + bool hasPiece(unsigned int piece) { return bitfield & 1U << piece; } +}; + +class Peer { + int id; + double deadline; + RngStream stream; + simgrid::s4u::MailboxPtr mailbox_; + std::set active_peers; // active peers list + + unsigned int bitfield_ = 0; // list of pieces the peer has. + unsigned long long bitfield_blocks = 0; // list of blocks the peer has. + short* pieces_count = nullptr; // number of peers that have each piece. + unsigned int current_pieces = 0; // current pieces the peer is downloading + double begin_receive_time = 0; // time when the receiving communication has begun, useful for calculating host speed. + int round_ = 0; // current round for the chocking algorithm. + + std::unordered_map connected_peers; + simgrid::s4u::CommPtr comm_received = nullptr; // current comm + Message* message = nullptr; // current message being received +public: + explicit Peer(std::vector args); + ~Peer(); + void operator()(); + + std::string getStatus(); + bool hasFinished(); + int nbInterestedPeers(); + bool isInterestedBy(Connection* remote_peer); + bool isInterestedByFree(Connection* remote_peer); + void updateActivePeersSet(Connection* remote_peer); + void updateInterestedAfterReceive(); + void updateChokedPeers(); + + bool hasNotPiece(unsigned int piece) { return !(bitfield_ & 1U << piece); } + bool hasCompletedPiece(unsigned int piece); + unsigned int countPieces(unsigned int bitfield); + /** Check that a piece is not currently being download by the peer. */ + bool isNotDownloadingPiece(unsigned int piece) { return !(current_pieces & 1U << piece); } + int partiallyDownloadedPiece(Connection* remote_peer); + void updatePiecesCountFromBitfield(unsigned int bitfield); + void removeCurrentPiece(Connection* remote_peer, unsigned int current_piece); + void updateBitfieldBlocks(int piece, int block_index, int block_length); + int getFirstMissingBlockFrom(int piece); + int selectPieceToDownload(Connection* remote_peer); + void requestNewPieceTo(Connection* remote_peer); + + bool getPeersFromTracker(); + void sendHandshake(simgrid::s4u::MailboxPtr mailbox); + void sendBitfield(simgrid::s4u::MailboxPtr mailbox); + void sendPiece(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length); + void sendInterested(simgrid::s4u::MailboxPtr mailbox); + void sendChoked(simgrid::s4u::MailboxPtr mailbox); + void sendUnchoked(simgrid::s4u::MailboxPtr mailbox); + void sendNotInterested(simgrid::s4u::MailboxPtr mailbox); + void sendHandshakeToAllPeers(); + void sendRequest(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length); + void sendHaveToAllPeers(unsigned int piece); + void sendRequestTo(Connection* remote_peer, unsigned int piece); + + void handleMessage(); + void leech(); + void seed(); +}; + +#endif /* BITTORRENT_PEER_HPP */ diff --git a/examples/s4u/app-bittorrent/s4u_tracker.cpp b/examples/s4u/app-bittorrent/s4u_tracker.cpp new file mode 100644 index 0000000000..5a30b191c1 --- /dev/null +++ b/examples/s4u/app-bittorrent/s4u_tracker.cpp @@ -0,0 +1,71 @@ +/* Copyright (c) 2012-2017. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include "s4u_tracker.hpp" +#include + +XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_tracker, "Messages specific for the tracker"); + +Tracker::Tracker(std::vector args) +{ + // Checking arguments + xbt_assert(args.size() == 2, "Wrong number of arguments for the tracker."); + // Retrieving end time + try { + deadline = std::stod(args[1]); + } catch (std::invalid_argument& ia) { + throw std::invalid_argument(std::string("Invalid deadline:") + args[1].c_str()); + } + xbt_assert(deadline > 0, "Wrong deadline supplied"); + + stream = simgrid::s4u::this_actor::getHost()->extension()->getStream(); + + mailbox = simgrid::s4u::Mailbox::byName(TRACKER_MAILBOX); + + XBT_INFO("Tracker launched."); +} + +void Tracker::operator()() +{ + simgrid::s4u::CommPtr comm = nullptr; + while (simgrid::s4u::Engine::getClock() < deadline) { + void* received; + if (comm == nullptr) + comm = mailbox->get_async(&received); + if (comm->test()) { + // Retrieve the data sent by the peer. + TrackerQuery* tq = static_cast(received); + + // Add the peer to our peer list, if not already known. + if (known_peers.find(tq->getPeerId()) == known_peers.end()) { + known_peers.insert(tq->getPeerId()); + } + + // Sending back peers to the requesting peer + TrackerAnswer* ta = new TrackerAnswer(TRACKER_QUERY_INTERVAL); + std::set::iterator next_peer; + int nb_known_peers = known_peers.size(); + int max_tries = MIN(MAXIMUM_PEERS, nb_known_peers); + int tried = 0; + while (tried < max_tries) { + do { + next_peer = known_peers.begin(); + std::advance(next_peer, RngStream_RandInt(stream, 0, nb_known_peers - 1)); + } while (ta->getPeers()->find(*next_peer) != ta->getPeers()->end()); + ta->addPeer(*next_peer); + tried++; + } + tq->getReturnMailbox()->put_init(ta, TRACKER_COMM_SIZE)->detach(); + + delete tq; + comm = nullptr; + } else { + simgrid::s4u::this_actor::sleep_for(1); + } + } + // TODO See if some cleanup is needed + XBT_INFO("Tracker is leaving"); +} diff --git a/examples/s4u/app-bittorrent/s4u_tracker.hpp b/examples/s4u/app-bittorrent/s4u_tracker.hpp new file mode 100644 index 0000000000..5ed077edc7 --- /dev/null +++ b/examples/s4u/app-bittorrent/s4u_tracker.hpp @@ -0,0 +1,48 @@ +/* Copyright (c) 2012-2014, 2017. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef BITTORRENT_TRACKER_HPP_ +#define BITTORRENT_TRACKER_HPP_ + +#include "s4u_bittorrent.hpp" +#include + +class TrackerQuery { + int peer_id; // peer id + simgrid::s4u::MailboxPtr return_mailbox; + int uploaded; // how much the peer has already uploaded + int downloaded; // how much the peer has downloaded + int left; // how much the peer has left +public: + explicit TrackerQuery(int peer_id, simgrid::s4u::MailboxPtr return_mailbox, int uploaded, int downloaded, int left) + : peer_id(peer_id), return_mailbox(return_mailbox), uploaded(uploaded), downloaded(downloaded), left(left){}; + ~TrackerQuery() = default; + int getPeerId() { return peer_id; } + simgrid::s4u::MailboxPtr getReturnMailbox() { return return_mailbox; } +}; + +class TrackerAnswer { + int interval; // how often the peer should contact the tracker (unused for now) + std::set* peers; // the peer list the peer has asked for. +public: + explicit TrackerAnswer(int interval) : interval(interval) { peers = new std::set; } + ~TrackerAnswer() { delete peers; }; + void addPeer(int peer) { peers->insert(peer); } + std::set* getPeers() { return peers; } +}; + +class Tracker { + double deadline; + RngStream stream; + simgrid::s4u::MailboxPtr mailbox; + std::set known_peers; + +public: + explicit Tracker(std::vector args); + void operator()(); +}; + +#endif /* BITTORRENT_TRACKER_HPP */ -- 2.20.1