Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
83c22de8636f693faf35cbba141294df1c97a81b
[simgrid.git] / examples / cpp / app-bittorrent / s4u-peer.cpp
1 /* Copyright (c) 2012-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <algorithm>
7 #include <array>
8 #include <climits>
9
10 #include "s4u-peer.hpp"
11 #include "s4u-tracker.hpp"
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
14 namespace sg4 = simgrid::s4u;
15
16 /*
17  * User parameters for transferred file data. For the test, the default values are :
18  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
19  */
20 constexpr unsigned long FILE_PIECES   = 10UL;
21 constexpr unsigned long PIECES_BLOCKS = 5UL;
22 constexpr int BLOCK_SIZE              = 16384;
23
24 /** Number of blocks asked by each request */
25 constexpr unsigned long BLOCKS_REQUESTED = 2UL;
26
27 constexpr double SLEEP_DURATION     = 1.0;
28 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
29
30 /** Message sizes
31  * Sizes based on report by A. Legout et al, Understanding BitTorrent: An Experimental Perspective
32  * http://hal.inria.fr/inria-00000156/en
33  */
34 constexpr unsigned message_size(MessageType type)
35 {
36   constexpr std::array<unsigned, 10> sizes{{/* HANDSHAKE     */ 68,
37                                             /* CHOKE         */ 5,
38                                             /* UNCHOKE       */ 5,
39                                             /* INTERESTED    */ 5,
40                                             /* NOTINTERESTED */ 5,
41                                             /* HAVE          */ 9,
42                                             /* BITFIELD      */ 5,
43                                             /* REQUEST       */ 17,
44                                             /* PIECE         */ 13,
45                                             /* CANCEL        */ 17}};
46   return sizes[static_cast<int>(type)];
47 }
48
49 constexpr const char* message_name(MessageType type)
50 {
51   constexpr std::array<const char*, 10> names{{"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "HAVE",
52                                                "BITFIELD", "REQUEST", "PIECE", "CANCEL"}};
53   return names[static_cast<int>(type)];
54 }
55
56 Peer::Peer(std::vector<std::string> args)
57 {
58   // Check arguments
59   xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
60   try {
61     id       = std::stoi(args[1]);
62     mailbox_ = sg4::Mailbox::by_name(std::to_string(id));
63   } catch (const std::invalid_argument&) {
64     throw std::invalid_argument("Invalid ID:" + args[1]);
65   }
66   random.set_seed(id);
67
68   try {
69     deadline = std::stod(args[2]);
70   } catch (const std::invalid_argument&) {
71     throw std::invalid_argument("Invalid deadline:" + args[2]);
72   }
73   xbt_assert(deadline > 0, "Wrong deadline supplied");
74
75   if (args.size() == 4 && args[3] == "1") {
76     bitfield_       = (1U << FILE_PIECES) - 1U;
77     bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
78   }
79   pieces_count.resize(FILE_PIECES);
80
81   XBT_INFO("Hi, I'm joining the network with id %d", id);
82 }
83
84 /** Peer main function */
85 void Peer::operator()()
86 {
87   // Getting peer data from the tracker.
88   if (getPeersFromTracker()) {
89     XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
90     begin_receive_time = sg4::Engine::get_clock();
91     mailbox_->set_receiver(sg4::Actor::self());
92     if (hasFinished()) {
93       sendHandshakeToAllPeers();
94     } else {
95       leech();
96     }
97     seed();
98   } else {
99     XBT_INFO("Couldn't contact the tracker.");
100   }
101
102   XBT_INFO("Here is my current status: %s", getStatus().c_str());
103 }
104
105 bool Peer::getPeersFromTracker()
106 {
107   sg4::Mailbox* tracker_mailbox = sg4::Mailbox::by_name(TRACKER_MAILBOX);
108   // Build the task to send to the tracker
109   auto* peer_request = new TrackerQuery(id, mailbox_);
110   try {
111     XBT_DEBUG("Sending a peer request to the tracker.");
112     tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
113   } catch (const simgrid::TimeoutException&) {
114     XBT_DEBUG("Timeout expired when requesting peers to tracker");
115     delete peer_request;
116     return false;
117   }
118
119   try {
120     auto answer = mailbox_->get_unique<TrackerAnswer>(GET_PEERS_TIMEOUT);
121     // Add the peers the tracker gave us to our peer list.
122     for (auto const& peer_id : answer->getPeers())
123       if (id != peer_id)
124         connected_peers.try_emplace(peer_id, peer_id);
125   } catch (const simgrid::TimeoutException&) {
126     XBT_DEBUG("Timeout expired when requesting peers to tracker");
127     return false;
128   }
129   return true;
130 }
131
132 void Peer::sendHandshakeToAllPeers()
133 {
134   for (auto const& kv : connected_peers) {
135     const Connection& remote_peer = kv.second;
136     auto* handshake               = new Message(MessageType::HANDSHAKE, id, mailbox_);
137     remote_peer.mailbox_->put_init(handshake, message_size(MessageType::HANDSHAKE))->detach();
138     XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer.id);
139   }
140 }
141
142 void Peer::sendMessage(sg4::Mailbox* mailbox, MessageType type, uint64_t size)
143 {
144   XBT_DEBUG("Sending %s to %s", message_name(type), mailbox->get_cname());
145   mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
146 }
147
148 void Peer::sendBitfield(sg4::Mailbox* mailbox)
149 {
150   XBT_DEBUG("Sending a BITFIELD to %s", mailbox->get_cname());
151   mailbox
152       ->put_init(new Message(MessageType::BITFIELD, id, bitfield_, mailbox_),
153                  message_size(MessageType::BITFIELD) + BITS_TO_BYTES(FILE_PIECES))
154       ->detach();
155 }
156
157 void Peer::sendPiece(sg4::Mailbox* mailbox, unsigned int piece, int block_index, int block_length)
158 {
159   xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
160   XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->get_cname());
161   mailbox->put_init(new Message(MessageType::PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)
162       ->detach();
163 }
164
165 void Peer::sendHaveToAllPeers(unsigned int piece)
166 {
167   XBT_DEBUG("Sending HAVE message to all my peers");
168   for (auto const& kv : connected_peers) {
169     const Connection& remote_peer = kv.second;
170     remote_peer.mailbox_->put_init(new Message(MessageType::HAVE, id, mailbox_, piece), message_size(MessageType::HAVE))
171         ->detach();
172   }
173 }
174
175 void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
176 {
177   remote_peer->current_piece = piece;
178   xbt_assert(remote_peer->hasPiece(piece));
179   int block_index = getFirstMissingBlockFrom(piece);
180   if (block_index != -1) {
181     int block_length = static_cast<int>(std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index));
182     XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->get_cname(), piece, block_index,
183               block_length);
184     remote_peer->mailbox_
185         ->put_init(new Message(MessageType::REQUEST, id, mailbox_, piece, block_index, block_length),
186                    message_size(MessageType::REQUEST))
187         ->detach();
188   }
189 }
190
191 std::string Peer::getStatus() const
192 {
193   std::string res;
194   for (unsigned i = 0; i < FILE_PIECES; i++)
195     res += (bitfield_ & (1U << i)) ? '1' : '0';
196   return res;
197 }
198
199 bool Peer::hasFinished() const
200 {
201   return bitfield_ == (1U << FILE_PIECES) - 1U;
202 }
203
204 /** Indicates if the remote peer has a piece not stored by the local peer */
205 bool Peer::isInterestedBy(const Connection* remote_peer) const
206 {
207   return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
208 }
209
210 bool Peer::isInterestedByFree(const Connection* remote_peer) const
211 {
212   for (unsigned int i = 0; i < FILE_PIECES; i++)
213     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
214       return true;
215   return false;
216 }
217
218 void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
219 {
220   for (unsigned int i = 0; i < FILE_PIECES; i++)
221     if (bitfield & (1U << i))
222       pieces_count[i]++;
223 }
224
225 unsigned int Peer::countPieces(unsigned int bitfield) const
226 {
227   unsigned int count = 0U;
228   unsigned int n     = bitfield;
229   while (n) {
230     count += n & 1U;
231     n >>= 1U;
232   }
233   return count;
234 }
235
236 int Peer::nbInterestedPeers() const
237 {
238   return static_cast<int>(std::count_if(connected_peers.begin(), connected_peers.end(),
239                                         [](const auto& kv) { return kv.second.interested; }));
240 }
241
242 void Peer::leech()
243 {
244   double next_choked_update = sg4::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
245   XBT_DEBUG("Start downloading.");
246
247   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
248   sendHandshakeToAllPeers();
249   XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname());
250
251   while (sg4::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
252     if (comm_received == nullptr) {
253       comm_received = mailbox_->get_async<Message>(&message);
254     }
255     if (comm_received->test()) {
256       handleMessage();
257       delete message;
258       comm_received = nullptr;
259     } else {
260       // We don't execute the choke algorithm if we don't already have a piece
261       if (sg4::Engine::get_clock() >= next_choked_update && countPieces(bitfield_) > 0) {
262         updateChokedPeers();
263         next_choked_update += UPDATE_CHOKED_INTERVAL;
264       } else {
265         sg4::this_actor::sleep_for(SLEEP_DURATION);
266       }
267     }
268   }
269   if (hasFinished())
270     XBT_DEBUG("%d becomes a seeder", id);
271 }
272
273 void Peer::seed()
274 {
275   double next_choked_update = sg4::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
276   XBT_DEBUG("Start seeding.");
277   // start the main seed loop
278   while (sg4::Engine::get_clock() < deadline) {
279     if (comm_received == nullptr) {
280       comm_received = mailbox_->get_async<Message>(&message);
281     }
282     if (comm_received->test()) {
283       handleMessage();
284       delete message;
285       comm_received = nullptr;
286     } else {
287       if (sg4::Engine::get_clock() >= next_choked_update) {
288         updateChokedPeers();
289         // TODO: Change the choked peer algorithm when seeding.
290         next_choked_update += UPDATE_CHOKED_INTERVAL;
291       } else {
292         sg4::this_actor::sleep_for(SLEEP_DURATION);
293       }
294     }
295   }
296 }
297
298 void Peer::updateActivePeersSet(Connection* remote_peer)
299 {
300   if (remote_peer->interested && not remote_peer->choked_upload)
301     active_peers.insert(remote_peer);
302   else
303     active_peers.erase(remote_peer);
304 }
305
306 void Peer::handleMessage()
307 {
308   XBT_DEBUG("Received a %s message from %s", message_name(message->type), message->return_mailbox->get_cname());
309
310   auto known_peer         = connected_peers.find(message->peer_id);
311   Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : &known_peer->second;
312   xbt_assert(remote_peer != nullptr || message->type == MessageType::HANDSHAKE,
313              "The impossible did happened: A not-in-our-list peer sent us a message.");
314
315   switch (message->type) {
316     case MessageType::HANDSHAKE:
317       // Check if the peer is in our connection list.
318       if (remote_peer == nullptr) {
319         XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
320         connected_peers.try_emplace(message->peer_id, message->peer_id);
321         sendMessage(message->return_mailbox, MessageType::HANDSHAKE, message_size(MessageType::HANDSHAKE));
322       }
323       // Send our bitfield to the peer
324       sendBitfield(message->return_mailbox);
325       break;
326     case MessageType::BITFIELD:
327       // Update the pieces list
328       updatePiecesCountFromBitfield(message->bitfield);
329       // Store the bitfield
330       remote_peer->bitfield = message->bitfield;
331       xbt_assert(not remote_peer->am_interested, "Should not be interested at first");
332       if (isInterestedBy(remote_peer)) {
333         remote_peer->am_interested = true;
334         sendMessage(message->return_mailbox, MessageType::INTERESTED, message_size(MessageType::INTERESTED));
335       }
336       break;
337     case MessageType::INTERESTED:
338       // Update the interested state of the peer.
339       remote_peer->interested = true;
340       updateActivePeersSet(remote_peer);
341       break;
342     case MessageType::NOTINTERESTED:
343       remote_peer->interested = false;
344       updateActivePeersSet(remote_peer);
345       break;
346     case MessageType::UNCHOKE:
347       xbt_assert(remote_peer->choked_download);
348       remote_peer->choked_download = false;
349       // Send requests to the peer, since it has unchoked us
350       if (remote_peer->am_interested)
351         requestNewPieceTo(remote_peer);
352       break;
353     case MessageType::CHOKE:
354       xbt_assert(not remote_peer->choked_download);
355       remote_peer->choked_download = true;
356       if (remote_peer->current_piece != -1)
357         removeCurrentPiece(remote_peer, remote_peer->current_piece);
358       break;
359     case MessageType::HAVE:
360       XBT_DEBUG("\t for piece %d", message->piece);
361       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
362                  "Wrong HAVE message received");
363       remote_peer->bitfield = remote_peer->bitfield | (1U << static_cast<unsigned int>(message->piece));
364       pieces_count[message->piece]++;
365       // If the piece is in our pieces, we tell the peer that we are interested.
366       if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
367         remote_peer->am_interested = true;
368         sendMessage(message->return_mailbox, MessageType::INTERESTED, message_size(MessageType::INTERESTED));
369         if (not remote_peer->choked_download)
370           requestNewPieceTo(remote_peer);
371       }
372       break;
373     case MessageType::REQUEST:
374       xbt_assert(remote_peer->interested);
375       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
376                  "Wrong HAVE message received");
377       if (not remote_peer->choked_upload) {
378         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
379                   message->block_index + message->block_length);
380         if (not hasNotPiece(message->piece)) {
381           sendPiece(message->return_mailbox, message->piece, message->block_index, message->block_length);
382         }
383       } else {
384         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
385       }
386       break;
387     case MessageType::PIECE:
388       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
389                 message->block_index + message->block_length);
390       xbt_assert(not remote_peer->choked_download);
391       xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
392       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
393                  "Wrong piece received");
394       // TODO: Execute a computation.
395       if (hasNotPiece(static_cast<unsigned int>(message->piece))) {
396         updateBitfieldBlocks(message->piece, message->block_index, message->block_length);
397         if (hasCompletedPiece(static_cast<unsigned int>(message->piece))) {
398           // Removing the piece from our piece list
399           removeCurrentPiece(remote_peer, message->piece);
400           // Setting the fact that we have the piece
401           bitfield_ = bitfield_ | (1U << static_cast<unsigned int>(message->piece));
402           XBT_DEBUG("My status is now %s", getStatus().c_str());
403           // Sending the information to all the peers we are connected to
404           sendHaveToAllPeers(message->piece);
405           // sending UNINTERESTED to peers that do not have what we want.
406           updateInterestedAfterReceive();
407         } else {                                      // piece not completed
408           sendRequestTo(remote_peer, message->piece); // ask for the next block
409         }
410       } else {
411         XBT_DEBUG("However, we already have it");
412         requestNewPieceTo(remote_peer);
413       }
414       break;
415     case MessageType::CANCEL:
416       break;
417     default:
418       THROW_IMPOSSIBLE;
419   }
420   // Update the peer speed.
421   if (remote_peer) {
422     remote_peer->addSpeedValue(1.0 / (sg4::Engine::get_clock() - begin_receive_time));
423   }
424   begin_receive_time = sg4::Engine::get_clock();
425 }
426
427 /** Selects the appropriate piece to download and requests it to the remote_peer */
428 void Peer::requestNewPieceTo(Connection* remote_peer)
429 {
430   int piece = selectPieceToDownload(remote_peer);
431   if (piece != -1) {
432     current_pieces |= (1U << (unsigned int)piece);
433     sendRequestTo(remote_peer, piece);
434   }
435 }
436
437 void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piece)
438 {
439   current_pieces &= ~(1U << current_piece);
440   remote_peer->current_piece = -1;
441 }
442
443 /** @brief Return the piece to be downloaded
444  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
445  * If a piece is partially downloaded, this piece will be selected prioritarily
446  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
447  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
448  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
449  * @param remote_peer: information about the connection
450  * @return the piece to download if possible. -1 otherwise
451  */
452 int Peer::selectPieceToDownload(const Connection* remote_peer)
453 {
454   int piece = partiallyDownloadedPiece(remote_peer);
455   // strict priority policy
456   if (piece != -1)
457     return piece;
458
459   // end game mode
460   if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) {
461     int nb_interesting_pieces = 0;
462     // compute the number of interesting pieces
463     for (unsigned int i = 0; i < FILE_PIECES; i++)
464       if (remotePeerHasMissingPiece(remote_peer, i))
465         nb_interesting_pieces++;
466
467     xbt_assert(nb_interesting_pieces != 0);
468     // get a random interesting piece
469     int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1);
470     int current_index      = 0;
471     for (unsigned int i = 0; i < FILE_PIECES; i++) {
472       if (remotePeerHasMissingPiece(remote_peer, i)) {
473         if (random_piece_index == current_index) {
474           piece = i;
475           break;
476         }
477         current_index++;
478       }
479     }
480     xbt_assert(piece != -1);
481     return piece;
482   }
483   // Random first policy
484   if (countPieces(bitfield_) < 4 && isInterestedByFree(remote_peer)) {
485     int nb_interesting_pieces = 0;
486     // compute the number of interesting pieces
487     for (unsigned int i = 0; i < FILE_PIECES; i++)
488       if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
489         nb_interesting_pieces++;
490     xbt_assert(nb_interesting_pieces != 0);
491     // get a random interesting piece
492     int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1);
493     int current_index      = 0;
494     for (unsigned int i = 0; i < FILE_PIECES; i++) {
495       if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) {
496         if (random_piece_index == current_index) {
497           piece = i;
498           break;
499         }
500         current_index++;
501       }
502     }
503     xbt_assert(piece != -1);
504     return piece;
505   } else { // Rarest first policy
506     short min         = SHRT_MAX;
507     int nb_min_pieces = 0;
508     int current_index = 0;
509     // compute the smallest number of copies of available pieces
510     for (unsigned int i = 0; i < FILE_PIECES; i++) {
511       if (pieces_count[i] < min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
512         min = pieces_count[i];
513     }
514
515     xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
516     // compute the number of rarest pieces
517     for (unsigned int i = 0; i < FILE_PIECES; i++)
518       if (pieces_count[i] == min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
519         nb_min_pieces++;
520
521     xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer));
522     // get a random rarest piece
523     int random_rarest_index = 0;
524     if (nb_min_pieces > 0) {
525       random_rarest_index = random.uniform_int(0, nb_min_pieces - 1);
526     }
527     for (unsigned int i = 0; i < FILE_PIECES; i++)
528       if (pieces_count[i] == min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) {
529         if (random_rarest_index == current_index) {
530           piece = i;
531           break;
532         }
533         current_index++;
534       }
535
536     xbt_assert(piece != -1 || not isInterestedByFree(remote_peer));
537     return piece;
538   }
539 }
540
541 void Peer::updateChokedPeers()
542 {
543   if (nbInterestedPeers() == 0)
544     return;
545   XBT_DEBUG("(%d) update_choked peers %zu active peers", id, active_peers.size());
546   // update the current round
547   round_                  = (round_ + 1) % 3;
548   Connection* chosen_peer = nullptr;
549   // select first active peer and remove it from the set
550   Connection* choked_peer;
551   if (active_peers.empty()) {
552     choked_peer = nullptr;
553   } else {
554     choked_peer = *active_peers.begin();
555     active_peers.erase(choked_peer);
556   }
557
558   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
559   if (hasFinished()) {
560     double unchoke_time = sg4::Engine::get_clock() + 1;
561     for (auto& kv : connected_peers) {
562       Connection& remote_peer = kv.second;
563       if (remote_peer.last_unchoke < unchoke_time && remote_peer.interested && remote_peer.choked_upload) {
564         unchoke_time = remote_peer.last_unchoke;
565         chosen_peer  = &remote_peer;
566       }
567     }
568   } else {
569     // Random optimistic unchoking
570     if (round_ == 0) {
571       int j = 0;
572       do {
573         // We choose a random peer to unchoke.
574         auto chosen_peer_it = connected_peers.begin();
575         std::advance(chosen_peer_it, random.uniform_int(0, static_cast<int>(connected_peers.size() - 1)));
576         chosen_peer = &chosen_peer_it->second;
577         if (not chosen_peer->interested || not chosen_peer->choked_upload)
578           chosen_peer = nullptr;
579         else
580           XBT_DEBUG("Nothing to do, keep going");
581         j++;
582       } while (chosen_peer == nullptr && j < MAXIMUM_PEERS);
583     } else {
584       // Use the "fastest download" policy.
585       double fastest_speed = 0.0;
586       for (auto& kv : connected_peers) {
587         Connection& remote_peer = kv.second;
588         if (remote_peer.peer_speed > fastest_speed && remote_peer.choked_upload && remote_peer.interested) {
589           fastest_speed = remote_peer.peer_speed;
590           chosen_peer   = &remote_peer;
591         }
592       }
593     }
594   }
595
596   if (chosen_peer != nullptr)
597     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", id, chosen_peer->id,
598               chosen_peer->interested, chosen_peer->choked_upload);
599
600   if (choked_peer != chosen_peer) {
601     if (choked_peer != nullptr) {
602       xbt_assert(not choked_peer->choked_upload, "Tries to choked a choked peer");
603       choked_peer->choked_upload = true;
604       updateActivePeersSet(choked_peer);
605       XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
606       sendMessage(choked_peer->mailbox_, MessageType::CHOKE, message_size(MessageType::CHOKE));
607     }
608     if (chosen_peer != nullptr) {
609       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
610       chosen_peer->choked_upload = false;
611       active_peers.insert(chosen_peer);
612       chosen_peer->last_unchoke = sg4::Engine::get_clock();
613       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
614       updateActivePeersSet(chosen_peer);
615       sendMessage(chosen_peer->mailbox_, MessageType::UNCHOKE, message_size(MessageType::UNCHOKE));
616     }
617   }
618 }
619
620 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
621 void Peer::updateInterestedAfterReceive()
622 {
623   for (auto& kv : connected_peers) {
624     Connection& remote_peer = kv.second;
625     if (remote_peer.am_interested) {
626       bool interested = false;
627       // Check if the peer still has a piece we want.
628       for (unsigned int i = 0; i < FILE_PIECES; i++)
629         if (remotePeerHasMissingPiece(&remote_peer, i)) {
630           interested = true;
631           break;
632         }
633
634       if (not interested) { // no more piece to download from connection
635         remote_peer.am_interested = false;
636         sendMessage(remote_peer.mailbox_, MessageType::NOTINTERESTED, message_size(MessageType::NOTINTERESTED));
637       }
638     }
639   }
640 }
641
642 void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
643 {
644   xbt_assert((piece >= 0 && static_cast<unsigned int>(piece) <= FILE_PIECES), "Wrong piece.");
645   xbt_assert((block_index >= 0 && static_cast<unsigned int>(block_index) <= PIECES_BLOCKS), "Wrong block : %d.",
646              block_index);
647   for (int i = block_index; i < (block_index + block_length); i++)
648     bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
649 }
650
651 bool Peer::hasCompletedPiece(unsigned int piece) const
652 {
653   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
654     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
655       return false;
656   return true;
657 }
658
659 int Peer::getFirstMissingBlockFrom(int piece) const
660 {
661   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
662     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
663       return i;
664   return -1;
665 }
666
667 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
668 int Peer::partiallyDownloadedPiece(const Connection* remote_peer) const
669 {
670   for (unsigned int i = 0; i < FILE_PIECES; i++)
671     if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
672       return i;
673   return -1;
674 }