Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
rewrite bittorrent example in s4u
authorFrederic Suter <frederic.suter@cc.in2p3.fr>
Sun, 6 Aug 2017 13:57:31 +0000 (15:57 +0200)
committerFrederic Suter <frederic.suter@cc.in2p3.fr>
Sun, 6 Aug 2017 14:05:59 +0000 (16:05 +0200)
12 files changed:
examples/msg/app-bittorrent/bittorrent.h
examples/msg/app-bittorrent/peer.c
examples/msg/app-bittorrent/tracker.c
examples/s4u/CMakeLists.txt
examples/s4u/app-bittorrent/s4u_app-bittorrent.tesh [new file with mode: 0644]
examples/s4u/app-bittorrent/s4u_app-bittorrent_d.xml [new file with mode: 0644]
examples/s4u/app-bittorrent/s4u_bittorrent.cpp [new file with mode: 0644]
examples/s4u/app-bittorrent/s4u_bittorrent.hpp [new file with mode: 0644]
examples/s4u/app-bittorrent/s4u_peer.cpp [new file with mode: 0644]
examples/s4u/app-bittorrent/s4u_peer.hpp [new file with mode: 0644]
examples/s4u/app-bittorrent/s4u_tracker.cpp [new file with mode: 0644]
examples/s4u/app-bittorrent/s4u_tracker.hpp [new file with mode: 0644]

index b45d688..62d4172 100644 (file)
@@ -10,7 +10,7 @@
 #define MAILBOX_SIZE 40
 #define TRACKER_MAILBOX "tracker_mailbox"
 /** Max number of pairs sent by the tracker to clients */
-#define MAXIMUM_PAIRS 50
+#define MAXIMUM_PEERS 50
 /** Interval of time where the peer should send a request to the tracker */
 #define TRACKER_QUERY_INTERVAL 1000
 /** Communication size for a task to the tracker */
index 7c248b3..23b78ef 100644 (file)
@@ -624,7 +624,7 @@ void update_choked_peers(peer_t peer)
         else
           XBT_DEBUG("Nothing to do, keep going");
         j++;
