Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'xbt_random' into 'master'
[simgrid.git] / examples / s4u / app-bittorrent / s4u-peer.cpp
1 /* Copyright (c) 2012-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <algorithm>
7 #include <climits>
8
9 #include "s4u-peer.hpp"
10 #include "s4u-tracker.hpp"
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
13
14 /*
15  * User parameters for transferred file data. For the test, the default values are :
16  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
17  */
18 constexpr unsigned long FILE_PIECES   = 10UL;
19 constexpr unsigned long PIECES_BLOCKS = 5UL;
20 constexpr int BLOCK_SIZE              = 16384;
21
22 /** Number of blocks asked by each request */
23 constexpr unsigned long BLOCKS_REQUESTED = 2UL;
24
25 constexpr double SLEEP_DURATION     = 1.0;
26 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
27
28 Peer::Peer(std::vector<std::string> args)
29 {
30   // Check arguments
31   xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
32   try {
33     id       = std::stoi(args[1]);
34     mailbox_ = simgrid::s4u::Mailbox::by_name(std::to_string(id));
35   } catch (const std::invalid_argument&) {
36     throw std::invalid_argument("Invalid ID:" + args[1]);
37   }
38
39   try {
40     deadline = std::stod(args[2]);
41   } catch (const std::invalid_argument&) {
42     throw std::invalid_argument("Invalid deadline:" + args[2]);
43   }
44   xbt_assert(deadline > 0, "Wrong deadline supplied");
45
46   if (args.size() == 4 && args[3] == "1") {
47     bitfield_       = (1U << FILE_PIECES) - 1U;
48     bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
49   }
50   pieces_count.resize(FILE_PIECES);
51
52   XBT_INFO("Hi, I'm joining the network with id %d", id);
53 }
54
55 /** Peer main function */
56 void Peer::operator()()
57 {
58   // Getting peer data from the tracker.
59   if (getPeersFromTracker()) {
60     XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
61     begin_receive_time = simgrid::s4u::Engine::get_clock();
62     mailbox_->set_receiver(simgrid::s4u::Actor::self());
63     if (hasFinished()) {
64       sendHandshakeToAllPeers();
65     } else {
66       leech();
67     }
68     seed();
69   } else {
70     XBT_INFO("Couldn't contact the tracker.");
71   }
72
73   XBT_INFO("Here is my current status: %s", getStatus().c_str());
74 }
75
76 bool Peer::getPeersFromTracker()
77 {
78   simgrid::s4u::Mailbox* tracker_mailbox = simgrid::s4u::Mailbox::by_name(TRACKER_MAILBOX);
79   // Build the task to send to the tracker
80   TrackerQuery* peer_request = new TrackerQuery(id, mailbox_);
81   try {
82     XBT_DEBUG("Sending a peer request to the tracker.");
83     tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
84   } catch (const simgrid::TimeoutException&) {
85     XBT_DEBUG("Timeout expired when requesting peers to tracker");
86     delete peer_request;
87     return false;
88   }
89
90   try {
91     TrackerAnswer* answer = static_cast<TrackerAnswer*>(mailbox_->get(GET_PEERS_TIMEOUT));
92     // Add the peers the tracker gave us to our peer list.
93     for (auto const& peer_id : answer->getPeers())
94       if (id != peer_id)
95         connected_peers.emplace(peer_id, Connection(peer_id));
96     delete answer;
97   } catch (const simgrid::TimeoutException&) {
98     XBT_DEBUG("Timeout expired when requesting peers to tracker");
99     return false;
100   }
101   return true;
102 }
103
104 void Peer::sendHandshakeToAllPeers()
105 {
106   for (auto const& kv : connected_peers) {
107     const Connection& remote_peer = kv.second;
108     Message* handshake      = new Message(MESSAGE_HANDSHAKE, id, mailbox_);
109     remote_peer.mailbox_->put_init(handshake, MESSAGE_HANDSHAKE_SIZE)->detach();
110     XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer.id);
111   }
112 }
113
114 void Peer::sendMessage(simgrid::s4u::Mailbox* mailbox, e_message_type type, uint64_t size)
115 {
116   const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"};
117   XBT_DEBUG("Sending %s to %s", type_names[type], mailbox->get_cname());
118   mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
119 }
120
121 void Peer::sendBitfield(simgrid::s4u::Mailbox* mailbox)
122 {
123   XBT_DEBUG("Sending a BITFIELD to %s", mailbox->get_cname());
124   mailbox
125       ->put_init(new Message(MESSAGE_BITFIELD, id, bitfield_, mailbox_),
126                  MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES))
127       ->detach();
128 }
129
130 void Peer::sendPiece(simgrid::s4u::Mailbox* mailbox, unsigned int piece, int block_index, int block_length)
131 {
132   xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
133   XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->get_cname());
134   mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach();
135 }
136
137 void Peer::sendHaveToAllPeers(unsigned int piece)
138 {
139   XBT_DEBUG("Sending HAVE message to all my peers");
140   for (auto const& kv : connected_peers) {
141     const Connection& remote_peer = kv.second;
142     remote_peer.mailbox_->put_init(new Message(MESSAGE_HAVE, id, mailbox_, piece), MESSAGE_HAVE_SIZE)->detach();
143   }
144 }
145
146 void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
147 {
148   remote_peer->current_piece = piece;
149   xbt_assert(remote_peer->hasPiece(piece));
150   int block_index = getFirstMissingBlockFrom(piece);
151   if (block_index != -1) {
152     int block_length = std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
153     XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->get_cname(), piece, block_index,
154               block_length);
155     remote_peer->mailbox_
156         ->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE)
157         ->detach();
158   }
159 }
160
161 std::string Peer::getStatus()
162 {
163   std::string res = std::string("");
164   for (int i = FILE_PIECES - 1; i >= 0; i--)
165     res = std::string((bitfield_ & (1U << i)) ? "1" : "0") + res;
166   return res;
167 }
168
169 bool Peer::hasFinished()
170 {
171   return bitfield_ == (1U << FILE_PIECES) - 1U;
172 }
173
174 /** Indicates if the remote peer has a piece not stored by the local peer */
175 bool Peer::isInterestedBy(Connection* remote_peer)
176 {
177   return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
178 }
179
180 bool Peer::isInterestedByFree(Connection* remote_peer)
181 {
182   for (unsigned int i = 0; i < FILE_PIECES; i++)
183     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
184       return true;
185   return false;
186 }
187
188 void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
189 {
190   for (unsigned int i = 0; i < FILE_PIECES; i++)
191     if (bitfield & (1U << i))
192       pieces_count[i]++;
193 }
194
195 unsigned int Peer::countPieces(unsigned int bitfield)
196 {
197   unsigned int count = 0U;
198   unsigned int n     = bitfield;
199   while (n) {
200     count += n & 1U;
201     n >>= 1U;
202   }
203   return count;
204 }
205
206 int Peer::nbInterestedPeers()
207 {
208   int nb = 0;
209   for (auto const& kv : connected_peers)
210     if (kv.second.interested)
211       nb++;
212   return nb;
213 }
214
215 void Peer::leech()
216 {
217   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
218   XBT_DEBUG("Start downloading.");
219
220   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
221   sendHandshakeToAllPeers();
222   XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname());
223
224   void* data = nullptr;
225   while (simgrid::s4u::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
226     if (comm_received == nullptr) {
227       comm_received = mailbox_->get_async(&data);
228     }
229     if (comm_received->test()) {
230       message = static_cast<Message*>(data);
231       handleMessage();
232       delete message;
233       comm_received = nullptr;
234     } else {
235       // We don't execute the choke algorithm if we don't already have a piece
236       if (simgrid::s4u::Engine::get_clock() >= next_choked_update && countPieces(bitfield_) > 0) {
237         updateChokedPeers();
238         next_choked_update += UPDATE_CHOKED_INTERVAL;
239       } else {
240         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
241       }
242     }
243   }
244   if (hasFinished())
245     XBT_DEBUG("%d becomes a seeder", id);
246 }
247
248 void Peer::seed()
249 {
250   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
251   XBT_DEBUG("Start seeding.");
252   // start the main seed loop
253   void* data = nullptr;
254   while (simgrid::s4u::Engine::get_clock() < deadline) {
255     if (comm_received == nullptr) {
256       comm_received = mailbox_->get_async(&data);
257     }
258     if (comm_received->test()) {
259       message = static_cast<Message*>(data);
260       handleMessage();
261       delete message;
262       comm_received = nullptr;
263     } else {
264       if (simgrid::s4u::Engine::get_clock() >= next_choked_update) {
265         updateChokedPeers();
266         // TODO: Change the choked peer algorithm when seeding.
267         next_choked_update += UPDATE_CHOKED_INTERVAL;
268       } else {
269         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
270       }
271     }
272   }
273 }
274
275 void Peer::updateActivePeersSet(Connection* remote_peer)
276 {
277   if (remote_peer->interested && not remote_peer->choked_upload)
278     active_peers.insert(remote_peer);
279   else
280     active_peers.erase(remote_peer);
281 }
282
283 void Peer::handleMessage()
284 {
285   const char* type_names[10] = {"HANDSHAKE", "CHOKE",    "UNCHOKE", "INTERESTED", "NOTINTERESTED",
286                                 "HAVE",      "BITFIELD", "REQUEST", "PIECE",      "CANCEL"};
287
288   XBT_DEBUG("Received a %s message from %s", type_names[message->type], message->return_mailbox->get_cname());
289
290   auto known_peer         = connected_peers.find(message->peer_id);
291   Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : &known_peer->second;
292   xbt_assert(remote_peer != nullptr || message->type == MESSAGE_HANDSHAKE,
293              "The impossible did happened: A not-in-our-list peer sent us a message.");
294
295   switch (message->type) {
296     case MESSAGE_HANDSHAKE:
297       // Check if the peer is in our connection list.
298       if (remote_peer == nullptr) {
299         XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
300         connected_peers.emplace(message->peer_id, Connection(message->peer_id));
301         sendMessage(message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
302       }
303       // Send our bitfield to the peer
304       sendBitfield(message->return_mailbox);
305       break;
306     case MESSAGE_BITFIELD:
307       // Update the pieces list
308       updatePiecesCountFromBitfield(message->bitfield);
309       // Store the bitfield
310       remote_peer->bitfield = message->bitfield;
311       xbt_assert(not remote_peer->am_interested, "Should not be interested at first");
312       if (isInterestedBy(remote_peer)) {
313         remote_peer->am_interested = true;
314         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
315       }
316       break;
317     case MESSAGE_INTERESTED:
318       // Update the interested state of the peer.
319       remote_peer->interested = true;
320       updateActivePeersSet(remote_peer);
321       break;
322     case MESSAGE_NOTINTERESTED:
323       remote_peer->interested = false;
324       updateActivePeersSet(remote_peer);
325       break;
326     case MESSAGE_UNCHOKE:
327       xbt_assert(remote_peer->choked_download);
328       remote_peer->choked_download = false;
329       // Send requests to the peer, since it has unchoked us
330       if (remote_peer->am_interested)
331         requestNewPieceTo(remote_peer);
332       break;
333     case MESSAGE_CHOKE:
334       xbt_assert(not remote_peer->choked_download);
335       remote_peer->choked_download = true;
336       if (remote_peer->current_piece != -1)
337         removeCurrentPiece(remote_peer, remote_peer->current_piece);
338       break;
339     case MESSAGE_HAVE:
340       XBT_DEBUG("\t for piece %d", message->piece);
341       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
342                  "Wrong HAVE message received");
343       remote_peer->bitfield = remote_peer->bitfield | (1U << static_cast<unsigned int>(message->piece));
344       pieces_count[message->piece]++;
345       // If the piece is in our pieces, we tell the peer that we are interested.
346       if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
347         remote_peer->am_interested = true;
348         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
349         if (not remote_peer->choked_download)
350           requestNewPieceTo(remote_peer);
351       }
352       break;
353     case MESSAGE_REQUEST:
354       xbt_assert(remote_peer->interested);
355       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
356                  "Wrong HAVE message received");
357       if (not remote_peer->choked_upload) {
358         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
359                   message->block_index + message->block_length);
360         if (not hasNotPiece(message->piece)) {
361           sendPiece(message->return_mailbox, message->piece, message->block_index, message->block_length);
362         }
363       } else {
364         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
365       }
366       break;
367     case MESSAGE_PIECE:
368       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
369                 message->block_index + message->block_length);
370       xbt_assert(not remote_peer->choked_download);
371       xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
372       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
373                  "Wrong piece received");
374       // TODO: Execute a computation.
375       if (hasNotPiece(static_cast<unsigned int>(message->piece))) {
376         updateBitfieldBlocks(message->piece, message->block_index, message->block_length);
377         if (hasCompletedPiece(static_cast<unsigned int>(message->piece))) {
378           // Removing the piece from our piece list
379           removeCurrentPiece(remote_peer, message->piece);
380           // Setting the fact that we have the piece
381           bitfield_ = bitfield_ | (1U << static_cast<unsigned int>(message->piece));
382           XBT_DEBUG("My status is now %s", getStatus().c_str());
383           // Sending the information to all the peers we are connected to
384           sendHaveToAllPeers(message->piece);
385           // sending UNINTERESTED to peers that do not have what we want.
386           updateInterestedAfterReceive();
387         } else {                                      // piece not completed
388           sendRequestTo(remote_peer, message->piece); // ask for the next block
389         }
390       } else {
391         XBT_DEBUG("However, we already have it");
392         requestNewPieceTo(remote_peer);
393       }
394       break;
395     case MESSAGE_CANCEL:
396       break;
397     default:
398       THROW_IMPOSSIBLE;
399   }
400   // Update the peer speed.
401   if (remote_peer) {
402     remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::get_clock() - begin_receive_time));
403   }
404   begin_receive_time = simgrid::s4u::Engine::get_clock();
405 }
406
407 /** Selects the appropriate piece to download and requests it to the remote_peer */
408 void Peer::requestNewPieceTo(Connection* remote_peer)
409 {
410   int piece = selectPieceToDownload(remote_peer);
411   if (piece != -1) {
412     current_pieces |= (1U << (unsigned int)piece);
413     sendRequestTo(remote_peer, piece);
414   }
415 }
416
417 void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piece)
418 {
419   current_pieces &= ~(1U << current_piece);
420   remote_peer->current_piece = -1;
421 }
422
423 /** @brief Return the piece to be downloaded
424  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
425  * If a piece is partially downloaded, this piece will be selected prioritarily
426  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
427  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
428  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
429  * @param remote_peer: information about the connection
430  * @return the piece to download if possible. -1 otherwise
431  */
432 int Peer::selectPieceToDownload(Connection* remote_peer)
433 {
434   int piece = partiallyDownloadedPiece(remote_peer);
435   // strict priority policy
436   if (piece != -1)
437     return piece;
438
439   // end game mode
440   if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) {
441     int nb_interesting_pieces = 0;
442     // compute the number of interesting pieces
443     for (unsigned int i = 0; i < FILE_PIECES; i++)
444       if (hasNotPiece(i) && remote_peer->hasPiece(i))
445         nb_interesting_pieces++;
446
447     xbt_assert(nb_interesting_pieces != 0);
448     // get a random interesting piece
449     int random_piece_index = simgrid::xbt::random::uniform_int(0, nb_interesting_pieces - 1);
450     int current_index      = 0;
451     for (unsigned int i = 0; i < FILE_PIECES; i++) {
452       if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
453         if (random_piece_index == current_index) {
454           piece = i;
455           break;
456         }
457         current_index++;
458       }
459     }
460     xbt_assert(piece != -1);
461     return piece;
462   }
463   // Random first policy
464   if (countPieces(bitfield_) < 4 && isInterestedByFree(remote_peer)) {
465     int nb_interesting_pieces = 0;
466     // compute the number of interesting pieces
467     for (unsigned int i = 0; i < FILE_PIECES; i++)
468       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
469         nb_interesting_pieces++;
470     xbt_assert(nb_interesting_pieces != 0);
471     // get a random interesting piece
472     int random_piece_index = simgrid::xbt::random::uniform_int(0, nb_interesting_pieces - 1);
473     int current_index      = 0;
474     for (unsigned int i = 0; i < FILE_PIECES; i++) {
475       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
476         if (random_piece_index == current_index) {
477           piece = i;
478           break;
479         }
480         current_index++;
481       }
482     }
483     xbt_assert(piece != -1);
484     return piece;
485   } else { // Rarest first policy
486     short min         = SHRT_MAX;
487     int nb_min_pieces = 0;
488     int current_index = 0;
489     // compute the smallest number of copies of available pieces
490     for (unsigned int i = 0; i < FILE_PIECES; i++) {
491       if (pieces_count[i] < min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
492         min = pieces_count[i];
493     }
494
495     xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
496     // compute the number of rarest pieces
497     for (unsigned int i = 0; i < FILE_PIECES; i++)
498       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
499         nb_min_pieces++;
500
501     xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer));
502     // get a random rarest piece
503     int random_rarest_index = 0;
504     if (nb_min_pieces > 0) {
505       random_rarest_index = simgrid::xbt::random::uniform_int(0, nb_min_pieces - 1);
506     }
507     for (unsigned int i = 0; i < FILE_PIECES; i++)
508       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
509         if (random_rarest_index == current_index) {
510           piece = i;
511           break;
512         }
513         current_index++;
514       }
515
516     xbt_assert(piece != -1 || not isInterestedByFree(remote_peer));
517     return piece;
518   }
519 }
520
521 void Peer::updateChokedPeers()
522 {
523   if (nbInterestedPeers() == 0)
524     return;
525   XBT_DEBUG("(%d) update_choked peers %zu active peers", id, active_peers.size());
526   // update the current round
527   round_                  = (round_ + 1) % 3;
528   Connection* chosen_peer = nullptr;
529   // select first active peer and remove it from the set
530   Connection* choked_peer;
531   if (active_peers.empty()) {
532     choked_peer = nullptr;
533   } else {
534     choked_peer = *active_peers.begin();
535     active_peers.erase(choked_peer);
536   }
537
538   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
539   if (hasFinished()) {
540     double unchoke_time = simgrid::s4u::Engine::get_clock() + 1;
541     for (auto& kv : connected_peers) {
542       Connection& remote_peer = kv.second;
543       if (remote_peer.last_unchoke < unchoke_time && remote_peer.interested && remote_peer.choked_upload) {
544         unchoke_time = remote_peer.last_unchoke;
545         chosen_peer  = &remote_peer;
546       }
547     }
548   } else {
549     // Random optimistic unchoking
550     if (round_ == 0) {
551       int j = 0;
552       do {
553         // We choose a random peer to unchoke.
554         std::unordered_map<int, Connection>::iterator chosen_peer_it = connected_peers.begin();
555         std::advance(chosen_peer_it, simgrid::xbt::random::uniform_int(0, connected_peers.size() - 1));
556         chosen_peer = &chosen_peer_it->second;
557         if (not chosen_peer->interested || not chosen_peer->choked_upload)
558           chosen_peer = nullptr;
559         else
560           XBT_DEBUG("Nothing to do, keep going");
561         j++;
562       } while (chosen_peer == nullptr && j < MAXIMUM_PEERS);
563     } else {
564       // Use the "fastest download" policy.
565       double fastest_speed = 0.0;
566       for (auto& kv : connected_peers) {
567         Connection& remote_peer = kv.second;
568         if (remote_peer.peer_speed > fastest_speed && remote_peer.choked_upload && remote_peer.interested) {
569           fastest_speed = remote_peer.peer_speed;
570           chosen_peer   = &remote_peer;
571         }
572       }
573     }
574   }
575
576   if (chosen_peer != nullptr)
577     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", id, chosen_peer->id,
578               chosen_peer->interested, chosen_peer->choked_upload);
579
580   if (choked_peer != chosen_peer) {
581     if (choked_peer != nullptr) {
582       xbt_assert(not choked_peer->choked_upload, "Tries to choked a choked peer");
583       choked_peer->choked_upload = true;
584       updateActivePeersSet(choked_peer);
585       XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
586       sendMessage(choked_peer->mailbox_, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
587     }
588     if (chosen_peer != nullptr) {
589       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
590       chosen_peer->choked_upload = false;
591       active_peers.insert(chosen_peer);
592       chosen_peer->last_unchoke = simgrid::s4u::Engine::get_clock();
593       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
594       updateActivePeersSet(chosen_peer);
595       sendMessage(chosen_peer->mailbox_, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
596     }
597   }
598 }
599
600 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
601 void Peer::updateInterestedAfterReceive()
602 {
603   for (auto& kv : connected_peers) {
604     Connection& remote_peer = kv.second;
605     if (remote_peer.am_interested) {
606       bool interested = false;
607       // Check if the peer still has a piece we want.
608       for (unsigned int i = 0; i < FILE_PIECES; i++)
609         if (hasNotPiece(i) && remote_peer.hasPiece(i)) {
610           interested = true;
611           break;
612         }
613
614       if (not interested) { // no more piece to download from connection
615         remote_peer.am_interested = false;
616         sendMessage(remote_peer.mailbox_, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
617       }
618     }
619   }
620 }
621
622 void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
623 {
624   xbt_assert((piece >= 0 && static_cast<unsigned int>(piece) <= FILE_PIECES), "Wrong piece.");
625   xbt_assert((block_index >= 0 && static_cast<unsigned int>(block_index) <= PIECES_BLOCKS), "Wrong block : %d.",
626              block_index);
627   for (int i = block_index; i < (block_index + block_length); i++)
628     bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
629 }
630
631 bool Peer::hasCompletedPiece(unsigned int piece)
632 {
633   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
634     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
635       return false;
636   return true;
637 }
638
639 int Peer::getFirstMissingBlockFrom(int piece)
640 {
641   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
642     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
643       return i;
644   return -1;
645 }
646
647 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
648 int Peer::partiallyDownloadedPiece(Connection* remote_peer)
649 {
650   for (unsigned int i = 0; i < FILE_PIECES; i++)
651     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
652       return i;
653   return -1;
654 }