Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
assume a dumb tracker (and please travis)
[simgrid.git] / examples / s4u / app-bittorrent / s4u_peer.cpp
index ba68a23..79157bc 100644 (file)
@@ -88,7 +88,7 @@ bool Peer::getPeersFromTracker()
 {
   simgrid::s4u::MailboxPtr tracker_mailbox = simgrid::s4u::Mailbox::byName(TRACKER_MAILBOX);
   // Build the task to send to the tracker
-  TrackerQuery* peer_request = new TrackerQuery(id, mailbox_, 0, 0, FILE_SIZE);
+  TrackerQuery* peer_request = new TrackerQuery(id, mailbox_);
   try {
     XBT_DEBUG("Sending a peer request to the tracker.");
     tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
@@ -126,10 +126,11 @@ void Peer::sendHandshakeToAllPeers()
   }
 }
 
-void Peer::sendHandshake(simgrid::s4u::MailboxPtr mailbox)
+void Peer::sendMessage(simgrid::s4u::MailboxPtr mailbox, e_message_type type, uint64_t size)
 {
-  XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox->getName());
-  mailbox->put_init(new Message(MESSAGE_HANDSHAKE, id, mailbox_), MESSAGE_HANDSHAKE_SIZE)->detach();
+  const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"};
+  XBT_DEBUG("Sending %s to %s", type_names[type], mailbox->getName());
+  mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
 }
 
 void Peer::sendBitfield(simgrid::s4u::MailboxPtr mailbox)
@@ -141,45 +142,13 @@ void Peer::sendBitfield(simgrid::s4u::MailboxPtr mailbox)
       ->detach();
 }
 
-void Peer::sendInterested(simgrid::s4u::MailboxPtr mailbox)
-{
-  XBT_DEBUG("Sending INTERESTED to %s", mailbox->getName());
-  mailbox->put_init(new Message(MESSAGE_INTERESTED, id, bitfield_, mailbox_), MESSAGE_INTERESTED_SIZE)->detach();
-}
-
-void Peer::sendNotInterested(simgrid::s4u::MailboxPtr mailbox)
-{
-  XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox->getName());
-  mailbox->put_init(new Message(MESSAGE_NOTINTERESTED, id, bitfield_, mailbox_), MESSAGE_NOTINTERESTED_SIZE)->detach();
-}
-
-void Peer::sendChoked(simgrid::s4u::MailboxPtr mailbox)
-{
-  XBT_DEBUG("Sending CHOKE to %s", mailbox->getName());
-  mailbox->put_init(new Message(MESSAGE_CHOKE, id, mailbox_), MESSAGE_CHOKE_SIZE)->detach();
-}
-
-/** Send a "unchoked" message to a peer */
-void Peer::sendUnchoked(simgrid::s4u::MailboxPtr mailbox)
-{
-  XBT_DEBUG("Sending UNCHOKE to %s", mailbox->getName());
-  mailbox->put_init(new Message(MESSAGE_UNCHOKE, id, mailbox_), MESSAGE_UNCHOKE_SIZE)->detach();
-}
-
 void Peer::sendPiece(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length)
 {
-  xbt_assert(!hasNotPiece(piece), "Tried to send a unavailable piece.");
+  xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
   XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->getName());
   mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach();
 }
 
-void Peer::sendRequest(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length)
-{
-  XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", mailbox->getName(), piece, block_index, block_length);
-  mailbox->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE)
-      ->detach();
-}
-
 void Peer::sendHaveToAllPeers(unsigned int piece)
 {
   XBT_DEBUG("Sending HAVE message to all my peers");
@@ -196,7 +165,11 @@ void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
   int block_index = getFirstMissingBlockFrom(piece);
   if (block_index != -1) {
     int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
-    sendRequest(remote_peer->mailbox_, piece, block_index, block_length);
+    XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->getName(), piece, block_index,
+              block_length);
+    remote_peer->mailbox_
+        ->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE)
+        ->detach();
   }
 }
 
@@ -219,6 +192,14 @@ bool Peer::isInterestedBy(Connection* remote_peer)
   return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
 }
 
