Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
42b93556c8b0a10ab680c526c67e899ecb3624af
[simgrid.git] / examples / cpp / app-bittorrent / s4u-peer.cpp
1 /* Copyright (c) 2012-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <algorithm>
7 #include <array>
8 #include <climits>
9
10 #include "s4u-peer.hpp"
11 #include "s4u-tracker.hpp"
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
14
15 /*
16  * User parameters for transferred file data. For the test, the default values are :
17  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
18  */
19 constexpr unsigned long FILE_PIECES   = 10UL;
20 constexpr unsigned long PIECES_BLOCKS = 5UL;
21 constexpr int BLOCK_SIZE              = 16384;
22
23 /** Number of blocks asked by each request */
24 constexpr unsigned long BLOCKS_REQUESTED = 2UL;
25
26 constexpr double SLEEP_DURATION     = 1.0;
27 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
28
29 /** Message sizes
30  * Sizes based on report by A. Legout et al, Understanding BitTorrent: An Experimental Perspective
31  * http://hal.inria.fr/inria-00000156/en
32  */
33 constexpr unsigned message_size(MessageType type)
34 {
35   constexpr std::array<unsigned, 10> sizes{{/* HANDSHAKE     */ 68,
36                                             /* CHOKE         */ 5,
37                                             /* UNCHOKE       */ 5,
38                                             /* INTERESTED    */ 5,
39                                             /* NOTINTERESTED */ 5,
40                                             /* HAVE          */ 9,
41                                             /* BITFIELD      */ 5,
42                                             /* REQUEST       */ 17,
43                                             /* PIECE         */ 13,
44                                             /* CANCEL        */ 17}};
45   return sizes[static_cast<int>(type)];
46 }
47
48 constexpr const char* message_name(MessageType type)
49 {
50   constexpr std::array<const char*, 10> names{{"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "HAVE",
51                                                "BITFIELD", "REQUEST", "PIECE", "CANCEL"}};
52   return names[static_cast<int>(type)];
53 }
54
55 Peer::Peer(std::vector<std::string> args)
56 {
57   // Check arguments
58   xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
59   try {
60     id       = std::stoi(args[1]);
61     mailbox_ = simgrid::s4u::Mailbox::by_name(std::to_string(id));
62   } catch (const std::invalid_argument&) {
63     throw std::invalid_argument("Invalid ID:" + args[1]);
64   }
65   random.set_seed(id);
66
67   try {
68     deadline = std::stod(args[2]);
69   } catch (const std::invalid_argument&) {
70     throw std::invalid_argument("Invalid deadline:" + args[2]);
71   }
72   xbt_assert(deadline > 0, "Wrong deadline supplied");
73
74   if (args.size() == 4 && args[3] == "1") {
75     bitfield_       = (1U << FILE_PIECES) - 1U;
76     bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
77   }
78   pieces_count.resize(FILE_PIECES);
79
80   XBT_INFO("Hi, I'm joining the network with id %d", id);
81 }
82
83 /** Peer main function */
84 void Peer::operator()()
85 {
86   // Getting peer data from the tracker.
87   if (getPeersFromTracker()) {
88     XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
89     begin_receive_time = simgrid::s4u::Engine::get_clock();
90     mailbox_->set_receiver(simgrid::s4u::Actor::self());
91     if (hasFinished()) {
92       sendHandshakeToAllPeers();
93     } else {
94       leech();
95     }
96     seed();
97   } else {
98     XBT_INFO("Couldn't contact the tracker.");
99   }
100
101   XBT_INFO("Here is my current status: %s", getStatus().c_str());
102 }
103
104 bool Peer::getPeersFromTracker()
105 {
106   simgrid::s4u::Mailbox* tracker_mailbox = simgrid::s4u::Mailbox::by_name(TRACKER_MAILBOX);
107   // Build the task to send to the tracker
108   auto* peer_request = new TrackerQuery(id, mailbox_);
109   try {
110     XBT_DEBUG("Sending a peer request to the tracker.");
111     tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
112   } catch (const simgrid::TimeoutException&) {
113     XBT_DEBUG("Timeout expired when requesting peers to tracker");
114     delete peer_request;
115     return false;
116   }
117
118   try {
119     auto answer = mailbox_->get_unique<TrackerAnswer>(GET_PEERS_TIMEOUT);
120     // Add the peers the tracker gave us to our peer list.
121     for (auto const& peer_id : answer->getPeers())
122       if (id != peer_id)
123         connected_peers.emplace(peer_id, Connection(peer_id));
124   } catch (const simgrid::TimeoutException&) {
125     XBT_DEBUG("Timeout expired when requesting peers to tracker");
126     return false;
127   }
128   return true;
129 }
130
131 void Peer::sendHandshakeToAllPeers()
132 {
133   for (auto const& kv : connected_peers) {
134     const Connection& remote_peer = kv.second;
135     auto* handshake               = new Message(MessageType::HANDSHAKE, id, mailbox_);
136     remote_peer.mailbox_->put_init(handshake, message_size(MessageType::HANDSHAKE))->detach();
137     XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer.id);
138   }
139 }
140
141 void Peer::sendMessage(simgrid::s4u::Mailbox* mailbox, MessageType type, uint64_t size)
142 {
143   XBT_DEBUG("Sending %s to %s", message_name(type), mailbox->get_cname());
144   mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
145 }
146
147 void Peer::sendBitfield(simgrid::s4u::Mailbox* mailbox)
148 {
149   XBT_DEBUG("Sending a BITFIELD to %s", mailbox->get_cname());
150   mailbox
151       ->put_init(new Message(MessageType::BITFIELD, id, bitfield_, mailbox_),
152                  message_size(MessageType::BITFIELD) + BITS_TO_BYTES(FILE_PIECES))
153       ->detach();
154 }
155
156 void Peer::sendPiece(simgrid::s4u::Mailbox* mailbox, unsigned int piece, int block_index, int block_length)
157 {
158   xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
159   XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->get_cname());
160   mailbox->put_init(new Message(MessageType::PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)
161       ->detach();
162 }
163
164 void Peer::sendHaveToAllPeers(unsigned int piece)
165 {
166   XBT_DEBUG("Sending HAVE message to all my peers");
167   for (auto const& kv : connected_peers) {
168     const Connection& remote_peer = kv.second;
169     remote_peer.mailbox_->put_init(new Message(MessageType::HAVE, id, mailbox_, piece), message_size(MessageType::HAVE))
170         ->detach();
171   }
172 }
173
174 void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
175 {
176   remote_peer->current_piece = piece;
177   xbt_assert(remote_peer->hasPiece(piece));
178   int block_index = getFirstMissingBlockFrom(piece);
179   if (block_index != -1) {
180     int block_length = static_cast<int>(std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index));
181     XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->get_cname(), piece, block_index,
182               block_length);
183     remote_peer->mailbox_
184         ->put_init(new Message(MessageType::REQUEST, id, mailbox_, piece, block_index, block_length),
185                    message_size(MessageType::REQUEST))
186         ->detach();
187   }
188 }
189
190 std::string Peer::getStatus() const
191 {
192   std::string res;
193   for (unsigned i = 0; i < FILE_PIECES; i++)
194     res += (bitfield_ & (1U << i)) ? '1' : '0';
195   return res;
196 }
197
198 bool Peer::hasFinished() const
199 {
200   return bitfield_ == (1U << FILE_PIECES) - 1U;
201 }
202
203 /** Indicates if the remote peer has a piece not stored by the local peer */
204 bool Peer::isInterestedBy(const Connection* remote_peer) const
205 {
206   return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
207 }
208
209 bool Peer::isInterestedByFree(const Connection* remote_peer) const
210 {
211   for (unsigned int i = 0; i < FILE_PIECES; i++)
212     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
213       return true;
214   return false;
215 }
216
217 void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
218 {
219   for (unsigned int i = 0; i < FILE_PIECES; i++)
220     if (bitfield & (1U << i))
221       pieces_count[i]++;
222 }
223
224 unsigned int Peer::countPieces(unsigned int bitfield) const
225 {
226   unsigned int count = 0U;
227   unsigned int n     = bitfield;
228   while (n) {
229     count += n & 1U;
230     n >>= 1U;
231   }
232   return count;
233 }
234
235 int Peer::nbInterestedPeers() const
236 {
237   return static_cast<int>(std::count_if(connected_peers.begin(), connected_peers.end(),
238                                         [](const auto& kv) { return kv.second.interested; }));
239 }
240
241 void Peer::leech()
242 {
243   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
244   XBT_DEBUG("Start downloading.");
245
246   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
247   sendHandshakeToAllPeers();
248   XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname());
249
250   while (simgrid::s4u::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
251     if (comm_received == nullptr) {
252       comm_received = mailbox_->get_async<Message>(&message);
253     }
254     if (comm_received->test()) {
255       handleMessage();
256       delete message;
257       comm_received = nullptr;
258     } else {
259       // We don't execute the choke algorithm if we don't already have a piece
260       if (simgrid::s4u::Engine::get_clock() >= next_choked_update && countPieces(bitfield_) > 0) {
261         updateChokedPeers();
262         next_choked_update += UPDATE_CHOKED_INTERVAL;
263       } else {
264         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
265       }
266     }
267   }
268   if (hasFinished())
269     XBT_DEBUG("%d becomes a seeder", id);
270 }
271
272 void Peer::seed()
273 {
274   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
275   XBT_DEBUG("Start seeding.");
276   // start the main seed loop
277   while (simgrid::s4u::Engine::get_clock() < deadline) {
278     if (comm_received == nullptr) {
279       comm_received = mailbox_->get_async<Message>(&message);
280     }
281     if (comm_received->test()) {
282       handleMessage();
283       delete message;
284       comm_received = nullptr;
285     } else {
286       if (simgrid::s4u::Engine::get_clock() >= next_choked_update) {
287         updateChokedPeers();
288         // TODO: Change the choked peer algorithm when seeding.
289         next_choked_update += UPDATE_CHOKED_INTERVAL;
290       } else {
291         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
292       }
293     }
294   }
295 }
296
297 void Peer::updateActivePeersSet(Connection* remote_peer)
298 {
299   if (remote_peer->interested && not remote_peer->choked_upload)
300     active_peers.insert(remote_peer);
301   else
302     active_peers.erase(remote_peer);
303 }
304
305 void Peer::handleMessage()
306 {
307   XBT_DEBUG("Received a %s message from %s", message_name(message->type), message->return_mailbox->get_cname());
308
309   auto known_peer         = connected_peers.find(message->peer_id);
310   Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : &known_peer->second;
311   xbt_assert(remote_peer != nullptr || message->type == MessageType::HANDSHAKE,
312              "The impossible did happened: A not-in-our-list peer sent us a message.");
313
314   switch (message->type) {
315     case MessageType::HANDSHAKE:
316       // Check if the peer is in our connection list.
317       if (remote_peer == nullptr) {
318         XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
319         connected_peers.emplace(message->peer_id, Connection(message->peer_id));
320         sendMessage(message->return_mailbox, MessageType::HANDSHAKE, message_size(MessageType::HANDSHAKE));
321       }
322       // Send our bitfield to the peer
323       sendBitfield(message->return_mailbox);
324       break;
325     case MessageType::BITFIELD:
326       // Update the pieces list
327       updatePiecesCountFromBitfield(message->bitfield);
328       // Store the bitfield
329       remote_peer->bitfield = message->bitfield;
330       xbt_assert(not remote_peer->am_interested, "Should not be interested at first");
331       if (isInterestedBy(remote_peer)) {
332         remote_peer->am_interested = true;
333         sendMessage(message->return_mailbox, MessageType::INTERESTED, message_size(MessageType::INTERESTED));
334       }
335       break;
336     case MessageType::INTERESTED:
337       // Update the interested state of the peer.
338       remote_peer->interested = true;
339       updateActivePeersSet(remote_peer);
340       break;
341     case MessageType::NOTINTERESTED:
342       remote_peer->interested = false;
343       updateActivePeersSet(remote_peer);
344       break;
345     case MessageType::UNCHOKE:
346       xbt_assert(remote_peer->choked_download);
347       remote_peer->choked_download = false;
348       // Send requests to the peer, since it has unchoked us
349       if (remote_peer->am_interested)
350         requestNewPieceTo(remote_peer);
351       break;
352     case MessageType::CHOKE:
353       xbt_assert(not remote_peer->choked_download);
354       remote_peer->choked_download = true;
355       if (remote_peer->current_piece != -1)
356         removeCurrentPiece(remote_peer, remote_peer->current_piece);
357       break;
358     case MessageType::HAVE:
359       XBT_DEBUG("\t for piece %d", message->piece);
360       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
361                  "Wrong HAVE message received");
362       remote_peer->bitfield = remote_peer->bitfield | (1U << static_cast<unsigned int>(message->piece));
363       pieces_count[message->piece]++;
364       // If the piece is in our pieces, we tell the peer that we are interested.
365       if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
366         remote_peer->am_interested = true;
367         sendMessage(message->return_mailbox, MessageType::INTERESTED, message_size(MessageType::INTERESTED));
368         if (not remote_peer->choked_download)
369           requestNewPieceTo(remote_peer);
370       }
371       break;
372     case MessageType::REQUEST:
373       xbt_assert(remote_peer->interested);
374       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
375                  "Wrong HAVE message received");
376       if (not remote_peer->choked_upload) {
377         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
378                   message->block_index + message->block_length);
379         if (not hasNotPiece(message->piece)) {
380           sendPiece(message->return_mailbox, message->piece, message->block_index, message->block_length);
381         }
382       } else {
383         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
384       }
385       break;
386     case MessageType::PIECE:
387       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
388                 message->block_index + message->block_length);
389       xbt_assert(not remote_peer->choked_download);
390       xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
391       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
392                  "Wrong piece received");
393       // TODO: Execute a computation.
394       if (hasNotPiece(static_cast<unsigned int>(message->piece))) {
395         updateBitfieldBlocks(message->piece, message->block_index, message->block_length);
396         if (hasCompletedPiece(static_cast<unsigned int>(message->piece))) {
397           // Removing the piece from our piece list
398           removeCurrentPiece(remote_peer, message->piece);
399           // Setting the fact that we have the piece
400           bitfield_ = bitfield_ | (1U << static_cast<unsigned int>(message->piece));
401           XBT_DEBUG("My status is now %s", getStatus().c_str());
402           // Sending the information to all the peers we are connected to
403           sendHaveToAllPeers(message->piece);
404           // sending UNINTERESTED to peers that do not have what we want.
405           updateInterestedAfterReceive();
406         } else {                                      // piece not completed
407           sendRequestTo(remote_peer, message->piece); // ask for the next block
408         }
409       } else {
410         XBT_DEBUG("However, we already have it");
411         requestNewPieceTo(remote_peer);
412       }
413       break;
414     case MessageType::CANCEL:
415       break;
416     default:
417       THROW_IMPOSSIBLE;
418   }
419   // Update the peer speed.
420   if (remote_peer) {
421     remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::get_clock() - begin_receive_time));
422   }
423   begin_receive_time = simgrid::s4u::Engine::get_clock();
424 }
425
426 /** Selects the appropriate piece to download and requests it to the remote_peer */
427 void Peer::requestNewPieceTo(Connection* remote_peer)
428 {
429   int piece = selectPieceToDownload(remote_peer);
430   if (piece != -1) {
431     current_pieces |= (1U << (unsigned int)piece);
432     sendRequestTo(remote_peer, piece);
433   }
434 }
435
436 void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piece)
437 {
438   current_pieces &= ~(1U << current_piece);
439   remote_peer->current_piece = -1;
440 }
441
442 /** @brief Return the piece to be downloaded
443  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
444  * If a piece is partially downloaded, this piece will be selected prioritarily
445  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
446  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
447  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
448  * @param remote_peer: information about the connection
449  * @return the piece to download if possible. -1 otherwise
450  */
451 int Peer::selectPieceToDownload(const Connection* remote_peer)
452 {
453   int piece = partiallyDownloadedPiece(remote_peer);
454   // strict priority policy
455   if (piece != -1)
456     return piece;
457
458   // end game mode
459   if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) {
460     int nb_interesting_pieces = 0;
461     // compute the number of interesting pieces
462     for (unsigned int i = 0; i < FILE_PIECES; i++)
463       if (remotePeerHasMissingPiece(remote_peer, i))
464         nb_interesting_pieces++;
465
466     xbt_assert(nb_interesting_pieces != 0);
467     // get a random interesting piece
468     int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1);
469     int current_index      = 0;
470     for (unsigned int i = 0; i < FILE_PIECES; i++) {
471       if (remotePeerHasMissingPiece(remote_peer, i)) {
472         if (random_piece_index == current_index) {
473           piece = i;
474           break;
475         }
476         current_index++;
477       }
478     }
479     xbt_assert(piece != -1);
480     return piece;
481   }
482   // Random first policy
483   if (countPieces(bitfield_) < 4 && isInterestedByFree(remote_peer)) {
484     int nb_interesting_pieces = 0;
485     // compute the number of interesting pieces
486     for (unsigned int i = 0; i < FILE_PIECES; i++)
487       if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
488         nb_interesting_pieces++;
489     xbt_assert(nb_interesting_pieces != 0);
490     // get a random interesting piece
491     int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1);
492     int current_index      = 0;
493     for (unsigned int i = 0; i < FILE_PIECES; i++) {
494       if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) {
495         if (random_piece_index == current_index) {
496           piece = i;
497           break;
498         }
499         current_index++;
500       }
501     }
502     xbt_assert(piece != -1);
503     return piece;
504   } else { // Rarest first policy
505     short min         = SHRT_MAX;
506     int nb_min_pieces = 0;
507     int current_index = 0;
508     // compute the smallest number of copies of available pieces
509     for (unsigned int i = 0; i < FILE_PIECES; i++) {
510       if (pieces_count[i] < min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
511         min = pieces_count[i];
512     }
513
514     xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
515     // compute the number of rarest pieces
516     for (unsigned int i = 0; i < FILE_PIECES; i++)
517       if (pieces_count[i] == min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
518         nb_min_pieces++;
519
520     xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer));
521     // get a random rarest piece
522     int random_rarest_index = 0;
523     if (nb_min_pieces > 0) {
524       random_rarest_index = random.uniform_int(0, nb_min_pieces - 1);
525     }
526     for (unsigned int i = 0; i < FILE_PIECES; i++)
527       if (pieces_count[i] == min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) {
528         if (random_rarest_index == current_index) {
529           piece = i;
530           break;
531         }
532         current_index++;
533       }
534
535     xbt_assert(piece != -1 || not isInterestedByFree(remote_peer));
536     return piece;
537   }
538 }
539
540 void Peer::updateChokedPeers()
541 {
542   if (nbInterestedPeers() == 0)
543     return;
544   XBT_DEBUG("(%d) update_choked peers %zu active peers", id, active_peers.size());
545   // update the current round
546   round_                  = (round_ + 1) % 3;
547   Connection* chosen_peer = nullptr;
548   // select first active peer and remove it from the set
549   Connection* choked_peer;
550   if (active_peers.empty()) {
551     choked_peer = nullptr;
552   } else {
553     choked_peer = *active_peers.begin();
554     active_peers.erase(choked_peer);
555   }
556
557   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
558   if (hasFinished()) {
559     double unchoke_time = simgrid::s4u::Engine::get_clock() + 1;
560     for (auto& kv : connected_peers) {
561       Connection& remote_peer = kv.second;
562       if (remote_peer.last_unchoke < unchoke_time && remote_peer.interested && remote_peer.choked_upload) {
563         unchoke_time = remote_peer.last_unchoke;
564         chosen_peer  = &remote_peer;
565       }
566     }
567   } else {
568     // Random optimistic unchoking
569     if (round_ == 0) {
570       int j = 0;
571       do {
572         // We choose a random peer to unchoke.
573         auto chosen_peer_it = connected_peers.begin();
574         std::advance(chosen_peer_it, random.uniform_int(0, static_cast<int>(connected_peers.size() - 1)));
575         chosen_peer = &chosen_peer_it->second;
576         if (not chosen_peer->interested || not chosen_peer->choked_upload)
577           chosen_peer = nullptr;
578         else
579           XBT_DEBUG("Nothing to do, keep going");
580         j++;
581       } while (chosen_peer == nullptr && j < MAXIMUM_PEERS);
582     } else {
583       // Use the "fastest download" policy.
584       double fastest_speed = 0.0;
585       for (auto& kv : connected_peers) {
586         Connection& remote_peer = kv.second;
587         if (remote_peer.peer_speed > fastest_speed && remote_peer.choked_upload && remote_peer.interested) {
588           fastest_speed = remote_peer.peer_speed;
589           chosen_peer   = &remote_peer;
590         }
591       }
592     }
593   }
594
595   if (chosen_peer != nullptr)
596     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", id, chosen_peer->id,
597               chosen_peer->interested, chosen_peer->choked_upload);
598
599   if (choked_peer != chosen_peer) {
600     if (choked_peer != nullptr) {
601       xbt_assert(not choked_peer->choked_upload, "Tries to choked a choked peer");
602       choked_peer->choked_upload = true;
603       updateActivePeersSet(choked_peer);
604       XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
605       sendMessage(choked_peer->mailbox_, MessageType::CHOKE, message_size(MessageType::CHOKE));
606     }
607     if (chosen_peer != nullptr) {
608       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
609       chosen_peer->choked_upload = false;
610       active_peers.insert(chosen_peer);
611       chosen_peer->last_unchoke = simgrid::s4u::Engine::get_clock();
612       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
613       updateActivePeersSet(chosen_peer);
614       sendMessage(chosen_peer->mailbox_, MessageType::UNCHOKE, message_size(MessageType::UNCHOKE));
615     }
616   }
617 }
618
619 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
620 void Peer::updateInterestedAfterReceive()
621 {
622   for (auto& kv : connected_peers) {
623     Connection& remote_peer = kv.second;
624     if (remote_peer.am_interested) {
625       bool interested = false;
626       // Check if the peer still has a piece we want.
627       for (unsigned int i = 0; i < FILE_PIECES; i++)
628         if (remotePeerHasMissingPiece(&remote_peer, i)) {
629           interested = true;
630           break;
631         }
632
633       if (not interested) { // no more piece to download from connection
634         remote_peer.am_interested = false;
635         sendMessage(remote_peer.mailbox_, MessageType::NOTINTERESTED, message_size(MessageType::NOTINTERESTED));
636       }
637     }
638   }
639 }
640
641 void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
642 {
643   xbt_assert((piece >= 0 && static_cast<unsigned int>(piece) <= FILE_PIECES), "Wrong piece.");
644   xbt_assert((block_index >= 0 && static_cast<unsigned int>(block_index) <= PIECES_BLOCKS), "Wrong block : %d.",
645              block_index);
646   for (int i = block_index; i < (block_index + block_length); i++)
647     bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
648 }
649
650 bool Peer::hasCompletedPiece(unsigned int piece) const
651 {
652   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
653     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
654       return false;
655   return true;
656 }
657
658 int Peer::getFirstMissingBlockFrom(int piece) const
659 {
660   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
661     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
662       return i;
663   return -1;
664 }
665
666 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
667 int Peer::partiallyDownloadedPiece(const Connection* remote_peer) const
668 {
669   for (unsigned int i = 0; i < FILE_PIECES; i++)
670     if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
671       return i;
672   return -1;
673 }