Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'Adrien.Gougeon/simgrid-master'
[simgrid.git] / examples / s4u / app-bittorrent / s4u-peer.cpp
1 /* Copyright (c) 2012-2020. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <algorithm>
7 #include <array>
8 #include <climits>
9
10 #include "s4u-peer.hpp"
11 #include "s4u-tracker.hpp"
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
14
15 /*
16  * User parameters for transferred file data. For the test, the default values are :
17  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
18  */
19 constexpr unsigned long FILE_PIECES   = 10UL;
20 constexpr unsigned long PIECES_BLOCKS = 5UL;
21 constexpr int BLOCK_SIZE              = 16384;
22
23 /** Number of blocks asked by each request */
24 constexpr unsigned long BLOCKS_REQUESTED = 2UL;
25
26 constexpr double SLEEP_DURATION     = 1.0;
27 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
28
29 /** Message sizes
30  * Sizes based on report by A. Legout et al, Understanding BitTorrent: An Experimental Perspective
31  * http://hal.inria.fr/inria-00000156/en
32  */
33 constexpr unsigned message_size(MessageType type)
34 {
35   constexpr std::array<unsigned, 10> sizes{{/* HANDSHAKE     */ 68,
36                                             /* CHOKE         */ 5,
37                                             /* UNCHOKE       */ 5,
38                                             /* INTERESTED    */ 5,
39                                             /* NOTINTERESTED */ 5,
40                                             /* HAVE          */ 9,
41                                             /* BITFIELD      */ 5,
42                                             /* REQUEST       */ 17,
43                                             /* PIECE         */ 13,
44                                             /* CANCEL        */ 17}};
45   return sizes[static_cast<int>(type)];
46 }
47
48 constexpr const char* message_name(MessageType type)
49 {
50   constexpr std::array<const char*, 10> names{{"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "HAVE",
51                                                "BITFIELD", "REQUEST", "PIECE", "CANCEL"}};
52   return names[static_cast<int>(type)];
53 }
54
55 Peer::Peer(std::vector<std::string> args)
56 {
57   // Check arguments
58   xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
59   try {
60     id       = std::stoi(args[1]);
61     mailbox_ = simgrid::s4u::Mailbox::by_name(std::to_string(id));
62   } catch (const std::invalid_argument&) {
63     throw std::invalid_argument("Invalid ID:" + args[1]);
64   }
65   random.set_seed(id);
66
67   try {
68     deadline = std::stod(args[2]);
69   } catch (const std::invalid_argument&) {
70     throw std::invalid_argument("Invalid deadline:" + args[2]);
71   }
72   xbt_assert(deadline > 0, "Wrong deadline supplied");
73
74   if (args.size() == 4 && args[3] == "1") {
75     bitfield_       = (1U << FILE_PIECES) - 1U;
76     bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
77   }
78   pieces_count.resize(FILE_PIECES);
79
80   XBT_INFO("Hi, I'm joining the network with id %d", id);
81 }
82
83 /** Peer main function */
84 void Peer::operator()()
85 {
86   // Getting peer data from the tracker.
87   if (getPeersFromTracker()) {
88     XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
89     begin_receive_time = simgrid::s4u::Engine::get_clock();
90     mailbox_->set_receiver(simgrid::s4u::Actor::self());
91     if (hasFinished()) {
92       sendHandshakeToAllPeers();
93     } else {
94       leech();
95     }
96     seed();
97   } else {
98     XBT_INFO("Couldn't contact the tracker.");
99   }
100
101   XBT_INFO("Here is my current status: %s", getStatus().c_str());
102 }
103
104 bool Peer::getPeersFromTracker()
105 {
106   simgrid::s4u::Mailbox* tracker_mailbox = simgrid::s4u::Mailbox::by_name(TRACKER_MAILBOX);
107   // Build the task to send to the tracker
108   auto* peer_request = new TrackerQuery(id, mailbox_);
109   try {
110     XBT_DEBUG("Sending a peer request to the tracker.");
111     tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
112   } catch (const simgrid::TimeoutException&) {
113     XBT_DEBUG("Timeout expired when requesting peers to tracker");
114     delete peer_request;
115     return false;
116   }
117
118   try {
119     auto* answer = static_cast<TrackerAnswer*>(mailbox_->get(GET_PEERS_TIMEOUT));
120     // Add the peers the tracker gave us to our peer list.
121     for (auto const& peer_id : answer->getPeers())
122       if (id != peer_id)
123         connected_peers.emplace(peer_id, Connection(peer_id));
124     delete answer;
125   } catch (const simgrid::TimeoutException&) {
126     XBT_DEBUG("Timeout expired when requesting peers to tracker");
127     return false;
128   }
129   return true;
130 }
131
132 void Peer::sendHandshakeToAllPeers()
133 {
134   for (auto const& kv : connected_peers) {
135     const Connection& remote_peer = kv.second;
136     auto* handshake               = new Message(MessageType::HANDSHAKE, id, mailbox_);
137     remote_peer.mailbox_->put_init(handshake, message_size(MessageType::HANDSHAKE))->detach();
138     XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer.id);
139   }
140 }
141
142 void Peer::sendMessage(simgrid::s4u::Mailbox* mailbox, MessageType type, uint64_t size)
143 {
144   XBT_DEBUG("Sending %s to %s", message_name(type), mailbox->get_cname());
145   mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
146 }
147
148 void Peer::sendBitfield(simgrid::s4u::Mailbox* mailbox)
149 {
150   XBT_DEBUG("Sending a BITFIELD to %s", mailbox->get_cname());
151   mailbox
152       ->put_init(new Message(MessageType::BITFIELD, id, bitfield_, mailbox_),
153                  message_size(MessageType::BITFIELD) + BITS_TO_BYTES(FILE_PIECES))
154       ->detach();
155 }
156
157 void Peer::sendPiece(simgrid::s4u::Mailbox* mailbox, unsigned int piece, int block_index, int block_length)
158 {
159   xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
160   XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->get_cname());
161   mailbox->put_init(new Message(MessageType::PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)
162       ->detach();
163 }
164
165 void Peer::sendHaveToAllPeers(unsigned int piece)
166 {
167   XBT_DEBUG("Sending HAVE message to all my peers");
168   for (auto const& kv : connected_peers) {
169     const Connection& remote_peer = kv.second;
170     remote_peer.mailbox_->put_init(new Message(MessageType::HAVE, id, mailbox_, piece), message_size(MessageType::HAVE))
171         ->detach();
172   }
173 }
174
175 void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
176 {
177   remote_peer->current_piece = piece;
178   xbt_assert(remote_peer->hasPiece(piece));
179   int block_index = getFirstMissingBlockFrom(piece);
180   if (block_index != -1) {
181     int block_length = static_cast<int>(std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index));
182     XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->get_cname(), piece, block_index,
183               block_length);
184     remote_peer->mailbox_
185         ->put_init(new Message(MessageType::REQUEST, id, mailbox_, piece, block_index, block_length),
186                    message_size(MessageType::REQUEST))
187         ->detach();
188   }
189 }
190
191 std::string Peer::getStatus() const
192 {
193   std::string res;
194   for (unsigned i = 0; i < FILE_PIECES; i++)
195     res += (bitfield_ & (1U << i)) ? '1' : '0';
196   return res;
197 }
198
199 bool Peer::hasFinished() const
200 {
201   return bitfield_ == (1U << FILE_PIECES) - 1U;
202 }
203
204 /** Indicates if the remote peer has a piece not stored by the local peer */
205 bool Peer::isInterestedBy(const Connection* remote_peer) const
206 {
207   return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
208 }
209
210 bool Peer::isInterestedByFree(const Connection* remote_peer) const
211 {
212   for (unsigned int i = 0; i < FILE_PIECES; i++)
213     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
214       return true;
215   return false;
216 }
217
218 void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
219 {
220   for (unsigned int i = 0; i < FILE_PIECES; i++)
221     if (bitfield & (1U << i))
222       pieces_count[i]++;
223 }
224
225 unsigned int Peer::countPieces(unsigned int bitfield) const
226 {
227   unsigned int count = 0U;
228   unsigned int n     = bitfield;
229   while (n) {
230     count += n & 1U;
231     n >>= 1U;
232   }
233   return count;
234 }
235
236 int Peer::nbInterestedPeers() const
237 {
238   int nb = 0;
239   for (auto const& kv : connected_peers)
240     if (kv.second.interested)
241       nb++;
242   return nb;
243 }
244
245 void Peer::leech()
246 {
247   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
248   XBT_DEBUG("Start downloading.");
249
250   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
251   sendHandshakeToAllPeers();
252   XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname());
253
254   void* data = nullptr;
255   while (simgrid::s4u::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
256     if (comm_received == nullptr) {
257       comm_received = mailbox_->get_async(&data);
258     }
259     if (comm_received->test()) {
260       message = static_cast<Message*>(data);
261       handleMessage();
262       delete message;
263       comm_received = nullptr;
264     } else {
265       // We don't execute the choke algorithm if we don't already have a piece
266       if (simgrid::s4u::Engine::get_clock() >= next_choked_update && countPieces(bitfield_) > 0) {
267         updateChokedPeers();
268         next_choked_update += UPDATE_CHOKED_INTERVAL;
269       } else {
270         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
271       }
272     }
273   }
274   if (hasFinished())
275     XBT_DEBUG("%d becomes a seeder", id);
276 }
277
278 void Peer::seed()
279 {
280   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
281   XBT_DEBUG("Start seeding.");
282   // start the main seed loop
283   void* data = nullptr;
284   while (simgrid::s4u::Engine::get_clock() < deadline) {
285     if (comm_received == nullptr) {
286       comm_received = mailbox_->get_async(&data);
287     }
288     if (comm_received->test()) {
289       message = static_cast<Message*>(data);
290       handleMessage();
291       delete message;
292       comm_received = nullptr;
293     } else {
294       if (simgrid::s4u::Engine::get_clock() >= next_choked_update) {
295         updateChokedPeers();
296         // TODO: Change the choked peer algorithm when seeding.
297         next_choked_update += UPDATE_CHOKED_INTERVAL;
298       } else {
299         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
300       }
301     }
302   }
303 }
304
305 void Peer::updateActivePeersSet(Connection* remote_peer)
306 {
307   if (remote_peer->interested && not remote_peer->choked_upload)
308     active_peers.insert(remote_peer);
309   else
310     active_peers.erase(remote_peer);
311 }
312
313 void Peer::handleMessage()
314 {
315   XBT_DEBUG("Received a %s message from %s", message_name(message->type), message->return_mailbox->get_cname());
316
317   auto known_peer         = connected_peers.find(message->peer_id);
318   Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : &known_peer->second;
319   xbt_assert(remote_peer != nullptr || message->type == MessageType::HANDSHAKE,
320              "The impossible did happened: A not-in-our-list peer sent us a message.");
321
322   switch (message->type) {
323     case MessageType::HANDSHAKE:
324       // Check if the peer is in our connection list.
325       if (remote_peer == nullptr) {
326         XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
327         connected_peers.emplace(message->peer_id, Connection(message->peer_id));
328         sendMessage(message->return_mailbox, MessageType::HANDSHAKE, message_size(MessageType::HANDSHAKE));
329       }
330       // Send our bitfield to the peer
331       sendBitfield(message->return_mailbox);
332       break;
333     case MessageType::BITFIELD:
334       // Update the pieces list
335       updatePiecesCountFromBitfield(message->bitfield);
336       // Store the bitfield
337       remote_peer->bitfield = message->bitfield;
338       xbt_assert(not remote_peer->am_interested, "Should not be interested at first");
339       if (isInterestedBy(remote_peer)) {
340         remote_peer->am_interested = true;
341         sendMessage(message->return_mailbox, MessageType::INTERESTED, message_size(MessageType::INTERESTED));
342       }
343       break;
344     case MessageType::INTERESTED:
345       // Update the interested state of the peer.
346       remote_peer->interested = true;
347       updateActivePeersSet(remote_peer);
348       break;
349     case MessageType::NOTINTERESTED:
350       remote_peer->interested = false;
351       updateActivePeersSet(remote_peer);
352       break;
353     case MessageType::UNCHOKE:
354       xbt_assert(remote_peer->choked_download);
355       remote_peer->choked_download = false;
356       // Send requests to the peer, since it has unchoked us
357       if (remote_peer->am_interested)
358         requestNewPieceTo(remote_peer);
359       break;
360     case MessageType::CHOKE:
361       xbt_assert(not remote_peer->choked_download);
362       remote_peer->choked_download = true;
363       if (remote_peer->current_piece != -1)
364         removeCurrentPiece(remote_peer, remote_peer->current_piece);
365       break;
366     case MessageType::HAVE:
367       XBT_DEBUG("\t for piece %d", message->piece);
368       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
369                  "Wrong HAVE message received");
370       remote_peer->bitfield = remote_peer->bitfield | (1U << static_cast<unsigned int>(message->piece));
371       pieces_count[message->piece]++;
372       // If the piece is in our pieces, we tell the peer that we are interested.
373       if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
374         remote_peer->am_interested = true;
375         sendMessage(message->return_mailbox, MessageType::INTERESTED, message_size(MessageType::INTERESTED));
376         if (not remote_peer->choked_download)
377           requestNewPieceTo(remote_peer);
378       }
379       break;
380     case MessageType::REQUEST:
381       xbt_assert(remote_peer->interested);
382       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
383                  "Wrong HAVE message received");
384       if (not remote_peer->choked_upload) {
385         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
386                   message->block_index + message->block_length);
387         if (not hasNotPiece(message->piece)) {
388           sendPiece(message->return_mailbox, message->piece, message->block_index, message->block_length);
389         }
390       } else {
391         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
392       }
393       break;
394     case MessageType::PIECE:
395       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
396                 message->block_index + message->block_length);
397       xbt_assert(not remote_peer->choked_download);
398       xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
399       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
400                  "Wrong piece received");
401       // TODO: Execute a computation.
402       if (hasNotPiece(static_cast<unsigned int>(message->piece))) {
403         updateBitfieldBlocks(message->piece, message->block_index, message->block_length);
404         if (hasCompletedPiece(static_cast<unsigned int>(message->piece))) {
405           // Removing the piece from our piece list
406           removeCurrentPiece(remote_peer, message->piece);
407           // Setting the fact that we have the piece
408           bitfield_ = bitfield_ | (1U << static_cast<unsigned int>(message->piece));
409           XBT_DEBUG("My status is now %s", getStatus().c_str());
410           // Sending the information to all the peers we are connected to
411           sendHaveToAllPeers(message->piece);
412           // sending UNINTERESTED to peers that do not have what we want.
413           updateInterestedAfterReceive();
414         } else {                                      // piece not completed
415           sendRequestTo(remote_peer, message->piece); // ask for the next block
416         }
417       } else {
418         XBT_DEBUG("However, we already have it");
419         requestNewPieceTo(remote_peer);
420       }
421       break;
422     case MessageType::CANCEL:
423       break;
424     default:
425       THROW_IMPOSSIBLE;
426   }
427   // Update the peer speed.
428   if (remote_peer) {
429     remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::get_clock() - begin_receive_time));
430   }
431   begin_receive_time = simgrid::s4u::Engine::get_clock();
432 }
433
434 /** Selects the appropriate piece to download and requests it to the remote_peer */
435 void Peer::requestNewPieceTo(Connection* remote_peer)
436 {
437   int piece = selectPieceToDownload(remote_peer);
438   if (piece != -1) {
439     current_pieces |= (1U << (unsigned int)piece);
440     sendRequestTo(remote_peer, piece);
441   }
442 }
443
444 void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piece)
445 {
446   current_pieces &= ~(1U << current_piece);
447   remote_peer->current_piece = -1;
448 }
449
450 /** @brief Return the piece to be downloaded
451  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
452  * If a piece is partially downloaded, this piece will be selected prioritarily
453  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
454  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
455  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
456  * @param remote_peer: information about the connection
457  * @return the piece to download if possible. -1 otherwise
458  */
459 int Peer::selectPieceToDownload(const Connection* remote_peer)
460 {
461   int piece = partiallyDownloadedPiece(remote_peer);
462   // strict priority policy
463   if (piece != -1)
464     return piece;
465
466   // end game mode
467   if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) {
468     int nb_interesting_pieces = 0;
469     // compute the number of interesting pieces
470     for (unsigned int i = 0; i < FILE_PIECES; i++)
471       if (remotePeerHasMissingPiece(remote_peer, i))
472         nb_interesting_pieces++;
473
474     xbt_assert(nb_interesting_pieces != 0);
475     // get a random interesting piece
476     int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1);
477     int current_index      = 0;
478     for (unsigned int i = 0; i < FILE_PIECES; i++) {
479       if (remotePeerHasMissingPiece(remote_peer, i)) {
480         if (random_piece_index == current_index) {
481           piece = i;
482           break;
483         }
484         current_index++;
485       }
486     }
487     xbt_assert(piece != -1);
488     return piece;
489   }
490   // Random first policy
491   if (countPieces(bitfield_) < 4 && isInterestedByFree(remote_peer)) {
492     int nb_interesting_pieces = 0;
493     // compute the number of interesting pieces
494     for (unsigned int i = 0; i < FILE_PIECES; i++)
495       if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
496         nb_interesting_pieces++;
497     xbt_assert(nb_interesting_pieces != 0);
498     // get a random interesting piece
499     int random_piece_index = random.uniform_int(0, nb_interesting_pieces - 1);
500     int current_index      = 0;
501     for (unsigned int i = 0; i < FILE_PIECES; i++) {
502       if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) {
503         if (random_piece_index == current_index) {
504           piece = i;
505           break;
506         }
507         current_index++;
508       }
509     }
510     xbt_assert(piece != -1);
511     return piece;
512   } else { // Rarest first policy
513     short min         = SHRT_MAX;
514     int nb_min_pieces = 0;
515     int current_index = 0;
516     // compute the smallest number of copies of available pieces
517     for (unsigned int i = 0; i < FILE_PIECES; i++) {
518       if (pieces_count[i] < min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
519         min = pieces_count[i];
520     }
521
522     xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
523     // compute the number of rarest pieces
524     for (unsigned int i = 0; i < FILE_PIECES; i++)
525       if (pieces_count[i] == min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i))
526         nb_min_pieces++;
527
528     xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer));
529     // get a random rarest piece
530     int random_rarest_index = 0;
531     if (nb_min_pieces > 0) {
532       random_rarest_index = random.uniform_int(0, nb_min_pieces - 1);
533     }
534     for (unsigned int i = 0; i < FILE_PIECES; i++)
535       if (pieces_count[i] == min && remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i)) {
536         if (random_rarest_index == current_index) {
537           piece = i;
538           break;
539         }
540         current_index++;
541       }
542
543     xbt_assert(piece != -1 || not isInterestedByFree(remote_peer));
544     return piece;
545   }
546 }
547
548 void Peer::updateChokedPeers()
549 {
550   if (nbInterestedPeers() == 0)
551     return;
552   XBT_DEBUG("(%d) update_choked peers %zu active peers", id, active_peers.size());
553   // update the current round
554   round_                  = (round_ + 1) % 3;
555   Connection* chosen_peer = nullptr;
556   // select first active peer and remove it from the set
557   Connection* choked_peer;
558   if (active_peers.empty()) {
559     choked_peer = nullptr;
560   } else {
561     choked_peer = *active_peers.begin();
562     active_peers.erase(choked_peer);
563   }
564
565   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
566   if (hasFinished()) {
567     double unchoke_time = simgrid::s4u::Engine::get_clock() + 1;
568     for (auto& kv : connected_peers) {
569       Connection& remote_peer = kv.second;
570       if (remote_peer.last_unchoke < unchoke_time && remote_peer.interested && remote_peer.choked_upload) {
571         unchoke_time = remote_peer.last_unchoke;
572         chosen_peer  = &remote_peer;
573       }
574     }
575   } else {
576     // Random optimistic unchoking
577     if (round_ == 0) {
578       int j = 0;
579       do {
580         // We choose a random peer to unchoke.
581         auto chosen_peer_it = connected_peers.begin();
582         std::advance(chosen_peer_it, random.uniform_int(0, static_cast<int>(connected_peers.size() - 1)));
583         chosen_peer = &chosen_peer_it->second;
584         if (not chosen_peer->interested || not chosen_peer->choked_upload)
585           chosen_peer = nullptr;
586         else
587           XBT_DEBUG("Nothing to do, keep going");
588         j++;
589       } while (chosen_peer == nullptr && j < MAXIMUM_PEERS);
590     } else {
591       // Use the "fastest download" policy.
592       double fastest_speed = 0.0;
593       for (auto& kv : connected_peers) {
594         Connection& remote_peer = kv.second;
595         if (remote_peer.peer_speed > fastest_speed && remote_peer.choked_upload && remote_peer.interested) {
596           fastest_speed = remote_peer.peer_speed;
597           chosen_peer   = &remote_peer;
598         }
599       }
600     }
601   }
602
603   if (chosen_peer != nullptr)
604     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", id, chosen_peer->id,
605               chosen_peer->interested, chosen_peer->choked_upload);
606
607   if (choked_peer != chosen_peer) {
608     if (choked_peer != nullptr) {
609       xbt_assert(not choked_peer->choked_upload, "Tries to choked a choked peer");
610       choked_peer->choked_upload = true;
611       updateActivePeersSet(choked_peer);
612       XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
613       sendMessage(choked_peer->mailbox_, MessageType::CHOKE, message_size(MessageType::CHOKE));
614     }
615     if (chosen_peer != nullptr) {
616       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
617       chosen_peer->choked_upload = false;
618       active_peers.insert(chosen_peer);
619       chosen_peer->last_unchoke = simgrid::s4u::Engine::get_clock();
620       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
621       updateActivePeersSet(chosen_peer);
622       sendMessage(chosen_peer->mailbox_, MessageType::UNCHOKE, message_size(MessageType::UNCHOKE));
623     }
624   }
625 }
626
627 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
628 void Peer::updateInterestedAfterReceive()
629 {
630   for (auto& kv : connected_peers) {
631     Connection& remote_peer = kv.second;
632     if (remote_peer.am_interested) {
633       bool interested = false;
634       // Check if the peer still has a piece we want.
635       for (unsigned int i = 0; i < FILE_PIECES; i++)
636         if (remotePeerHasMissingPiece(&remote_peer, i)) {
637           interested = true;
638           break;
639         }
640
641       if (not interested) { // no more piece to download from connection
642         remote_peer.am_interested = false;
643         sendMessage(remote_peer.mailbox_, MessageType::NOTINTERESTED, message_size(MessageType::NOTINTERESTED));
644       }
645     }
646   }
647 }
648
649 void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
650 {
651   xbt_assert((piece >= 0 && static_cast<unsigned int>(piece) <= FILE_PIECES), "Wrong piece.");
652   xbt_assert((block_index >= 0 && static_cast<unsigned int>(block_index) <= PIECES_BLOCKS), "Wrong block : %d.",
653              block_index);
654   for (int i = block_index; i < (block_index + block_length); i++)
655     bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
656 }
657
658 bool Peer::hasCompletedPiece(unsigned int piece) const
659 {
660   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
661     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
662       return false;
663   return true;
664 }
665
666 int Peer::getFirstMissingBlockFrom(int piece) const
667 {
668   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
669     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
670       return i;
671   return -1;
672 }
673
674 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
675 int Peer::partiallyDownloadedPiece(const Connection* remote_peer) const
676 {
677   for (unsigned int i = 0; i < FILE_PIECES; i++)
678     if (remotePeerHasMissingPiece(remote_peer, i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
679       return i;
680   return -1;
681 }