-      } while (peer_choosed == NULL && j < MAXIMUM_PAIRS);
+      } while (peer_choosed == NULL && j < MAXIMUM_PEERS);
     } else {
       //Use the "fastest download" policy.
       connection_t connection;
index bdcea7f..255ed26 100644 (file)
@@ -49,7 +49,7 @@ int tracker(int argc, char *argv[])
         //Sending peers to the peer
         int next_peer;
         int peers_length = xbt_dynar_length(peers_list);
-        for (int i = 0; i < MAXIMUM_PAIRS && i < peers_length; i++) {
+        for (int i = 0; i < MAXIMUM_PEERS && i < peers_length; i++) {
           do {
             next_peer = xbt_dynar_get_as(peers_list, RngStream_RandInt(stream, 0, peers_length - 1), int);
           } while (is_in_list(data->peers, next_peer));
index 72a43da..6064c0c 100644 (file)
@@ -17,12 +17,23 @@ foreach (file s4u_dht-chord node)
 endforeach()
 set(examples_src  ${examples_src}  ${CMAKE_CURRENT_SOURCE_DIR}/dht-chord/s4u_dht-chord.hpp)
 
+add_executable       (s4u_bittorrent app-bittorrent/s4u_bittorrent.cpp app-bittorrent/s4u_peer.cpp
+                      app-bittorrent/s4u_tracker.cpp)
+target_link_libraries(s4u_bittorrent simgrid)
+set_target_properties(s4u_bittorrent PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/app-bittorrent)
+foreach (file s4u_bittorrent s4u_peer s4u_tracker)
+  set(examples_src  ${examples_src}  ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/${file}.cpp
+                                     ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/${file}.hpp)
+endforeach()
+
 set(examples_src  ${examples_src}                                                                          PARENT_SCOPE)
-set(tesh_files    ${tesh_files}   ${CMAKE_CURRENT_SOURCE_DIR}/dht-chord/s4u_dht-chord.tesh                 PARENT_SCOPE)
+set(tesh_files    ${tesh_files}   ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/s4u_app-bittorrent.tesh
+                                  ${CMAKE_CURRENT_SOURCE_DIR}/dht-chord/s4u_dht-chord.tesh                 PARENT_SCOPE)
 set(xml_files     ${xml_files}    ${CMAKE_CURRENT_SOURCE_DIR}/actions-comm/s4u_actions-comm_split_d.xml
                                   ${CMAKE_CURRENT_SOURCE_DIR}/actions-comm/s4u_actions-comm_d.xml
                                   ${CMAKE_CURRENT_SOURCE_DIR}/actions-storage/s4u_actions-storage_d.xml
                                   ${CMAKE_CURRENT_SOURCE_DIR}/actor-create/s4u_actor-create_d.xml
+                                  ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/s4u_app-bittorrent_d.xml
                                   ${CMAKE_CURRENT_SOURCE_DIR}/app-masterworker/s4u_app-masterworker_d.xml
                                   ${CMAKE_CURRENT_SOURCE_DIR}/dht-chord/s4u_dht-chord_d.xml                PARENT_SCOPE)
 set(txt_files     ${txt_files}    ${CMAKE_CURRENT_SOURCE_DIR}/actions-comm/s4u_actions-comm_split_p0.txt
@@ -32,6 +43,6 @@ set(txt_files     ${txt_files}    ${CMAKE_CURRENT_SOURCE_DIR}/actions-comm/s4u_a
                                   ${CMAKE_CURRENT_SOURCE_DIR}/README.doc                                   PARENT_SCOPE)
 
 foreach(example actions-comm actions-storage actor-create actor-daemon actor-kill actor-migration actor-suspend 
-                 app-masterworker app-pingpong app-token-ring dht-chord plugin-hostload io mutex )
+                app-bittorrent app-masterworker app-pingpong app-token-ring dht-chord plugin-hostload io mutex )
   ADD_TESH_FACTORIES(s4u-${example} "thread;ucontext;raw;boost" --setenv bindir=${CMAKE_CURRENT_BINARY_DIR}/${example} --setenv srcdir=${CMAKE_HOME_DIRECTORY}/examples/platforms --cd ${CMAKE_HOME_DIRECTORY}/examples/s4u/${example} s4u_${example}.tesh)
 endforeach()
diff --git a/examples/s4u/app-bittorrent/s4u_app-bittorrent.tesh b/examples/s4u/app-bittorrent/s4u_app-bittorrent.tesh
new file mode 100644 (file)
index 0000000..d309e01
--- /dev/null
@@ -0,0 +1,23 @@
+#! ./tesh
+
+p Testing the Bittorrent implementation with MSG
+
+! timeout 10
+! output sort 19
+$ $SG_TEST_EXENV ${bindir:=.}/s4u_bittorrent ${srcdir:=.}/cluster.xml ${srcdir:=.}/../s4u/app-bittorrent/s4u_app-bittorrent_d.xml "--log=root.fmt:[%12.6r]%e(%i:%P@%h)%e%m%n"
+> [    0.000000] (1:tracker@node-0.acme.org) Tracker launched.
+> [    0.000000] (2:peer@node-1.acme.org) Hi, I'm joining the network with id 2
+> [    0.000000] (3:peer@node-2.acme.org) Hi, I'm joining the network with id 3
+> [    0.000000] (4:peer@node-3.acme.org) Hi, I'm joining the network with id 4
+> [    0.000000] (5:peer@node-4.acme.org) Hi, I'm joining the network with id 5
+> [    0.000000] (6:peer@node-5.acme.org) Hi, I'm joining the network with id 6
+> [    0.000000] (7:peer@node-6.acme.org) Hi, I'm joining the network with id 7
+> [    0.000000] (8:peer@node-7.acme.org) Hi, I'm joining the network with id 8
+> [ 3000.000000] (1:tracker@node-0.acme.org) Tracker is leaving
+> [ 5000.007806] (2:peer@node-1.acme.org) Here is my current status: 1111111111
+> [ 5000.007806] (3:peer@node-2.acme.org) Here is my current status: 1111111111
+> [ 5000.007806] (4:peer@node-3.acme.org) Here is my current status: 1111111111
+> [ 5000.007806] (5:peer@node-4.acme.org) Here is my current status: 1111111111
+> [ 5000.007806] (6:peer@node-5.acme.org) Here is my current status: 1111111111
+> [ 5000.007806] (7:peer@node-6.acme.org) Here is my current status: 1111111111
+> [ 5000.007806] (8:peer@node-7.acme.org) Here is my current status: 1111111111
diff --git a/examples/s4u/app-bittorrent/s4u_app-bittorrent_d.xml b/examples/s4u/app-bittorrent/s4u_app-bittorrent_d.xml
new file mode 100644 (file)
index 0000000..5460ab1
--- /dev/null
@@ -0,0 +1,39 @@
+<?xml version='1.0'?>
+<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid/simgrid.dtd">
+<platform version="4.1">
+
+  <actor host="node-0.acme.org" function="tracker">
+    <argument value="3000" />
+  </actor>
+
+  <actor host="node-1.acme.org" function="peer">
+    <argument value="00000002"/>    <!-- my id -->
+    <argument value="5000" />    <!-- end time --> 
+    <argument value="1" />       <!-- indicates if the peer is a seed at the beginning of the simulation --> 
+  </actor>
+  <actor host="node-2.acme.org" function="peer">
+    <argument value="00000003"/>    <!-- my id -->
+    <argument value="5000" />    <!-- end time --> 
+  </actor>
+  <actor host="node-3.acme.org" function="peer">
+    <argument value="00000004"/>    <!-- my id -->
+    <argument value="5000" />    <!-- end time --> 
+  </actor>
+  <actor host="node-4.acme.org" function="peer">
+    <argument value="00000005"/>    <!-- my id -->
+    <argument value="5000" />    <!-- end time --> 
+    <argument value="1" />       <!-- indicates if the peer is a seed at the beginning of the simulation --> 
+  </actor>
+  <actor host="node-5.acme.org" function="peer">
+    <argument value="00000006"/>    <!-- my id -->
+    <argument value="5000" />    <!-- end time --> 
+  </actor>
+  <actor host="node-6.acme.org" function="peer">
+    <argument value="00000007"/>    <!-- my id -->
+    <argument value="5000" />    <!-- end time --> 
+  </actor>
+  <actor host="node-7.acme.org" function="peer">
+    <argument value="00000008"/>    <!-- my id -->
+    <argument value="5000" />    <!-- end time --> 
+  </actor>
+</platform>
diff --git a/examples/s4u/app-bittorrent/s4u_bittorrent.cpp b/examples/s4u/app-bittorrent/s4u_bittorrent.cpp
new file mode 100644 (file)
index 0000000..cbc093b
--- /dev/null
@@ -0,0 +1,37 @@
+/* Copyright (c) 2012-2017. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "s4u_bittorrent.hpp"
+#include "s4u_peer.hpp"
+#include "s4u_tracker.hpp"
+
+simgrid::xbt::Extension<simgrid::s4u::Host, HostBittorrent> HostBittorrent::EXTENSION_ID;
+
+int main(int argc, char* argv[])
+{
+  simgrid::s4u::Engine* e = new simgrid::s4u::Engine(&argc, argv);
+
+  /* Check the arguments */
+  xbt_assert(argc > 2, "Usage: %s platform_file deployment_file", argv[0]);
+
+  e->loadPlatform(argv[1]);
+
+  HostBittorrent::EXTENSION_ID = simgrid::s4u::Host::extension_create<HostBittorrent>();
+
+  std::vector<simgrid::s4u::Host*> list;
+  simgrid::s4u::Engine::getInstance()->getHostList(&list);
+  for (auto host : list)
+    host->extension_set(new HostBittorrent(host));
+
+  e->registerFunction<Tracker>("tracker");
+  e->registerFunction<Peer>("peer");
+  e->loadDeployment(argv[2]);
+
+  e->run();
+
+  delete e;
+  return 0;
+}
diff --git a/examples/s4u/app-bittorrent/s4u_bittorrent.hpp b/examples/s4u/app-bittorrent/s4u_bittorrent.hpp
new file mode 100644 (file)
index 0000000..4b02ab2
--- /dev/null
@@ -0,0 +1,104 @@
+/* Copyright (c) 2012-2014, 2016-2017. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef BITTORRENT_BITTORRENT_HPP_
+#define BITTORRENT_BITTORRENT_HPP_
+
+#include <simgrid/s4u.hpp>
+#include <xbt/RngStream.h>
+
+#define MAILBOX_SIZE 40
+#define TRACKER_MAILBOX "tracker_mailbox"
+/** Max number of peers sent by the tracker to clients */
+#define MAXIMUM_PEERS 50
+/** Interval of time where the peer should send a request to the tracker */
+#define TRACKER_QUERY_INTERVAL 1000
+/** Communication size for a task to the tracker */
+#define TRACKER_COMM_SIZE 0.01
+#define GET_PEERS_TIMEOUT 10000
+#define TIMEOUT_MESSAGE 10
+#define TRACKER_RECEIVE_TIMEOUT 10
+/** Number of peers that can be unchocked at a given time */
+#define MAX_UNCHOKED_PEERS 4
+/** Interval between each update of the choked peers */
+#define UPDATE_CHOKED_INTERVAL 30
+/** Number of pieces the peer asks for simultaneously */
+#define MAX_PIECES 1
+
+/** Message sizes
+ * Sizes based on report by A. Legout et al, Understanding BitTorrent: An Experimental Perspective
+ * http://hal.inria.fr/inria-00000156/en
+ */
+#define MESSAGE_HANDSHAKE_SIZE 68
+#define MESSAGE_CHOKE_SIZE 5
+#define MESSAGE_UNCHOKE_SIZE 5
+#define MESSAGE_INTERESTED_SIZE 5
+#define MESSAGE_NOTINTERESTED_SIZE 5
+#define MESSAGE_HAVE_SIZE 9
+#define MESSAGE_BITFIELD_SIZE 5
+#define MESSAGE_REQUEST_SIZE 17
+#define MESSAGE_PIECE_SIZE 13
+#define MESSAGE_CANCEL_SIZE 17
+
+/** Types of messages exchanged between two peers. */
+typedef enum {
+  MESSAGE_HANDSHAKE,
+  MESSAGE_CHOKE,
+  MESSAGE_UNCHOKE,
+  MESSAGE_INTERESTED,
+  MESSAGE_NOTINTERESTED,
+  MESSAGE_HAVE,
+  MESSAGE_BITFIELD,
+  MESSAGE_REQUEST,
+  MESSAGE_PIECE,
+  MESSAGE_CANCEL
+} e_message_type;
+
+class Message {
+public:
+  e_message_type type;
+  int peer_id;
+  simgrid::s4u::MailboxPtr return_mailbox;
+  unsigned int bitfield = 0U;
+  int piece             = 0;
+  int block_index       = 0;
+  int block_length      = 0;
+  Message(e_message_type type, int peer_id, simgrid::s4u::MailboxPtr return_mailbox)
+      : type(type), peer_id(peer_id), return_mailbox(return_mailbox){};
+  Message(e_message_type type, int peer_id, unsigned int bitfield, simgrid::s4u::MailboxPtr return_mailbox)
+      : type(type), peer_id(peer_id), return_mailbox(return_mailbox), bitfield(bitfield){};
+  Message(e_message_type type, int peer_id, simgrid::s4u::MailboxPtr return_mailbox, int piece, int block_index,
+          int block_length)
+      : type(type)
+      , peer_id(peer_id)
+      , return_mailbox(return_mailbox)
+      , piece(piece)
+      , block_index(block_index)
+      , block_length(block_length){};
+  Message(e_message_type type, int peer_id, simgrid::s4u::MailboxPtr return_mailbox, int piece)
+      : type(type), peer_id(peer_id), return_mailbox(return_mailbox), piece(piece){};
+  ~Message() = default;
+};
+
+class HostBittorrent {
+  RngStream stream_;
+  simgrid::s4u::Host* host = nullptr;
+
+public:
+  static simgrid::xbt::Extension<simgrid::s4u::Host, HostBittorrent> EXTENSION_ID;
+
+  explicit HostBittorrent(simgrid::s4u::Host* ptr) : host(ptr)
+  {
+    std::string descr = std::string("RngSream<") + host->getCname() + ">";
+    stream_           = RngStream_CreateStream(descr.c_str());
+  }
+
+  ~HostBittorrent() { RngStream_DeleteStream(&stream_); };
+
+  RngStream getStream() { return stream_; };
+};
+
+#endif /* BITTORRENT_BITTORRENT_HPP_ */
diff --git a/examples/s4u/app-bittorrent/s4u_peer.cpp b/examples/s4u/app-bittorrent/s4u_peer.cpp
new file mode 100644 (file)
index 0000000..ba68a23
--- /dev/null
@@ -0,0 +1,707 @@
+/* Copyright (c) 2012-2017. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <climits>
+#include <xbt/ex.hpp>
+
+#include "s4u_peer.hpp"
+#include "s4u_tracker.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
+
+/*
+ * User parameters for transferred file data. For the test, the default values are :
+ * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
+ */
+#define FILE_PIECES 10UL
+#define PIECES_BLOCKS 5UL
+#define BLOCK_SIZE 16384
+static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
+
+/** Number of blocks asked by each request */
+#define BLOCKS_REQUESTED 2
+
+#define ENABLE_END_GAME_MODE 1
+#define SLEEP_DURATION 1
+#define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
+
+Peer::Peer(std::vector<std::string> args)
+{
+  // Check arguments
+  xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
+  try {
+    id       = std::stoi(args[1]);
+    mailbox_ = simgrid::s4u::Mailbox::byName(std::to_string(id));
+  } catch (std::invalid_argument& ia) {
+    throw std::invalid_argument(std::string("Invalid ID:") + args[1].c_str());
+  }
+
+  try {
+    deadline = std::stod(args[2]);
+  } catch (std::invalid_argument& ia) {
+    throw std::invalid_argument(std::string("Invalid deadline:") + args[2].c_str());
+  }
+  xbt_assert(deadline > 0, "Wrong deadline supplied");
+
+  stream = simgrid::s4u::this_actor::getHost()->extension<HostBittorrent>()->getStream();
+
+  if (args.size() == 4 && args[3] == "1") {
+    bitfield_       = (1U << FILE_PIECES) - 1U;
+    bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
+  }
+  pieces_count = new short[FILE_PIECES]{0};
+
+  XBT_INFO("Hi, I'm joining the network with id %d", id);
+}
+
+Peer::~Peer()
+{
+  for (auto peer : connected_peers)
+    delete peer.second;
+  delete[] pieces_count;
+}
+
+/** Peer main function */
+void Peer::operator()()
+{
+  // Getting peer data from the tracker.
+  if (getPeersFromTracker()) {
+    XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
+    begin_receive_time = simgrid::s4u::Engine::getClock();
+    mailbox_->setReceiver(simgrid::s4u::Actor::self());
+    if (hasFinished()) {
+      sendHandshakeToAllPeers();
+    } else {
+      leech();
+    }
+    seed();
+  } else {
+    XBT_INFO("Couldn't contact the tracker.");
+  }
+
+  XBT_INFO("Here is my current status: %s", getStatus().c_str());
+}
+
+bool Peer::getPeersFromTracker()
+{
+  simgrid::s4u::MailboxPtr tracker_mailbox = simgrid::s4u::Mailbox::byName(TRACKER_MAILBOX);
+  // Build the task to send to the tracker
+  TrackerQuery* peer_request = new TrackerQuery(id, mailbox_, 0, 0, FILE_SIZE);
+  try {
+    XBT_DEBUG("Sending a peer request to the tracker.");
+    tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
+  } catch (xbt_ex& e) {
+    if (e.category == timeout_error) {
+      XBT_DEBUG("Timeout expired when requesting peers to tracker");
+      delete peer_request;
+      return false;
+    }
+  }
+
+  try {
+    TrackerAnswer* answer = static_cast<TrackerAnswer*>(mailbox_->get(GET_PEERS_TIMEOUT));
+    // Add the peers the tracker gave us to our peer list.
+    for (auto peer_id : *answer->getPeers())
+      if (id != peer_id)
+        connected_peers[peer_id] = new Connection(peer_id);
+    delete answer;
+  } catch (xbt_ex& e) {
+    if (e.category == timeout_error) {
+      XBT_DEBUG("Timeout expired when requesting peers to tracker");
+      return false;
+    }
+  }
+  return true;
+}
+
+void Peer::sendHandshakeToAllPeers()
+{
+  for (auto kv : connected_peers) {
+    Connection* remote_peer = kv.second;
+    Message* handshake      = new Message(MESSAGE_HANDSHAKE, id, mailbox_);
+    remote_peer->mailbox_->put_init(handshake, MESSAGE_HANDSHAKE_SIZE)->detach();
+    XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer->id);
+  }
+}
+
+void Peer::sendHandshake(simgrid::s4u::MailboxPtr mailbox)
+{
+  XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox->getName());
+  mailbox->put_init(new Message(MESSAGE_HANDSHAKE, id, mailbox_), MESSAGE_HANDSHAKE_SIZE)->detach();
+}
+
+void Peer::sendBitfield(simgrid::s4u::MailboxPtr mailbox)
+{
+  XBT_DEBUG("Sending a BITFIELD to %s", mailbox->getName());
+  mailbox
+      ->put_init(new Message(MESSAGE_BITFIELD, id, bitfield_, mailbox_),
+                 MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES))
+      ->detach();
+}
+
+void Peer::sendInterested(simgrid::s4u::MailboxPtr mailbox)
+{
+  XBT_DEBUG("Sending INTERESTED to %s", mailbox->getName());
+  mailbox->put_init(new Message(MESSAGE_INTERESTED, id, bitfield_, mailbox_), MESSAGE_INTERESTED_SIZE)->detach();
+}
+
+void Peer::sendNotInterested(simgrid::s4u::MailboxPtr mailbox)
+{
+  XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox->getName());
+  mailbox->put_init(new Message(MESSAGE_NOTINTERESTED, id, bitfield_, mailbox_), MESSAGE_NOTINTERESTED_SIZE)->detach();
+}
+
+void Peer::sendChoked(simgrid::s4u::MailboxPtr mailbox)
+{
+  XBT_DEBUG("Sending CHOKE to %s", mailbox->getName());
+  mailbox->put_init(new Message(MESSAGE_CHOKE, id, mailbox_), MESSAGE_CHOKE_SIZE)->detach();
+}
+
+/** Send a "unchoked" message to a peer */
+void Peer::sendUnchoked(simgrid::s4u::MailboxPtr mailbox)
+{
+  XBT_DEBUG("Sending UNCHOKE to %s", mailbox->getName());
+  mailbox->put_init(new Message(MESSAGE_UNCHOKE, id, mailbox_), MESSAGE_UNCHOKE_SIZE)->detach();
+}
+
+void Peer::sendPiece(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length)
+{
+  xbt_assert(!hasNotPiece(piece), "Tried to send a unavailable piece.");
+  XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->getName());
+  mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach();
+}
+
+void Peer::sendRequest(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length)
+{
+  XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", mailbox->getName(), piece, block_index, block_length);
+  mailbox->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE)
+      ->detach();
+}
+
+void Peer::sendHaveToAllPeers(unsigned int piece)
+{
+  XBT_DEBUG("Sending HAVE message to all my peers");
+  for (auto kv : connected_peers) {
+    Connection* remote_peer = kv.second;
+    remote_peer->mailbox_->put_init(new Message(MESSAGE_HAVE, id, mailbox_, piece), MESSAGE_HAVE_SIZE)->detach();
+  }
+}
+
+void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
+{
+  remote_peer->current_piece = piece;
+  xbt_assert(remote_peer->hasPiece(piece));
+  int block_index = getFirstMissingBlockFrom(piece);
+  if (block_index != -1) {
+    int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
+    sendRequest(remote_peer->mailbox_, piece, block_index, block_length);
+  }
+}
+
+std::string Peer::getStatus()
+{
+  std::string res = std::string("");
+  for (int i = FILE_PIECES - 1; i >= 0; i--)
+    res = std::string((bitfield_ & (1U << i)) ? "1" : "0") + res;
+  return res;
+}
+
+bool Peer::hasFinished()
+{
+  return bitfield_ == (1U << FILE_PIECES) - 1U;
+}
+
+/** Indicates if the remote peer has a piece not stored by the local peer */
+bool Peer::isInterestedBy(Connection* remote_peer)
+{
+  return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
+}
+
+void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
+{
+  for (unsigned int i = 0; i < FILE_PIECES; i++)
+    if (bitfield & (1U << i))
+      pieces_count[i]++;
+}
+
+unsigned int Peer::countPieces(unsigned int bitfield)
+{
+  unsigned int count = 0U;
+  unsigned int n     = bitfield;
+  while (n) {
+    count += n & 1U;
+    n >>= 1U;
+  }
+  return count;
+}
+
+int Peer::nbInterestedPeers()
+{
+  int nb = 0;
+  for (auto kv : connected_peers)
+    if (kv.second->interested)
+      nb++;
+  return nb;
+}
+
+void Peer::leech()
+{
+  double next_choked_update = simgrid::s4u::Engine::getClock() + UPDATE_CHOKED_INTERVAL;
+  XBT_DEBUG("Start downloading.");
+
+  /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
+  sendHandshakeToAllPeers();
+  XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->getName());
+
+  void* data = nullptr;
+  while (simgrid::s4u::Engine::getClock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
+    if (comm_received == nullptr) {
+      comm_received = mailbox_->get_async(&data);
+    }
+    if (comm_received->test()) {
+      message = static_cast<Message*>(data);
+      handleMessage();
+      delete message;
+      comm_received = nullptr;
+    } else {
+      // We don't execute the choke algorithm if we don't already have a piece
+      if (simgrid::s4u::Engine::getClock() >= next_choked_update && countPieces(bitfield_) > 0) {
+        updateChokedPeers();
+        next_choked_update += UPDATE_CHOKED_INTERVAL;
+      } else {
+        simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
+      }
+    }
+  }
+  if (hasFinished())
+    XBT_DEBUG("%d becomes a seeder", id);
+}
+
+void Peer::seed()
+{
+  double next_choked_update = simgrid::s4u::Engine::getClock() + UPDATE_CHOKED_INTERVAL;
+  XBT_DEBUG("Start seeding.");
+  // start the main seed loop
+  void* data = nullptr;
+  while (simgrid::s4u::Engine::getClock() < deadline) {
+    if (comm_received == nullptr) {
+      comm_received = mailbox_->get_async(&data);
+    }
+    if (comm_received->test()) {
+      message = static_cast<Message*>(data);
+      handleMessage();
+      delete message;
+      comm_received = nullptr;
+    } else {
+      if (simgrid::s4u::Engine::getClock() >= next_choked_update) {
+        updateChokedPeers();
+        // TODO: Change the choked peer algorithm when seeding.
+        next_choked_update += UPDATE_CHOKED_INTERVAL;
+      } else {
+        simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
+      }
+    }
+  }
+}
+
+void Peer::updateActivePeersSet(Connection* remote_peer)
+{
+  if (remote_peer->interested && not remote_peer->choked_upload) {
+    // add in the active peers set
+    active_peers.insert(remote_peer);
+  } else if (active_peers.find(remote_peer) != active_peers.end()) {
+    active_peers.erase(remote_peer);
+  }
+}
+
+void Peer::handleMessage()
+{
+  const char* type_names[10] = {"HANDSHAKE", "CHOKE",    "UNCHOKE", "INTERESTED", "NOTINTERESTED",
+                                "HAVE",      "BITFIELD", "REQUEST", "PIECE",      "CANCEL"};
+
+  XBT_DEBUG("Received a %s message from %s", type_names[message->type], message->return_mailbox->getName());
+
+  auto known_peer         = connected_peers.find(message->peer_id);
+  Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : known_peer->second;
+  switch (message->type) {
+    case MESSAGE_HANDSHAKE:
+      // Check if the peer is in our connection list.
+      if (remote_peer == nullptr) {
+        XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
+        connected_peers[message->peer_id] = new Connection(message->peer_id);
+        sendHandshake(message->return_mailbox);
+      }
+      // Send our bitfield to the peer
+      sendBitfield(message->return_mailbox);
+      break;
+    case MESSAGE_BITFIELD:
+      // Update the pieces list
+      updatePiecesCountFromBitfield(message->bitfield);
+      // Store the bitfield
+      remote_peer->bitfield = message->bitfield;
+      xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
+      if (isInterestedBy(remote_peer)) {
+        remote_peer->am_interested = true;
+        sendInterested(message->return_mailbox);
+      }
+      break;
+    case MESSAGE_INTERESTED:
+      xbt_assert((remote_peer != nullptr),
+                 "The impossible did happened: A non-in-our-list peer has sent us a message.");
+      // Update the interested state of the peer.
+      remote_peer->interested = true;
+      updateActivePeersSet(remote_peer);
+      break;
+    case MESSAGE_NOTINTERESTED:
+      xbt_assert((remote_peer != nullptr),
+                 "The impossible did happened: A non-in-our-list peer has sent us a message.");
+      remote_peer->interested = false;
+      updateActivePeersSet(remote_peer);
+      break;
+    case MESSAGE_UNCHOKE:
+      xbt_assert((remote_peer != nullptr),
+                 "The impossible did happened: A non-in-our-list peer has sent us a message.");
+      xbt_assert(remote_peer->choked_download);
+      remote_peer->choked_download = false;
+      // Send requests to the peer, since it has unchoked us
+      // if (remote_peer->am_interested)
+      requestNewPieceTo(remote_peer);
+      break;
+    case MESSAGE_CHOKE:
+      xbt_assert((remote_peer != nullptr),
+                 "The impossible did happened: A non-in-our-list peer has sent us a message.");
+      xbt_assert(not remote_peer->choked_download);
+      remote_peer->choked_download = true;
+      if (remote_peer->current_piece != -1)
+        removeCurrentPiece(remote_peer, remote_peer->current_piece);
+      break;
+    case MESSAGE_HAVE:
+      XBT_DEBUG("\t for piece %d", message->piece);
+      xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
+                 "Wrong HAVE message received");
+      remote_peer->bitfield = remote_peer->bitfield | (1U << static_cast<unsigned int>(message->piece));
+      pieces_count[message->piece]++;
+      // If the piece is in our pieces, we tell the peer that we are interested.
+      if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
+        remote_peer->am_interested = true;
+        sendInterested(message->return_mailbox);
+        if (not remote_peer->choked_download)
+          requestNewPieceTo(remote_peer);
+      }
+      break;
+    case MESSAGE_REQUEST:
+      xbt_assert(remote_peer->interested);
+      xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
+                 "Wrong HAVE message received");
+      if (not remote_peer->choked_upload) {
+        XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
+                  message->block_index + message->block_length);
+        if (not hasNotPiece(message->piece)) {
+          sendPiece(message->return_mailbox, message->piece, message->block_index, message->block_length);
+        }
+      } else {
+        XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
+      }
+      break;
+    case MESSAGE_PIECE:
+      XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
+                message->block_index + message->block_length);
+      xbt_assert(not remote_peer->choked_download);
+      xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
+                 "Can't received a piece if I'm not interested wihtout end-game mode!"
+                 "piece (%d) bitfield (%u) remote bitfield (%u)",
+                 message->piece, bitfield_, remote_peer->bitfield);
+      xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
+      xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
+                 "Wrong piece received");
+      // TODO: Execute a computation.
+      if (hasNotPiece(static_cast<unsigned int>(message->piece))) {
+        updateBitfieldBlocks(message->piece, message->block_index, message->block_length);
+        if (hasCompletedPiece(static_cast<unsigned int>(message->piece))) {
+          // Removing the piece from our piece list
+          removeCurrentPiece(remote_peer, message->piece);
+          // Setting the fact that we have the piece
+          bitfield_ = bitfield_ | (1U << static_cast<unsigned int>(message->piece));
+          XBT_DEBUG("My status is now %s", getStatus().c_str());
+          // Sending the information to all the peers we are connected to
+          sendHaveToAllPeers(message->piece);
+          // sending UNINTERESTED to peers that do not have what we want.
+          updateInterestedAfterReceive();
+        } else {                                      // piece not completed
+          sendRequestTo(remote_peer, message->piece); // ask for the next block
+        }
+      } else {
+        XBT_DEBUG("However, we already have it");
+        xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
+        requestNewPieceTo(remote_peer);
+      }
+      break;
+    case MESSAGE_CANCEL:
+      break;
+    default:
+      THROW_IMPOSSIBLE;
+  }
+  // Update the peer speed.
+  if (remote_peer) {
+    remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::getClock() - begin_receive_time));
+  }
+  begin_receive_time = simgrid::s4u::Engine::getClock();
+}
+
+/** Selects the appropriate piece to download and requests it to the remote_peer */
+void Peer::requestNewPieceTo(Connection* remote_peer)
+{
+  int piece = selectPieceToDownload(remote_peer);
+  if (piece != -1) {
+    current_pieces |= (1U << (unsigned int)piece);
+    sendRequestTo(remote_peer, piece);
+  }
+}
+
+void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piece)
+{
+  current_pieces &= ~(1U << current_piece);
+  remote_peer->current_piece = -1;
+}
+
+/** @brief Return the piece to be downloaded
+ * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
+ * If a piece is partially downloaded, this piece will be selected prioritarily
+ * If the peer has strictly less than 4 pieces, he chooses a piece at random.
+ * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
+ * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
+ * @param remote_peer: information about the connection
+ * @return the piece to download if possible. -1 otherwise
+ */
+int Peer::selectPieceToDownload(Connection* remote_peer)
+{
+  int piece = partiallyDownloadedPiece(remote_peer);
+  // strict priority policy
+  if (piece != -1)
+    return piece;
+
+  // end game mode
+  if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) {
+#if ENABLE_END_GAME_MODE == 0
+    return -1;
+#endif
+    int nb_interesting_pieces = 0;
+    // compute the number of interesting pieces
+    for (unsigned int i = 0; i < FILE_PIECES; i++)
+      if (hasNotPiece(i) && remote_peer->hasPiece(i))
+        nb_interesting_pieces++;
+
+    xbt_assert(nb_interesting_pieces != 0);
+    // get a random interesting piece
+    int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
+    int current_index      = 0;
+    for (unsigned int i = 0; i < FILE_PIECES; i++) {
+      if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
+        if (random_piece_index == current_index) {
+          piece = i;
+          break;
+        }
+        current_index++;
+      }
+    }
+    xbt_assert(piece != -1);
+    return piece;
+  }
+  // Random first policy
+  if (countPieces(bitfield_) < 4 && isInterestedByFree(remote_peer)) {
+    int nb_interesting_pieces = 0;
+    // compute the number of interesting pieces
+    for (unsigned int i = 0; i < FILE_PIECES; i++)
+      if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
+        nb_interesting_pieces++;
+    xbt_assert(nb_interesting_pieces != 0);
+    // get a random interesting piece
+    int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
+    int current_index      = 0;
+    for (unsigned int i = 0; i < FILE_PIECES; i++) {
+      if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
+        if (random_piece_index == current_index) {
+          piece = i;
+          break;
+        }
+        current_index++;
+      }
+    }
+    xbt_assert(piece != -1);
+    return piece;
+  } else { // Rarest first policy
+    short min         = SHRT_MAX;
+    int nb_min_pieces = 0;
+    int current_index = 0;
+    // compute the smallest number of copies of available pieces
+    for (unsigned int i = 0; i < FILE_PIECES; i++) {
+      if (pieces_count[i] < min)
+        if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
+          min = pieces_count[i];
+    }
+
+    xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
+    // compute the number of rarest pieces
+    for (unsigned int i = 0; i < FILE_PIECES; i++)
+      if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
+        nb_min_pieces++;
+
+    xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer));
+    // get a random rarest piece
+    int random_rarest_index = RngStream_RandInt(stream, 0, nb_min_pieces - 1);
+    for (unsigned int i = 0; i < FILE_PIECES; i++)
+      if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
+        if (random_rarest_index == current_index) {
+          piece = i;
+          break;
+        }
+        current_index++;
+      }
+
+    xbt_assert(piece != -1 || not isInterestedByFree(remote_peer));
+    return piece;
+  }
+}
+
+void Peer::updateChokedPeers()
+{
+  if (nbInterestedPeers() == 0)
+    return;
+  XBT_DEBUG("(%d) update_choked peers %zu active peers", id, active_peers.size());
+  // update the current round
+  round_                  = (round_ + 1) % 3;
+  Connection* chosen_peer = nullptr;
+  // select first active peer and remove it from the set
+  Connection* choked_peer = *(active_peers.begin());
+  active_peers.erase(choked_peer);
+
+  /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
+  if (hasFinished()) {
+    Connection* remote_peer;
+    double unchoke_time = simgrid::s4u::Engine::getClock() + 1;
+    for (auto kv : connected_peers) {
+      remote_peer = kv.second;
+      if (remote_peer->last_unchoke < unchoke_time && remote_peer->interested && remote_peer->choked_upload) {
+        unchoke_time = remote_peer->last_unchoke;
+        chosen_peer  = remote_peer;
+      }
+    }
+  } else {
+    // Random optimistic unchoking
+    if (round_ == 0) {
+      int j = 0;
+      do {
+        // We choose a random peer to unchoke.
+        std::unordered_map<int, Connection*>::iterator chosen_peer_it = connected_peers.begin();
+        std::advance(chosen_peer_it, RngStream_RandInt(stream, 0, connected_peers.size() - 1));
+        chosen_peer = chosen_peer_it->second;
+        if (chosen_peer == nullptr)
+          THROWF(unknown_error, 0, "A peer should have be selected at this point");
+        else if (not chosen_peer->interested || not chosen_peer->choked_upload)
+          chosen_peer = nullptr;
+        else
+          XBT_DEBUG("Nothing to do, keep going");
+        j++;
+      } while (chosen_peer == nullptr && j < MAXIMUM_PEERS);
+    } else {
+      // Use the "fastest download" policy.
+      double fastest_speed = 0.0;
+      for (auto kv : connected_peers) {
+        Connection* remote_peer = kv.second;
+        if (remote_peer->peer_speed > fastest_speed && remote_peer->choked_upload && remote_peer->interested) {
+          chosen_peer   = remote_peer;
+          fastest_speed = remote_peer->peer_speed;
+        }
+      }
+    }
+  }
+
+  if (chosen_peer != nullptr)
+    XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", id, chosen_peer->id,
+              chosen_peer->interested, chosen_peer->choked_upload);
+
+  if (choked_peer != chosen_peer) {
+    if (choked_peer != nullptr) {
+      xbt_assert((!choked_peer->choked_upload), "Tries to choked a choked peer");
+      choked_peer->choked_upload = true;
+      updateActivePeersSet(choked_peer);
+      XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
+      sendChoked(choked_peer->mailbox_);
+    }
+    if (chosen_peer != nullptr) {
+      xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
+      chosen_peer->choked_upload = false;
+      active_peers.insert(chosen_peer);
+      chosen_peer->last_unchoke = simgrid::s4u::Engine::getClock();
+      XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
+      updateActivePeersSet(chosen_peer);
+      sendUnchoked(chosen_peer->mailbox_);
+    }
+  }
+}
+
+/** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
+void Peer::updateInterestedAfterReceive()
+{
+  for (auto kv : connected_peers) {
+    Connection* remote_peer = kv.second;
+    if (remote_peer->am_interested) {
+      bool interested = false;
+      // Check if the peer still has a piece we want.
+      for (unsigned int i = 0; i < FILE_PIECES; i++)
+        if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
+          interested = true;
+          break;
+        }
+
+      if (not interested) { // no more piece to download from connection
+        remote_peer->am_interested = false;
+        sendNotInterested(remote_peer->mailbox_);
+      }
+    }
+  }
+}
+
+void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
+{
+  xbt_assert((piece >= 0 && static_cast<unsigned int>(piece) <= FILE_PIECES), "Wrong piece.");
+  xbt_assert((block_index >= 0 && static_cast<unsigned int>(block_index) <= PIECES_BLOCKS), "Wrong block : %d.",
+             block_index);
+  for (int i = block_index; i < (block_index + block_length); i++)
+    bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
+}
+
+bool Peer::hasCompletedPiece(unsigned int piece)
+{
+  for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
+    if (!(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
+      return false;
+  return true;
+}
+
+int Peer::getFirstMissingBlockFrom(int piece)
+{
+  for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
+    if (!(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
+      return i;
+  return -1;
+}
+
+bool Peer::isInterestedByFree(Connection* remote_peer)
+{
+  for (unsigned int i = 0; i < FILE_PIECES; i++)
+    if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
+      return true;
+  return false;
+}
+
+/** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
+int Peer::partiallyDownloadedPiece(Connection* remote_peer)
+{
+  for (unsigned int i = 0; i < FILE_PIECES; i++)
+    if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
+      return i;
+  return -1;
+}
diff --git a/examples/s4u/app-bittorrent/s4u_peer.hpp b/examples/s4u/app-bittorrent/s4u_peer.hpp
new file mode 100644 (file)
index 0000000..97630d9
--- /dev/null
@@ -0,0 +1,95 @@
+/* Copyright (c) 2012-2017. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef BITTORRENT_PEER_HPP
+#define BITTORRENT_PEER_HPP
+#include "s4u_bittorrent.hpp"
+#include <set>
+#include <unordered_map>
+
+class Connection {
+public:
+  int id; // Peer id
+  simgrid::s4u::MailboxPtr mailbox_;
+  unsigned int bitfield = 0U; // Fields
+  //  int messages_count;
+  double peer_speed    = 0;
+  double last_unchoke  = 0;
+  int current_piece    = -1;
+  bool am_interested   = false; // Indicates if we are interested in something the peer has
+  bool interested      = false; // Indicates if the peer is interested in one of our pieces
+  bool choked_upload   = true;  // Indicates if the peer is choked for the current peer
+  bool choked_download = true;  // Indicates if the peer has choked the current peer
+
+  Connection(int id) : id(id), mailbox_(simgrid::s4u::Mailbox::byName(std::to_string(id))){};
+  ~Connection() = default;
+  void addSpeedValue(double speed) { peer_speed = peer_speed * 0.6 + speed * 0.4; }
+  bool hasPiece(unsigned int piece) { return bitfield & 1U << piece; }
+};
+
+class Peer {
+  int id;
+  double deadline;
+  RngStream stream;
+  simgrid::s4u::MailboxPtr mailbox_;
+  std::set<Connection*> active_peers; // active peers list
+
+  unsigned int bitfield_             = 0;       // list of pieces the peer has.
+  unsigned long long bitfield_blocks = 0;       // list of blocks the peer has.
+  short* pieces_count                = nullptr; // number of peers that have each piece.
+  unsigned int current_pieces        = 0;       // current pieces the peer is downloading
+  double begin_receive_time = 0; // time when the receiving communication has begun, useful for calculating host speed.
+  int round_                = 0; // current round for the chocking algorithm.
+
+  std::unordered_map<int, Connection*> connected_peers;
+  simgrid::s4u::CommPtr comm_received = nullptr; // current comm
+  Message* message                    = nullptr; // current message being received
+public:
+  explicit Peer(std::vector<std::string> args);
+  ~Peer();
+  void operator()();
+
+  std::string getStatus();
+  bool hasFinished();
+  int nbInterestedPeers();
+  bool isInterestedBy(Connection* remote_peer);
+  bool isInterestedByFree(Connection* remote_peer);
+  void updateActivePeersSet(Connection* remote_peer);
+  void updateInterestedAfterReceive();
+  void updateChokedPeers();
+
+  bool hasNotPiece(unsigned int piece) { return !(bitfield_ & 1U << piece); }
+  bool hasCompletedPiece(unsigned int piece);
+  unsigned int countPieces(unsigned int bitfield);
+  /** Check that a piece is not currently being download by the peer. */
+  bool isNotDownloadingPiece(unsigned int piece) { return !(current_pieces & 1U << piece); }
+  int partiallyDownloadedPiece(Connection* remote_peer);
+  void updatePiecesCountFromBitfield(unsigned int bitfield);
+  void removeCurrentPiece(Connection* remote_peer, unsigned int current_piece);
+  void updateBitfieldBlocks(int piece, int block_index, int block_length);
+  int getFirstMissingBlockFrom(int piece);
+  int selectPieceToDownload(Connection* remote_peer);
+  void requestNewPieceTo(Connection* remote_peer);
+
+  bool getPeersFromTracker();
+  void sendHandshake(simgrid::s4u::MailboxPtr mailbox);
+  void sendBitfield(simgrid::s4u::MailboxPtr mailbox);
+  void sendPiece(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length);
+  void sendInterested(simgrid::s4u::MailboxPtr mailbox);
+  void sendChoked(simgrid::s4u::MailboxPtr mailbox);
+  void sendUnchoked(simgrid::s4u::MailboxPtr mailbox);
+  void sendNotInterested(simgrid::s4u::MailboxPtr mailbox);
+  void sendHandshakeToAllPeers();
+  void sendRequest(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length);
+  void sendHaveToAllPeers(unsigned int piece);
+  void sendRequestTo(Connection* remote_peer, unsigned int piece);
+
+  void handleMessage();
+  void leech();
+  void seed();
+};
+
+#endif /* BITTORRENT_PEER_HPP */
diff --git a/examples/s4u/app-bittorrent/s4u_tracker.cpp b/examples/s4u/app-bittorrent/s4u_tracker.cpp
new file mode 100644 (file)
index 0000000..5a30b19
--- /dev/null
@@ -0,0 +1,71 @@
+/* Copyright (c) 2012-2017. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "s4u_tracker.hpp"
+#include <xbt/RngStream.h>
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_tracker, "Messages specific for the tracker");
+
+Tracker::Tracker(std::vector<std::string> args)
+{
+  // Checking arguments
+  xbt_assert(args.size() == 2, "Wrong number of arguments for the tracker.");
+  // Retrieving end time
+  try {
+    deadline = std::stod(args[1]);
+  } catch (std::invalid_argument& ia) {
+    throw std::invalid_argument(std::string("Invalid deadline:") + args[1].c_str());
+  }
+  xbt_assert(deadline > 0, "Wrong deadline supplied");
+
+  stream = simgrid::s4u::this_actor::getHost()->extension<HostBittorrent>()->getStream();
+
+  mailbox = simgrid::s4u::Mailbox::byName(TRACKER_MAILBOX);
+
+  XBT_INFO("Tracker launched.");
+}
+
+void Tracker::operator()()
+{
+  simgrid::s4u::CommPtr comm = nullptr;
+  while (simgrid::s4u::Engine::getClock() < deadline) {
+    void* received;
+    if (comm == nullptr)
+      comm = mailbox->get_async(&received);
+    if (comm->test()) {
+      // Retrieve the data sent by the peer.
+      TrackerQuery* tq = static_cast<TrackerQuery*>(received);
+
+      // Add the peer to our peer list, if not already known.
+      if (known_peers.find(tq->getPeerId()) == known_peers.end()) {
+        known_peers.insert(tq->getPeerId());
+      }
+
+      // Sending back peers to the requesting peer
+      TrackerAnswer* ta = new TrackerAnswer(TRACKER_QUERY_INTERVAL);
+      std::set<int>::iterator next_peer;
+      int nb_known_peers = known_peers.size();
+      int max_tries      = MIN(MAXIMUM_PEERS, nb_known_peers);
+      int tried          = 0;
+      while (tried < max_tries) {
+        do {
+          next_peer = known_peers.begin();
+          std::advance(next_peer, RngStream_RandInt(stream, 0, nb_known_peers - 1));
+        } while (ta->getPeers()->find(*next_peer) != ta->getPeers()->end());
+        ta->addPeer(*next_peer);
+        tried++;
+      }
+      tq->getReturnMailbox()->put_init(ta, TRACKER_COMM_SIZE)->detach();
+
+      delete tq;
+      comm = nullptr;
+    } else {
+      simgrid::s4u::this_actor::sleep_for(1);
+    }
+  }
+  // TODO See if some cleanup is needed
+  XBT_INFO("Tracker is leaving");
+}
diff --git a/examples/s4u/app-bittorrent/s4u_tracker.hpp b/examples/s4u/app-bittorrent/s4u_tracker.hpp
new file mode 100644 (file)
index 0000000..5ed077e
--- /dev/null
@@ -0,0 +1,48 @@
+/* Copyright (c) 2012-2014, 2017. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef BITTORRENT_TRACKER_HPP_
+#define BITTORRENT_TRACKER_HPP_
+
+#include "s4u_bittorrent.hpp"
+#include <set>
+
+class TrackerQuery {
+  int peer_id; // peer id
+  simgrid::s4u::MailboxPtr return_mailbox;
+  int uploaded;   // how much the peer has already uploaded
+  int downloaded; // how much the peer has downloaded
+  int left;       // how much the peer has left
+public:
+  explicit TrackerQuery(int peer_id, simgrid::s4u::MailboxPtr return_mailbox, int uploaded, int downloaded, int left)
+      : peer_id(peer_id), return_mailbox(return_mailbox), uploaded(uploaded), downloaded(downloaded), left(left){};
+  ~TrackerQuery() = default;
+  int getPeerId() { return peer_id; }
+  simgrid::s4u::MailboxPtr getReturnMailbox() { return return_mailbox; }
+};
+
+class TrackerAnswer {
+  int interval;         // how often the peer should contact the tracker (unused for now)
+  std::set<int>* peers; // the peer list the peer has asked for.
+public:
+  explicit TrackerAnswer(int interval) : interval(interval) { peers = new std::set<int>; }
+  ~TrackerAnswer() { delete peers; };
+  void addPeer(int peer) { peers->insert(peer); }
+  std::set<int>* getPeers() { return peers; }
+};
+
+class Tracker {
+  double deadline;
+  RngStream stream;
+  simgrid::s4u::MailboxPtr mailbox;
+  std::set<int> known_peers;
+
+public:
+  explicit Tracker(std::vector<std::string> args);
+  void operator()();
+};
+
+#endif /* BITTORRENT_TRACKER_HPP */