Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
1f28b3ee41b72c0f160bba4b4071e9edfa5753b2
[simgrid.git] / examples / s4u / app-bittorrent / s4u-peer.cpp
1 /* Copyright (c) 2012-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <algorithm>
7 #include <climits>
8
9 #include "s4u-peer.hpp"
10 #include "s4u-tracker.hpp"
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
13
14 /*
15  * User parameters for transferred file data. For the test, the default values are :
16  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
17  */
18 constexpr unsigned long FILE_PIECES   = 10UL;
19 constexpr unsigned long PIECES_BLOCKS = 5UL;
20 constexpr int BLOCK_SIZE              = 16384;
21
22 /** Number of blocks asked by each request */
23 constexpr unsigned long BLOCKS_REQUESTED = 2UL;
24
25 constexpr bool ENABLE_END_GAME_MODE = true;
26 constexpr double SLEEP_DURATION     = 1.0;
27 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
28
29 Peer::Peer(std::vector<std::string> args)
30 {
31   // Check arguments
32   xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
33   try {
34     id       = std::stoi(args[1]);
35     mailbox_ = simgrid::s4u::Mailbox::by_name(std::to_string(id));
36   } catch (std::invalid_argument& ia) {
37     throw std::invalid_argument(std::string("Invalid ID:") + args[1].c_str());
38   }
39
40   try {
41     deadline = std::stod(args[2]);
42   } catch (std::invalid_argument& ia) {
43     throw std::invalid_argument(std::string("Invalid deadline:") + args[2].c_str());
44   }
45   xbt_assert(deadline > 0, "Wrong deadline supplied");
46
47   stream = simgrid::s4u::this_actor::get_host()->extension<HostBittorrent>()->getStream();
48
49   if (args.size() == 4 && args[3] == "1") {
50     bitfield_       = (1U << FILE_PIECES) - 1U;
51     bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
52   }
53   pieces_count.resize(FILE_PIECES);
54
55   XBT_INFO("Hi, I'm joining the network with id %d", id);
56 }
57
58 /** Peer main function */
59 void Peer::operator()()
60 {
61   // Getting peer data from the tracker.
62   if (getPeersFromTracker()) {
63     XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
64     begin_receive_time = simgrid::s4u::Engine::get_clock();
65     mailbox_->set_receiver(simgrid::s4u::Actor::self());
66     if (hasFinished()) {
67       sendHandshakeToAllPeers();
68     } else {
69       leech();
70     }
71     seed();
72   } else {
73     XBT_INFO("Couldn't contact the tracker.");
74   }
75
76   XBT_INFO("Here is my current status: %s", getStatus().c_str());
77 }
78
79 bool Peer::getPeersFromTracker()
80 {
81   simgrid::s4u::Mailbox* tracker_mailbox = simgrid::s4u::Mailbox::by_name(TRACKER_MAILBOX);
82   // Build the task to send to the tracker
83   TrackerQuery* peer_request = new TrackerQuery(id, mailbox_);
84   try {
85     XBT_DEBUG("Sending a peer request to the tracker.");
86     tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
87   } catch (simgrid::TimeoutError& e) {
88     XBT_DEBUG("Timeout expired when requesting peers to tracker");
89     delete peer_request;
90     return false;
91   }
92
93   try {
94     TrackerAnswer* answer = static_cast<TrackerAnswer*>(mailbox_->get(GET_PEERS_TIMEOUT));
95     // Add the peers the tracker gave us to our peer list.
96     for (auto const& peer_id : answer->getPeers())
97       if (id != peer_id)
98         connected_peers.emplace(peer_id, Connection(peer_id));
99     delete answer;
100   } catch (simgrid::TimeoutError& e) {
101     XBT_DEBUG("Timeout expired when requesting peers to tracker");
102     return false;
103   }
104   return true;
105 }
106
107 void Peer::sendHandshakeToAllPeers()
108 {
109   for (auto const& kv : connected_peers) {
110     const Connection& remote_peer = kv.second;
111     Message* handshake      = new Message(MESSAGE_HANDSHAKE, id, mailbox_);
112     remote_peer.mailbox_->put_init(handshake, MESSAGE_HANDSHAKE_SIZE)->detach();
113     XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer.id);
114   }
115 }
116
117 void Peer::sendMessage(simgrid::s4u::Mailbox* mailbox, e_message_type type, uint64_t size)
118 {
119   const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"};
120   XBT_DEBUG("Sending %s to %s", type_names[type], mailbox->get_cname());
121   mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
122 }
123
124 void Peer::sendBitfield(simgrid::s4u::Mailbox* mailbox)
125 {
126   XBT_DEBUG("Sending a BITFIELD to %s", mailbox->get_cname());
127   mailbox
128       ->put_init(new Message(MESSAGE_BITFIELD, id, bitfield_, mailbox_),
129                  MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES))
130       ->detach();
131 }
132
133 void Peer::sendPiece(simgrid::s4u::Mailbox* mailbox, unsigned int piece, int block_index, int block_length)
134 {
135   xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
136   XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->get_cname());
137   mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach();
138 }
139
140 void Peer::sendHaveToAllPeers(unsigned int piece)
141 {
142   XBT_DEBUG("Sending HAVE message to all my peers");
143   for (auto const& kv : connected_peers) {
144     const Connection& remote_peer = kv.second;
145     remote_peer.mailbox_->put_init(new Message(MESSAGE_HAVE, id, mailbox_, piece), MESSAGE_HAVE_SIZE)->detach();
146   }
147 }
148
149 void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
150 {
151   remote_peer->current_piece = piece;
152   xbt_assert(remote_peer->hasPiece(piece));
153   int block_index = getFirstMissingBlockFrom(piece);
154   if (block_index != -1) {
155     int block_length = std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
156     XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->get_cname(), piece, block_index,
157               block_length);
158     remote_peer->mailbox_
159         ->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE)
160         ->detach();
161   }
162 }
163
164 std::string Peer::getStatus()
165 {
166   std::string res = std::string("");
167   for (int i = FILE_PIECES - 1; i >= 0; i--)
168     res = std::string((bitfield_ & (1U << i)) ? "1" : "0") + res;
169   return res;
170 }
171
172 bool Peer::hasFinished()
173 {
174   return bitfield_ == (1U << FILE_PIECES) - 1U;
175 }
176
177 /** Indicates if the remote peer has a piece not stored by the local peer */
178 bool Peer::isInterestedBy(Connection* remote_peer)
179 {
180   return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
181 }
182
183 bool Peer::isInterestedByFree(Connection* remote_peer)
184 {
185   for (unsigned int i = 0; i < FILE_PIECES; i++)
186     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
187       return true;
188   return false;
189 }
190
191 void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
192 {
193   for (unsigned int i = 0; i < FILE_PIECES; i++)
194     if (bitfield & (1U << i))
195       pieces_count[i]++;
196 }
197
198 unsigned int Peer::countPieces(unsigned int bitfield)
199 {
200   unsigned int count = 0U;
201   unsigned int n     = bitfield;
202   while (n) {
203     count += n & 1U;
204     n >>= 1U;
205   }
206   return count;
207 }
208
209 int Peer::nbInterestedPeers()
210 {
211   int nb = 0;
212   for (auto const& kv : connected_peers)
213     if (kv.second.interested)
214       nb++;
215   return nb;
216 }
217
218 void Peer::leech()
219 {
220   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
221   XBT_DEBUG("Start downloading.");
222
223   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
224   sendHandshakeToAllPeers();
225   XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname());
226
227   void* data = nullptr;
228   while (simgrid::s4u::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
229     if (comm_received == nullptr) {
230       comm_received = mailbox_->get_async(&data);
231     }
232     if (comm_received->test()) {
233       message = static_cast<Message*>(data);
234       handleMessage();
235       delete message;
236       comm_received = nullptr;
237     } else {
238       // We don't execute the choke algorithm if we don't already have a piece
239       if (simgrid::s4u::Engine::get_clock() >= next_choked_update && countPieces(bitfield_) > 0) {
240         updateChokedPeers();
241         next_choked_update += UPDATE_CHOKED_INTERVAL;
242       } else {
243         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
244       }
245     }
246   }
247   if (hasFinished())
248     XBT_DEBUG("%d becomes a seeder", id);
249 }
250
251 void Peer::seed()
252 {
253   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
254   XBT_DEBUG("Start seeding.");
255   // start the main seed loop
256   void* data = nullptr;
257   while (simgrid::s4u::Engine::get_clock() < deadline) {
258     if (comm_received == nullptr) {
259       comm_received = mailbox_->get_async(&data);
260     }
261     if (comm_received->test()) {
262       message = static_cast<Message*>(data);
263       handleMessage();
264       delete message;
265       comm_received = nullptr;
266     } else {
267       if (simgrid::s4u::Engine::get_clock() >= next_choked_update) {
268         updateChokedPeers();
269         // TODO: Change the choked peer algorithm when seeding.
270         next_choked_update += UPDATE_CHOKED_INTERVAL;
271       } else {
272         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
273       }
274     }
275   }
276 }
277
278 void Peer::updateActivePeersSet(Connection* remote_peer)
279 {
280   if (remote_peer->interested && not remote_peer->choked_upload)
281     active_peers.insert(remote_peer);
282   else
283     active_peers.erase(remote_peer);
284 }
285
286 void Peer::handleMessage()
287 {
288   const char* type_names[10] = {"HANDSHAKE", "CHOKE",    "UNCHOKE", "INTERESTED", "NOTINTERESTED",
289                                 "HAVE",      "BITFIELD", "REQUEST", "PIECE",      "CANCEL"};
290
291   XBT_DEBUG("Received a %s message from %s", type_names[message->type], message->return_mailbox->get_cname());
292
293   auto known_peer         = connected_peers.find(message->peer_id);
294   Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : &known_peer->second;
295   xbt_assert(remote_peer != nullptr || message->type == MESSAGE_HANDSHAKE,
296              "The impossible did happened: A not-in-our-list peer sent us a message.");
297
298   switch (message->type) {
299     case MESSAGE_HANDSHAKE:
300       // Check if the peer is in our connection list.
301       if (remote_peer == nullptr) {
302         XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
303         connected_peers.emplace(message->peer_id, Connection(message->peer_id));
304         sendMessage(message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
305       }
306       // Send our bitfield to the peer
307       sendBitfield(message->return_mailbox);
308       break;
309     case MESSAGE_BITFIELD:
310       // Update the pieces list
311       updatePiecesCountFromBitfield(message->bitfield);
312       // Store the bitfield
313       remote_peer->bitfield = message->bitfield;
314       xbt_assert(not remote_peer->am_interested, "Should not be interested at first");
315       if (isInterestedBy(remote_peer)) {
316         remote_peer->am_interested = true;
317         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
318       }
319       break;
320     case MESSAGE_INTERESTED:
321       // Update the interested state of the peer.
322       remote_peer->interested = true;
323       updateActivePeersSet(remote_peer);
324       break;
325     case MESSAGE_NOTINTERESTED:
326       remote_peer->interested = false;
327       updateActivePeersSet(remote_peer);
328       break;
329     case MESSAGE_UNCHOKE:
330       xbt_assert(remote_peer->choked_download);
331       remote_peer->choked_download = false;
332       // Send requests to the peer, since it has unchoked us
333       if (remote_peer->am_interested)
334         requestNewPieceTo(remote_peer);
335       break;
336     case MESSAGE_CHOKE:
337       xbt_assert(not remote_peer->choked_download);
338       remote_peer->choked_download = true;
339       if (remote_peer->current_piece != -1)
340         removeCurrentPiece(remote_peer, remote_peer->current_piece);
341       break;
342     case MESSAGE_HAVE:
343       XBT_DEBUG("\t for piece %d", message->piece);
344       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
345                  "Wrong HAVE message received");
346       remote_peer->bitfield = remote_peer->bitfield | (1U << static_cast<unsigned int>(message->piece));
347       pieces_count[message->piece]++;
348       // If the piece is in our pieces, we tell the peer that we are interested.
349       if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
350         remote_peer->am_interested = true;
351         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
352         if (not remote_peer->choked_download)
353           requestNewPieceTo(remote_peer);
354       }
355       break;
356     case MESSAGE_REQUEST:
357       xbt_assert(remote_peer->interested);
358       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
359                  "Wrong HAVE message received");
360       if (not remote_peer->choked_upload) {
361         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
362                   message->block_index + message->block_length);
363         if (not hasNotPiece(message->piece)) {
364           sendPiece(message->return_mailbox, message->piece, message->block_index, message->block_length);
365         }
366       } else {
367         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
368       }
369       break;
370     case MESSAGE_PIECE:
371       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
372                 message->block_index + message->block_length);
373       xbt_assert(not remote_peer->choked_download);
374       xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
375                  "Can't received a piece if I'm not interested without end-game mode!"
376                  "piece (%d) bitfield (%u) remote bitfield (%u)",
377                  message->piece, bitfield_, remote_peer->bitfield);
378       xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
379       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
380                  "Wrong piece received");
381       // TODO: Execute a computation.
382       if (hasNotPiece(static_cast<unsigned int>(message->piece))) {
383         updateBitfieldBlocks(message->piece, message->block_index, message->block_length);
384         if (hasCompletedPiece(static_cast<unsigned int>(message->piece))) {
385           // Removing the piece from our piece list
386           removeCurrentPiece(remote_peer, message->piece);
387           // Setting the fact that we have the piece
388           bitfield_ = bitfield_ | (1U << static_cast<unsigned int>(message->piece));
389           XBT_DEBUG("My status is now %s", getStatus().c_str());
390           // Sending the information to all the peers we are connected to
391           sendHaveToAllPeers(message->piece);
392           // sending UNINTERESTED to peers that do not have what we want.
393           updateInterestedAfterReceive();
394         } else {                                      // piece not completed
395           sendRequestTo(remote_peer, message->piece); // ask for the next block
396         }
397       } else {
398         XBT_DEBUG("However, we already have it");
399         xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
400         requestNewPieceTo(remote_peer);
401       }
402       break;
403     case MESSAGE_CANCEL:
404       break;
405     default:
406       THROW_IMPOSSIBLE;
407   }
408   // Update the peer speed.
409   if (remote_peer) {
410     remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::get_clock() - begin_receive_time));
411   }
412   begin_receive_time = simgrid::s4u::Engine::get_clock();
413 }
414
415 /** Selects the appropriate piece to download and requests it to the remote_peer */
416 void Peer::requestNewPieceTo(Connection* remote_peer)
417 {
418   int piece = selectPieceToDownload(remote_peer);
419   if (piece != -1) {
420     current_pieces |= (1U << (unsigned int)piece);
421     sendRequestTo(remote_peer, piece);
422   }
423 }
424
425 void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piece)
426 {
427   current_pieces &= ~(1U << current_piece);
428   remote_peer->current_piece = -1;
429 }
430
431 /** @brief Return the piece to be downloaded
432  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
433  * If a piece is partially downloaded, this piece will be selected prioritarily
434  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
435  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
436  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
437  * @param remote_peer: information about the connection
438  * @return the piece to download if possible. -1 otherwise
439  */
440 int Peer::selectPieceToDownload(Connection* remote_peer)
441 {
442   int piece = partiallyDownloadedPiece(remote_peer);
443   // strict priority policy
444   if (piece != -1)
445     return piece;
446
447   // end game mode
448   if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) {
449     if (not ENABLE_END_GAME_MODE)
450       return -1;
451     int nb_interesting_pieces = 0;
452     // compute the number of interesting pieces
453     for (unsigned int i = 0; i < FILE_PIECES; i++)
454       if (hasNotPiece(i) && remote_peer->hasPiece(i))
455         nb_interesting_pieces++;
456
457     xbt_assert(nb_interesting_pieces != 0);
458     // get a random interesting piece
459     int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
460     int current_index      = 0;
461     for (unsigned int i = 0; i < FILE_PIECES; i++) {
462       if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
463         if (random_piece_index == current_index) {
464           piece = i;
465           break;
466         }
467         current_index++;
468       }
469     }
470     xbt_assert(piece != -1);
471     return piece;
472   }
473   // Random first policy
474   if (countPieces(bitfield_) < 4 && isInterestedByFree(remote_peer)) {
475     int nb_interesting_pieces = 0;
476     // compute the number of interesting pieces
477     for (unsigned int i = 0; i < FILE_PIECES; i++)
478       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
479         nb_interesting_pieces++;
480     xbt_assert(nb_interesting_pieces != 0);
481     // get a random interesting piece
482     int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
483     int current_index      = 0;
484     for (unsigned int i = 0; i < FILE_PIECES; i++) {
485       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
486         if (random_piece_index == current_index) {
487           piece = i;
488           break;
489         }
490         current_index++;
491       }
492     }
493     xbt_assert(piece != -1);
494     return piece;
495   } else { // Rarest first policy
496     short min         = SHRT_MAX;
497     int nb_min_pieces = 0;
498     int current_index = 0;
499     // compute the smallest number of copies of available pieces
500     for (unsigned int i = 0; i < FILE_PIECES; i++) {
501       if (pieces_count[i] < min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
502         min = pieces_count[i];
503     }
504
505     xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
506     // compute the number of rarest pieces
507     for (unsigned int i = 0; i < FILE_PIECES; i++)
508       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
509         nb_min_pieces++;
510
511     xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer));
512     // get a random rarest piece
513     int random_rarest_index = RngStream_RandInt(stream, 0, nb_min_pieces - 1);
514     for (unsigned int i = 0; i < FILE_PIECES; i++)
515       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
516         if (random_rarest_index == current_index) {
517           piece = i;
518           break;
519         }
520         current_index++;
521       }
522
523     xbt_assert(piece != -1 || not isInterestedByFree(remote_peer));
524     return piece;
525   }
526 }
527
528 void Peer::updateChokedPeers()
529 {
530   if (nbInterestedPeers() == 0)
531     return;
532   XBT_DEBUG("(%d) update_choked peers %zu active peers", id, active_peers.size());
533   // update the current round
534   round_                  = (round_ + 1) % 3;
535   Connection* chosen_peer = nullptr;
536   // select first active peer and remove it from the set
537   Connection* choked_peer;
538   if (active_peers.empty()) {
539     choked_peer = nullptr;
540   } else {
541     choked_peer = *active_peers.begin();
542     active_peers.erase(choked_peer);
543   }
544
545   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
546   if (hasFinished()) {
547     double unchoke_time = simgrid::s4u::Engine::get_clock() + 1;
548     for (auto& kv : connected_peers) {
549       Connection& remote_peer = kv.second;
550       if (remote_peer.last_unchoke < unchoke_time && remote_peer.interested && remote_peer.choked_upload) {
551         unchoke_time = remote_peer.last_unchoke;
552         chosen_peer  = &remote_peer;
553       }
554     }
555   } else {
556     // Random optimistic unchoking
557     if (round_ == 0) {
558       int j = 0;
559       do {
560         // We choose a random peer to unchoke.
561         std::unordered_map<int, Connection>::iterator chosen_peer_it = connected_peers.begin();
562         std::advance(chosen_peer_it, RngStream_RandInt(stream, 0, connected_peers.size() - 1));
563         chosen_peer = &chosen_peer_it->second;
564         if (not chosen_peer->interested || not chosen_peer->choked_upload)
565           chosen_peer = nullptr;
566         else
567           XBT_DEBUG("Nothing to do, keep going");
568         j++;
569       } while (chosen_peer == nullptr && j < MAXIMUM_PEERS);
570     } else {
571       // Use the "fastest download" policy.
572       double fastest_speed = 0.0;
573       for (auto& kv : connected_peers) {
574         Connection& remote_peer = kv.second;
575         if (remote_peer.peer_speed > fastest_speed && remote_peer.choked_upload && remote_peer.interested) {
576           fastest_speed = remote_peer.peer_speed;
577           chosen_peer   = &remote_peer;
578         }
579       }
580     }
581   }
582
583   if (chosen_peer != nullptr)
584     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", id, chosen_peer->id,
585               chosen_peer->interested, chosen_peer->choked_upload);
586
587   if (choked_peer != chosen_peer) {
588     if (choked_peer != nullptr) {
589       xbt_assert(not choked_peer->choked_upload, "Tries to choked a choked peer");
590       choked_peer->choked_upload = true;
591       updateActivePeersSet(choked_peer);
592       XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
593       sendMessage(choked_peer->mailbox_, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
594     }
595     if (chosen_peer != nullptr) {
596       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
597       chosen_peer->choked_upload = false;
598       active_peers.insert(chosen_peer);
599       chosen_peer->last_unchoke = simgrid::s4u::Engine::get_clock();
600       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
601       updateActivePeersSet(chosen_peer);
602       sendMessage(chosen_peer->mailbox_, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
603     }
604   }
605 }
606
607 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
608 void Peer::updateInterestedAfterReceive()
609 {
610   for (auto& kv : connected_peers) {
611     Connection& remote_peer = kv.second;
612     if (remote_peer.am_interested) {
613       bool interested = false;
614       // Check if the peer still has a piece we want.
615       for (unsigned int i = 0; i < FILE_PIECES; i++)
616         if (hasNotPiece(i) && remote_peer.hasPiece(i)) {
617           interested = true;
618           break;
619         }
620
621       if (not interested) { // no more piece to download from connection
622         remote_peer.am_interested = false;
623         sendMessage(remote_peer.mailbox_, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
624       }
625     }
626   }
627 }
628
629 void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
630 {
631   xbt_assert((piece >= 0 && static_cast<unsigned int>(piece) <= FILE_PIECES), "Wrong piece.");
632   xbt_assert((block_index >= 0 && static_cast<unsigned int>(block_index) <= PIECES_BLOCKS), "Wrong block : %d.",
633              block_index);
634   for (int i = block_index; i < (block_index + block_length); i++)
635     bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
636 }
637
638 bool Peer::hasCompletedPiece(unsigned int piece)
639 {
640   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
641     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
642       return false;
643   return true;
644 }
645
646 int Peer::getFirstMissingBlockFrom(int piece)
647 {
648   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
649     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
650       return i;
651   return -1;
652 }
653
654 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
655 int Peer::partiallyDownloadedPiece(Connection* remote_peer)
656 {
657   for (unsigned int i = 0; i < FILE_PIECES; i++)
658     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
659       return i;
660   return -1;
661 }