+bool Peer::isInterestedByFree(Connection* remote_peer)
+{
+  for (unsigned int i = 0; i < FILE_PIECES; i++)
+    if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
+      return true;
+  return false;
+}
+
 void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
 {
   for (unsigned int i = 0; i < FILE_PIECES; i++)
@@ -308,12 +289,10 @@ void Peer::seed()
 
 void Peer::updateActivePeersSet(Connection* remote_peer)
 {
-  if (remote_peer->interested && not remote_peer->choked_upload) {
-    // add in the active peers set
+  if (remote_peer->interested && not remote_peer->choked_upload)
     active_peers.insert(remote_peer);
-  } else if (active_peers.find(remote_peer) != active_peers.end()) {
+  else
     active_peers.erase(remote_peer);
-  }
 }
 
 void Peer::handleMessage()
@@ -325,13 +304,16 @@ void Peer::handleMessage()
 
   auto known_peer         = connected_peers.find(message->peer_id);
   Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : known_peer->second;
+  xbt_assert(remote_peer != nullptr || message->type == MESSAGE_HANDSHAKE,
+             "The impossible did happened: A not-in-our-list peer sent us a message.");
+
   switch (message->type) {
     case MESSAGE_HANDSHAKE:
       // Check if the peer is in our connection list.
       if (remote_peer == nullptr) {
         XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
         connected_peers[message->peer_id] = new Connection(message->peer_id);
-        sendHandshake(message->return_mailbox);
+        sendMessage(message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
       }
       // Send our bitfield to the peer
       sendBitfield(message->return_mailbox);
@@ -341,37 +323,29 @@ void Peer::handleMessage()
       updatePiecesCountFromBitfield(message->bitfield);
       // Store the bitfield
       remote_peer->bitfield = message->bitfield;
-      xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
+      xbt_assert(not remote_peer->am_interested, "Should not be interested at first");
       if (isInterestedBy(remote_peer)) {
         remote_peer->am_interested = true;
-        sendInterested(message->return_mailbox);
+        sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
       }
       break;
     case MESSAGE_INTERESTED:
-      xbt_assert((remote_peer != nullptr),
-                 "The impossible did happened: A non-in-our-list peer has sent us a message.");
       // Update the interested state of the peer.
       remote_peer->interested = true;
       updateActivePeersSet(remote_peer);
       break;
     case MESSAGE_NOTINTERESTED:
-      xbt_assert((remote_peer != nullptr),
-                 "The impossible did happened: A non-in-our-list peer has sent us a message.");
       remote_peer->interested = false;
       updateActivePeersSet(remote_peer);
       break;
     case MESSAGE_UNCHOKE:
-      xbt_assert((remote_peer != nullptr),
-                 "The impossible did happened: A non-in-our-list peer has sent us a message.");
       xbt_assert(remote_peer->choked_download);
       remote_peer->choked_download = false;
       // Send requests to the peer, since it has unchoked us
-      // if (remote_peer->am_interested)
-      requestNewPieceTo(remote_peer);
+      if (remote_peer->am_interested)
+        requestNewPieceTo(remote_peer);
       break;
     case MESSAGE_CHOKE:
-      xbt_assert((remote_peer != nullptr),
-                 "The impossible did happened: A non-in-our-list peer has sent us a message.");
       xbt_assert(not remote_peer->choked_download);
       remote_peer->choked_download = true;
       if (remote_peer->current_piece != -1)
@@ -386,7 +360,7 @@ void Peer::handleMessage()
       // If the piece is in our pieces, we tell the peer that we are interested.
       if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
         remote_peer->am_interested = true;
-        sendInterested(message->return_mailbox);
+        sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
         if (not remote_peer->choked_download)
           requestNewPieceTo(remote_peer);
       }
@@ -410,7 +384,7 @@ void Peer::handleMessage()
                 message->block_index + message->block_length);
       xbt_assert(not remote_peer->choked_download);
       xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
-                 "Can't received a piece if I'm not interested wihtout end-game mode!"
+                 "Can't received a piece if I'm not interested without end-game mode!"
                  "piece (%d) bitfield (%u) remote bitfield (%u)",
                  message->piece, bitfield_, remote_peer->bitfield);
       xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
@@ -537,9 +511,8 @@ int Peer::selectPieceToDownload(Connection* remote_peer)
     int current_index = 0;
     // compute the smallest number of copies of available pieces
     for (unsigned int i = 0; i < FILE_PIECES; i++) {
-      if (pieces_count[i] < min)
-        if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
-          min = pieces_count[i];
+      if (pieces_count[i] < min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
+        min = pieces_count[i];
     }
 
     xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
@@ -624,11 +597,11 @@ void Peer::updateChokedPeers()
 
   if (choked_peer != chosen_peer) {
     if (choked_peer != nullptr) {
-      xbt_assert((!choked_peer->choked_upload), "Tries to choked a choked peer");
+      xbt_assert(not choked_peer->choked_upload, "Tries to choked a choked peer");
       choked_peer->choked_upload = true;
       updateActivePeersSet(choked_peer);
       XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
-      sendChoked(choked_peer->mailbox_);
+      sendMessage(choked_peer->mailbox_, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
     }
     if (chosen_peer != nullptr) {
       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
@@ -637,7 +610,7 @@ void Peer::updateChokedPeers()
       chosen_peer->last_unchoke = simgrid::s4u::Engine::getClock();
       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
       updateActivePeersSet(chosen_peer);
-      sendUnchoked(chosen_peer->mailbox_);
+      sendMessage(chosen_peer->mailbox_, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
     }
   }
 }
@@ -658,7 +631,7 @@ void Peer::updateInterestedAfterReceive()
 
       if (not interested) { // no more piece to download from connection
         remote_peer->am_interested = false;
-        sendNotInterested(remote_peer->mailbox_);
+        sendMessage(remote_peer->mailbox_, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
       }
     }
   }
@@ -676,7 +649,7 @@ void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
 bool Peer::hasCompletedPiece(unsigned int piece)
 {
   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
-    if (!(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
+    if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
       return false;
   return true;
 }
@@ -684,19 +657,11 @@ bool Peer::hasCompletedPiece(unsigned int piece)
 int Peer::getFirstMissingBlockFrom(int piece)
 {
   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
-    if (!(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
+    if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
       return i;
   return -1;
 }
 
-bool Peer::isInterestedByFree(Connection* remote_peer)
-{
-  for (unsigned int i = 0; i < FILE_PIECES; i++)
-    if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
-      return true;
-  return false;
-}
-
 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
 int Peer::partiallyDownloadedPiece(Connection* remote_peer)
 {