Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
360bb203acdd70a687cb3b84e0d93ec70b5c3ac5
[simgrid.git] / examples / s4u / app-bittorrent / s4u-peer.cpp
1 /* Copyright (c) 2012-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <algorithm>
7 #include <climits>
8
9 #include "s4u-peer.hpp"
10 #include "s4u-tracker.hpp"
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
13
14 /*
15  * User parameters for transferred file data. For the test, the default values are :
16  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
17  */
18 constexpr unsigned long FILE_PIECES   = 10UL;
19 constexpr unsigned long PIECES_BLOCKS = 5UL;
20 constexpr int BLOCK_SIZE              = 16384;
21
22 /** Number of blocks asked by each request */
23 constexpr unsigned long BLOCKS_REQUESTED = 2UL;
24
25 constexpr bool ENABLE_END_GAME_MODE = true;
26 constexpr double SLEEP_DURATION     = 1.0;
27 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
28
29 Peer::Peer(std::vector<std::string> args)
30 {
31   // Check arguments
32   xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
33   try {
34     id       = std::stoi(args[1]);
35     mailbox_ = simgrid::s4u::Mailbox::by_name(std::to_string(id));
36   } catch (const std::invalid_argument&) {
37     throw std::invalid_argument("Invalid ID:" + args[1]);
38   }
39
40   try {
41     deadline = std::stod(args[2]);
42   } catch (const std::invalid_argument&) {
43     throw std::invalid_argument("Invalid deadline:" + args[2]);
44   }
45   xbt_assert(deadline > 0, "Wrong deadline supplied");
46
47   if (args.size() == 4 && args[3] == "1") {
48     bitfield_       = (1U << FILE_PIECES) - 1U;
49     bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
50   }
51   pieces_count.resize(FILE_PIECES);
52
53   XBT_INFO("Hi, I'm joining the network with id %d", id);
54 }
55
56 /** Peer main function */
57 void Peer::operator()()
58 {
59   // Getting peer data from the tracker.
60   if (getPeersFromTracker()) {
61     XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
62     begin_receive_time = simgrid::s4u::Engine::get_clock();
63     mailbox_->set_receiver(simgrid::s4u::Actor::self());
64     if (hasFinished()) {
65       sendHandshakeToAllPeers();
66     } else {
67       leech();
68     }
69     seed();
70   } else {
71     XBT_INFO("Couldn't contact the tracker.");
72   }
73
74   XBT_INFO("Here is my current status: %s", getStatus().c_str());
75 }
76
77 bool Peer::getPeersFromTracker()
78 {
79   simgrid::s4u::Mailbox* tracker_mailbox = simgrid::s4u::Mailbox::by_name(TRACKER_MAILBOX);
80   // Build the task to send to the tracker
81   TrackerQuery* peer_request = new TrackerQuery(id, mailbox_);
82   try {
83     XBT_DEBUG("Sending a peer request to the tracker.");
84     tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
85   } catch (const simgrid::TimeoutException&) {
86     XBT_DEBUG("Timeout expired when requesting peers to tracker");
87     delete peer_request;
88     return false;
89   }
90
91   try {
92     TrackerAnswer* answer = static_cast<TrackerAnswer*>(mailbox_->get(GET_PEERS_TIMEOUT));
93     // Add the peers the tracker gave us to our peer list.
94     for (auto const& peer_id : answer->getPeers())
95       if (id != peer_id)
96         connected_peers.emplace(peer_id, Connection(peer_id));
97     delete answer;
98   } catch (const simgrid::TimeoutException&) {
99     XBT_DEBUG("Timeout expired when requesting peers to tracker");
100     return false;
101   }
102   return true;
103 }
104
105 void Peer::sendHandshakeToAllPeers()
106 {
107   for (auto const& kv : connected_peers) {
108     const Connection& remote_peer = kv.second;
109     Message* handshake      = new Message(MESSAGE_HANDSHAKE, id, mailbox_);
110     remote_peer.mailbox_->put_init(handshake, MESSAGE_HANDSHAKE_SIZE)->detach();
111     XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer.id);
112   }
113 }
114
115 void Peer::sendMessage(simgrid::s4u::Mailbox* mailbox, e_message_type type, uint64_t size)
116 {
117   const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"};
118   XBT_DEBUG("Sending %s to %s", type_names[type], mailbox->get_cname());
119   mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
120 }
121
122 void Peer::sendBitfield(simgrid::s4u::Mailbox* mailbox)
123 {
124   XBT_DEBUG("Sending a BITFIELD to %s", mailbox->get_cname());
125   mailbox
126       ->put_init(new Message(MESSAGE_BITFIELD, id, bitfield_, mailbox_),
127                  MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES))
128       ->detach();
129 }
130
131 void Peer::sendPiece(simgrid::s4u::Mailbox* mailbox, unsigned int piece, int block_index, int block_length)
132 {
133   xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
134   XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->get_cname());
135   mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach();
136 }
137
138 void Peer::sendHaveToAllPeers(unsigned int piece)
139 {
140   XBT_DEBUG("Sending HAVE message to all my peers");
141   for (auto const& kv : connected_peers) {
142     const Connection& remote_peer = kv.second;
143     remote_peer.mailbox_->put_init(new Message(MESSAGE_HAVE, id, mailbox_, piece), MESSAGE_HAVE_SIZE)->detach();
144   }
145 }
146
147 void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
148 {
149   remote_peer->current_piece = piece;
150   xbt_assert(remote_peer->hasPiece(piece));
151   int block_index = getFirstMissingBlockFrom(piece);
152   if (block_index != -1) {
153     int block_length = std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
154     XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->get_cname(), piece, block_index,
155               block_length);
156     remote_peer->mailbox_
157         ->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE)
158         ->detach();
159   }
160 }
161
162 std::string Peer::getStatus()
163 {
164   std::string res = std::string("");
165   for (int i = FILE_PIECES - 1; i >= 0; i--)
166     res = std::string((bitfield_ & (1U << i)) ? "1" : "0") + res;
167   return res;
168 }
169
170 bool Peer::hasFinished()
171 {
172   return bitfield_ == (1U << FILE_PIECES) - 1U;
173 }
174
175 /** Indicates if the remote peer has a piece not stored by the local peer */
176 bool Peer::isInterestedBy(Connection* remote_peer)
177 {
178   return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
179 }
180
181 bool Peer::isInterestedByFree(Connection* remote_peer)
182 {
183   for (unsigned int i = 0; i < FILE_PIECES; i++)
184     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
185       return true;
186   return false;
187 }
188
189 void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
190 {
191   for (unsigned int i = 0; i < FILE_PIECES; i++)
192     if (bitfield & (1U << i))
193       pieces_count[i]++;
194 }
195
196 unsigned int Peer::countPieces(unsigned int bitfield)
197 {
198   unsigned int count = 0U;
199   unsigned int n     = bitfield;
200   while (n) {
201     count += n & 1U;
202     n >>= 1U;
203   }
204   return count;
205 }
206
207 int Peer::nbInterestedPeers()
208 {
209   int nb = 0;
210   for (auto const& kv : connected_peers)
211     if (kv.second.interested)
212       nb++;
213   return nb;
214 }
215
216 void Peer::leech()
217 {
218   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
219   XBT_DEBUG("Start downloading.");
220
221   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
222   sendHandshakeToAllPeers();
223   XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname());
224
225   void* data = nullptr;
226   while (simgrid::s4u::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
227     if (comm_received == nullptr) {
228       comm_received = mailbox_->get_async(&data);
229     }
230     if (comm_received->test()) {
231       message = static_cast<Message*>(data);
232       handleMessage();
233       delete message;
234       comm_received = nullptr;
235     } else {
236       // We don't execute the choke algorithm if we don't already have a piece
237       if (simgrid::s4u::Engine::get_clock() >= next_choked_update && countPieces(bitfield_) > 0) {
238         updateChokedPeers();
239         next_choked_update += UPDATE_CHOKED_INTERVAL;
240       } else {
241         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
242       }
243     }
244   }
245   if (hasFinished())
246     XBT_DEBUG("%d becomes a seeder", id);
247 }
248
249 void Peer::seed()
250 {
251   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
252   XBT_DEBUG("Start seeding.");
253   // start the main seed loop
254   void* data = nullptr;
255   while (simgrid::s4u::Engine::get_clock() < deadline) {
256     if (comm_received == nullptr) {
257       comm_received = mailbox_->get_async(&data);
258     }
259     if (comm_received->test()) {
260       message = static_cast<Message*>(data);
261       handleMessage();
262       delete message;
263       comm_received = nullptr;
264     } else {
265       if (simgrid::s4u::Engine::get_clock() >= next_choked_update) {
266         updateChokedPeers();
267         // TODO: Change the choked peer algorithm when seeding.
268         next_choked_update += UPDATE_CHOKED_INTERVAL;
269       } else {
270         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
271       }
272     }
273   }
274 }
275
276 void Peer::updateActivePeersSet(Connection* remote_peer)
277 {
278   if (remote_peer->interested && not remote_peer->choked_upload)
279     active_peers.insert(remote_peer);
280   else
281     active_peers.erase(remote_peer);
282 }
283
284 void Peer::handleMessage()
285 {
286   const char* type_names[10] = {"HANDSHAKE", "CHOKE",    "UNCHOKE", "INTERESTED", "NOTINTERESTED",
287                                 "HAVE",      "BITFIELD", "REQUEST", "PIECE",      "CANCEL"};
288
289   XBT_DEBUG("Received a %s message from %s", type_names[message->type], message->return_mailbox->get_cname());
290
291   auto known_peer         = connected_peers.find(message->peer_id);
292   Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : &known_peer->second;
293   xbt_assert(remote_peer != nullptr || message->type == MESSAGE_HANDSHAKE,
294              "The impossible did happened: A not-in-our-list peer sent us a message.");
295
296   switch (message->type) {
297     case MESSAGE_HANDSHAKE:
298       // Check if the peer is in our connection list.
299       if (remote_peer == nullptr) {
300         XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
301         connected_peers.emplace(message->peer_id, Connection(message->peer_id));
302         sendMessage(message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
303       }
304       // Send our bitfield to the peer
305       sendBitfield(message->return_mailbox);
306       break;
307     case MESSAGE_BITFIELD:
308       // Update the pieces list
309       updatePiecesCountFromBitfield(message->bitfield);
310       // Store the bitfield
311       remote_peer->bitfield = message->bitfield;
312       xbt_assert(not remote_peer->am_interested, "Should not be interested at first");
313       if (isInterestedBy(remote_peer)) {
314         remote_peer->am_interested = true;
315         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
316       }
317       break;
318     case MESSAGE_INTERESTED:
319       // Update the interested state of the peer.
320       remote_peer->interested = true;
321       updateActivePeersSet(remote_peer);
322       break;
323     case MESSAGE_NOTINTERESTED:
324       remote_peer->interested = false;
325       updateActivePeersSet(remote_peer);
326       break;
327     case MESSAGE_UNCHOKE:
328       xbt_assert(remote_peer->choked_download);
329       remote_peer->choked_download = false;
330       // Send requests to the peer, since it has unchoked us
331       if (remote_peer->am_interested)
332         requestNewPieceTo(remote_peer);
333       break;
334     case MESSAGE_CHOKE:
335       xbt_assert(not remote_peer->choked_download);
336       remote_peer->choked_download = true;
337       if (remote_peer->current_piece != -1)
338         removeCurrentPiece(remote_peer, remote_peer->current_piece);
339       break;
340     case MESSAGE_HAVE:
341       XBT_DEBUG("\t for piece %d", message->piece);
342       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
343                  "Wrong HAVE message received");
344       remote_peer->bitfield = remote_peer->bitfield | (1U << static_cast<unsigned int>(message->piece));
345       pieces_count[message->piece]++;
346       // If the piece is in our pieces, we tell the peer that we are interested.
347       if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
348         remote_peer->am_interested = true;
349         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
350         if (not remote_peer->choked_download)
351           requestNewPieceTo(remote_peer);
352       }
353       break;
354     case MESSAGE_REQUEST:
355       xbt_assert(remote_peer->interested);
356       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
357                  "Wrong HAVE message received");
358       if (not remote_peer->choked_upload) {
359         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
360                   message->block_index + message->block_length);
361         if (not hasNotPiece(message->piece)) {
362           sendPiece(message->return_mailbox, message->piece, message->block_index, message->block_length);
363         }
364       } else {
365         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
366       }
367       break;
368     case MESSAGE_PIECE:
369       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
370                 message->block_index + message->block_length);
371       xbt_assert(not remote_peer->choked_download);
372       xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
373                  "Can't received a piece if I'm not interested without end-game mode!"
374                  "piece (%d) bitfield (%u) remote bitfield (%u)",
375                  message->piece, bitfield_, remote_peer->bitfield);
376       xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
377       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
378                  "Wrong piece received");
379       // TODO: Execute a computation.
380       if (hasNotPiece(static_cast<unsigned int>(message->piece))) {
381         updateBitfieldBlocks(message->piece, message->block_index, message->block_length);
382         if (hasCompletedPiece(static_cast<unsigned int>(message->piece))) {
383           // Removing the piece from our piece list
384           removeCurrentPiece(remote_peer, message->piece);
385           // Setting the fact that we have the piece
386           bitfield_ = bitfield_ | (1U << static_cast<unsigned int>(message->piece));
387           XBT_DEBUG("My status is now %s", getStatus().c_str());
388           // Sending the information to all the peers we are connected to
389           sendHaveToAllPeers(message->piece);
390           // sending UNINTERESTED to peers that do not have what we want.
391           updateInterestedAfterReceive();
392         } else {                                      // piece not completed
393           sendRequestTo(remote_peer, message->piece); // ask for the next block
394         }
395       } else {
396         XBT_DEBUG("However, we already have it");
397         xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
398         requestNewPieceTo(remote_peer);
399       }
400       break;
401     case MESSAGE_CANCEL:
402       break;
403     default:
404       THROW_IMPOSSIBLE;
405   }
406   // Update the peer speed.
407   if (remote_peer) {
408     remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::get_clock() - begin_receive_time));
409   }
410   begin_receive_time = simgrid::s4u::Engine::get_clock();
411 }
412
413 /** Selects the appropriate piece to download and requests it to the remote_peer */
414 void Peer::requestNewPieceTo(Connection* remote_peer)
415 {
416   int piece = selectPieceToDownload(remote_peer);
417   if (piece != -1) {
418     current_pieces |= (1U << (unsigned int)piece);
419     sendRequestTo(remote_peer, piece);
420   }
421 }
422
423 void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piece)
424 {
425   current_pieces &= ~(1U << current_piece);
426   remote_peer->current_piece = -1;
427 }
428
429 /** @brief Return the piece to be downloaded
430  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
431  * If a piece is partially downloaded, this piece will be selected prioritarily
432  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
433  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
434  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
435  * @param remote_peer: information about the connection
436  * @return the piece to download if possible. -1 otherwise
437  */
438 int Peer::selectPieceToDownload(Connection* remote_peer)
439 {
440   int piece = partiallyDownloadedPiece(remote_peer);
441   // strict priority policy
442   if (piece != -1)
443     return piece;
444
445   // end game mode
446   if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) {
447     if (not ENABLE_END_GAME_MODE)
448       return -1;
449     int nb_interesting_pieces = 0;
450     // compute the number of interesting pieces
451     for (unsigned int i = 0; i < FILE_PIECES; i++)
452       if (hasNotPiece(i) && remote_peer->hasPiece(i))
453         nb_interesting_pieces++;
454
455     xbt_assert(nb_interesting_pieces != 0);
456     // get a random interesting piece
457     std::uniform_int_distribution<int> dist(0, nb_interesting_pieces - 1);
458     int random_piece_index = dist(generator);
459     int current_index      = 0;
460     for (unsigned int i = 0; i < FILE_PIECES; i++) {
461       if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
462         if (random_piece_index == current_index) {
463           piece = i;
464           break;
465         }
466         current_index++;
467       }
468     }
469     xbt_assert(piece != -1);
470     return piece;
471   }
472   // Random first policy
473   if (countPieces(bitfield_) < 4 && isInterestedByFree(remote_peer)) {
474     int nb_interesting_pieces = 0;
475     // compute the number of interesting pieces
476     for (unsigned int i = 0; i < FILE_PIECES; i++)
477       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
478         nb_interesting_pieces++;
479     xbt_assert(nb_interesting_pieces != 0);
480     // get a random interesting piece
481     std::uniform_int_distribution<int> dist(0, nb_interesting_pieces - 1);
482     int random_piece_index = dist(generator);
483     int current_index      = 0;
484     for (unsigned int i = 0; i < FILE_PIECES; i++) {
485       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
486         if (random_piece_index == current_index) {
487           piece = i;
488           break;
489         }
490         current_index++;
491       }
492     }
493     xbt_assert(piece != -1);
494     return piece;
495   } else { // Rarest first policy
496     short min         = SHRT_MAX;
497     int nb_min_pieces = 0;
498     int current_index = 0;
499     // compute the smallest number of copies of available pieces
500     for (unsigned int i = 0; i < FILE_PIECES; i++) {
501       if (pieces_count[i] < min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
502         min = pieces_count[i];
503     }
504
505     xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
506     // compute the number of rarest pieces
507     for (unsigned int i = 0; i < FILE_PIECES; i++)
508       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
509         nb_min_pieces++;
510
511     xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer));
512     // get a random rarest piece
513     int random_rarest_index = 0;
514     if (nb_min_pieces > 0) {
515       std::uniform_int_distribution<int> dist(0, nb_min_pieces - 1);
516       random_rarest_index = dist(generator);
517     }
518     for (unsigned int i = 0; i < FILE_PIECES; i++)
519       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
520         if (random_rarest_index == current_index) {
521           piece = i;
522           break;
523         }
524         current_index++;
525       }
526
527     xbt_assert(piece != -1 || not isInterestedByFree(remote_peer));
528     return piece;
529   }
530 }
531
532 void Peer::updateChokedPeers()
533 {
534   if (nbInterestedPeers() == 0)
535     return;
536   XBT_DEBUG("(%d) update_choked peers %zu active peers", id, active_peers.size());
537   // update the current round
538   round_                  = (round_ + 1) % 3;
539   Connection* chosen_peer = nullptr;
540   // select first active peer and remove it from the set
541   Connection* choked_peer;
542   if (active_peers.empty()) {
543     choked_peer = nullptr;
544   } else {
545     choked_peer = *active_peers.begin();
546     active_peers.erase(choked_peer);
547   }
548
549   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
550   if (hasFinished()) {
551     double unchoke_time = simgrid::s4u::Engine::get_clock() + 1;
552     for (auto& kv : connected_peers) {
553       Connection& remote_peer = kv.second;
554       if (remote_peer.last_unchoke < unchoke_time && remote_peer.interested && remote_peer.choked_upload) {
555         unchoke_time = remote_peer.last_unchoke;
556         chosen_peer  = &remote_peer;
557       }
558     }
559   } else {
560     // Random optimistic unchoking
561     if (round_ == 0) {
562       int j = 0;
563       do {
564         // We choose a random peer to unchoke.
565         std::unordered_map<int, Connection>::iterator chosen_peer_it = connected_peers.begin();
566         std::uniform_int_distribution<int> dist(0, connected_peers.size() - 1);
567         std::advance(chosen_peer_it, dist(generator));
568         chosen_peer = &chosen_peer_it->second;
569         if (not chosen_peer->interested || not chosen_peer->choked_upload)
570           chosen_peer = nullptr;
571         else
572           XBT_DEBUG("Nothing to do, keep going");
573         j++;
574       } while (chosen_peer == nullptr && j < MAXIMUM_PEERS);
575     } else {
576       // Use the "fastest download" policy.
577       double fastest_speed = 0.0;
578       for (auto& kv : connected_peers) {
579         Connection& remote_peer = kv.second;
580         if (remote_peer.peer_speed > fastest_speed && remote_peer.choked_upload && remote_peer.interested) {
581           fastest_speed = remote_peer.peer_speed;
582           chosen_peer   = &remote_peer;
583         }
584       }
585     }
586   }
587
588   if (chosen_peer != nullptr)
589     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", id, chosen_peer->id,
590               chosen_peer->interested, chosen_peer->choked_upload);
591
592   if (choked_peer != chosen_peer) {
593     if (choked_peer != nullptr) {
594       xbt_assert(not choked_peer->choked_upload, "Tries to choked a choked peer");
595       choked_peer->choked_upload = true;
596       updateActivePeersSet(choked_peer);
597       XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
598       sendMessage(choked_peer->mailbox_, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
599     }
600     if (chosen_peer != nullptr) {
601       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
602       chosen_peer->choked_upload = false;
603       active_peers.insert(chosen_peer);
604       chosen_peer->last_unchoke = simgrid::s4u::Engine::get_clock();
605       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
606       updateActivePeersSet(chosen_peer);
607       sendMessage(chosen_peer->mailbox_, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
608     }
609   }
610 }
611
612 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
613 void Peer::updateInterestedAfterReceive()
614 {
615   for (auto& kv : connected_peers) {
616     Connection& remote_peer = kv.second;
617     if (remote_peer.am_interested) {
618       bool interested = false;
619       // Check if the peer still has a piece we want.
620       for (unsigned int i = 0; i < FILE_PIECES; i++)
621         if (hasNotPiece(i) && remote_peer.hasPiece(i)) {
622           interested = true;
623           break;
624         }
625
626       if (not interested) { // no more piece to download from connection
627         remote_peer.am_interested = false;
628         sendMessage(remote_peer.mailbox_, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
629       }
630     }
631   }
632 }
633
634 void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
635 {
636   xbt_assert((piece >= 0 && static_cast<unsigned int>(piece) <= FILE_PIECES), "Wrong piece.");
637   xbt_assert((block_index >= 0 && static_cast<unsigned int>(block_index) <= PIECES_BLOCKS), "Wrong block : %d.",
638              block_index);
639   for (int i = block_index; i < (block_index + block_length); i++)
640     bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
641 }
642
643 bool Peer::hasCompletedPiece(unsigned int piece)
644 {
645   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
646     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
647       return false;
648   return true;
649 }
650
651 int Peer::getFirstMissingBlockFrom(int piece)
652 {
653   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
654     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
655       return i;
656   return -1;
657 }
658
659 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
660 int Peer::partiallyDownloadedPiece(Connection* remote_peer)
661 {
662   for (unsigned int i = 0; i < FILE_PIECES; i++)
663     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
664       return i;
665   return -1;
666 }