Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
further snake_case s4u::Engine
[simgrid.git] / examples / s4u / app-bittorrent / s4u-peer.cpp
1 /* Copyright (c) 2012-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <algorithm>
7 #include <climits>
8 #include <xbt/ex.hpp>
9
10 #include "s4u-peer.hpp"
11 #include "s4u-tracker.hpp"
12
13 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
14
15 /*
16  * User parameters for transferred file data. For the test, the default values are :
17  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
18  */
19 #define FILE_PIECES 10UL
20 #define PIECES_BLOCKS 5UL
21 #define BLOCK_SIZE 16384
22
23 /** Number of blocks asked by each request */
24 #define BLOCKS_REQUESTED 2UL
25
26 #define ENABLE_END_GAME_MODE 1
27 #define SLEEP_DURATION 1
28 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
29
30 Peer::Peer(std::vector<std::string> args)
31 {
32   // Check arguments
33   xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
34   try {
35     id       = std::stoi(args[1]);
36     mailbox_ = simgrid::s4u::Mailbox::byName(std::to_string(id));
37   } catch (std::invalid_argument& ia) {
38     throw std::invalid_argument(std::string("Invalid ID:") + args[1].c_str());
39   }
40
41   try {
42     deadline = std::stod(args[2]);
43   } catch (std::invalid_argument& ia) {
44     throw std::invalid_argument(std::string("Invalid deadline:") + args[2].c_str());
45   }
46   xbt_assert(deadline > 0, "Wrong deadline supplied");
47
48   stream = simgrid::s4u::this_actor::get_host()->extension<HostBittorrent>()->getStream();
49
50   if (args.size() == 4 && args[3] == "1") {
51     bitfield_       = (1U << FILE_PIECES) - 1U;
52     bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
53   }
54   pieces_count = new short[FILE_PIECES]{0};
55
56   XBT_INFO("Hi, I'm joining the network with id %d", id);
57 }
58
59 Peer::~Peer()
60 {
61   for (auto const& peer : connected_peers)
62     delete peer.second;
63   delete[] pieces_count;
64 }
65
66 /** Peer main function */
67 void Peer::operator()()
68 {
69   // Getting peer data from the tracker.
70   if (getPeersFromTracker()) {
71     XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
72     begin_receive_time = simgrid::s4u::Engine::get_clock();
73     mailbox_->setReceiver(simgrid::s4u::Actor::self());
74     if (hasFinished()) {
75       sendHandshakeToAllPeers();
76     } else {
77       leech();
78     }
79     seed();
80   } else {
81     XBT_INFO("Couldn't contact the tracker.");
82   }
83
84   XBT_INFO("Here is my current status: %s", getStatus().c_str());
85 }
86
87 bool Peer::getPeersFromTracker()
88 {
89   simgrid::s4u::MailboxPtr tracker_mailbox = simgrid::s4u::Mailbox::byName(TRACKER_MAILBOX);
90   // Build the task to send to the tracker
91   TrackerQuery* peer_request = new TrackerQuery(id, mailbox_);
92   try {
93     XBT_DEBUG("Sending a peer request to the tracker.");
94     tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
95   } catch (xbt_ex& e) {
96     if (e.category == timeout_error) {
97       XBT_DEBUG("Timeout expired when requesting peers to tracker");
98       delete peer_request;
99       return false;
100     }
101   }
102
103   try {
104     TrackerAnswer* answer = static_cast<TrackerAnswer*>(mailbox_->get(GET_PEERS_TIMEOUT));
105     // Add the peers the tracker gave us to our peer list.
106     for (auto const& peer_id : *answer->getPeers())
107       if (id != peer_id)
108         connected_peers[peer_id] = new Connection(peer_id);
109     delete answer;
110   } catch (xbt_ex& e) {
111     if (e.category == timeout_error) {
112       XBT_DEBUG("Timeout expired when requesting peers to tracker");
113       return false;
114     }
115   }
116   return true;
117 }
118
119 void Peer::sendHandshakeToAllPeers()
120 {
121   for (auto const& kv : connected_peers) {
122     Connection* remote_peer = kv.second;
123     Message* handshake      = new Message(MESSAGE_HANDSHAKE, id, mailbox_);
124     remote_peer->mailbox_->put_init(handshake, MESSAGE_HANDSHAKE_SIZE)->detach();
125     XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer->id);
126   }
127 }
128
129 void Peer::sendMessage(simgrid::s4u::MailboxPtr mailbox, e_message_type type, uint64_t size)
130 {
131   const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"};
132   XBT_DEBUG("Sending %s to %s", type_names[type], mailbox->get_cname());
133   mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
134 }
135
136 void Peer::sendBitfield(simgrid::s4u::MailboxPtr mailbox)
137 {
138   XBT_DEBUG("Sending a BITFIELD to %s", mailbox->get_cname());
139   mailbox
140       ->put_init(new Message(MESSAGE_BITFIELD, id, bitfield_, mailbox_),
141                  MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES))
142       ->detach();
143 }
144
145 void Peer::sendPiece(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length)
146 {
147   xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
148   XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->get_cname());
149   mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach();
150 }
151
152 void Peer::sendHaveToAllPeers(unsigned int piece)
153 {
154   XBT_DEBUG("Sending HAVE message to all my peers");
155   for (auto const& kv : connected_peers) {
156     Connection* remote_peer = kv.second;
157     remote_peer->mailbox_->put_init(new Message(MESSAGE_HAVE, id, mailbox_, piece), MESSAGE_HAVE_SIZE)->detach();
158   }
159 }
160
161 void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
162 {
163   remote_peer->current_piece = piece;
164   xbt_assert(remote_peer->hasPiece(piece));
165   int block_index = getFirstMissingBlockFrom(piece);
166   if (block_index != -1) {
167     int block_length = std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
168     XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->get_cname(), piece, block_index,
169               block_length);
170     remote_peer->mailbox_
171         ->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE)
172         ->detach();
173   }
174 }
175
176 std::string Peer::getStatus()
177 {
178   std::string res = std::string("");
179   for (int i = FILE_PIECES - 1; i >= 0; i--)
180     res = std::string((bitfield_ & (1U << i)) ? "1" : "0") + res;
181   return res;
182 }
183
184 bool Peer::hasFinished()
185 {
186   return bitfield_ == (1U << FILE_PIECES) - 1U;
187 }
188
189 /** Indicates if the remote peer has a piece not stored by the local peer */
190 bool Peer::isInterestedBy(Connection* remote_peer)
191 {
192   return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
193 }
194
195 bool Peer::isInterestedByFree(Connection* remote_peer)
196 {
197   for (unsigned int i = 0; i < FILE_PIECES; i++)
198     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
199       return true;
200   return false;
201 }
202
203 void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
204 {
205   for (unsigned int i = 0; i < FILE_PIECES; i++)
206     if (bitfield & (1U << i))
207       pieces_count[i]++;
208 }
209
210 unsigned int Peer::countPieces(unsigned int bitfield)
211 {
212   unsigned int count = 0U;
213   unsigned int n     = bitfield;
214   while (n) {
215     count += n & 1U;
216     n >>= 1U;
217   }
218   return count;
219 }
220
221 int Peer::nbInterestedPeers()
222 {
223   int nb = 0;
224   for (auto const& kv : connected_peers)
225     if (kv.second->interested)
226       nb++;
227   return nb;
228 }
229
230 void Peer::leech()
231 {
232   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
233   XBT_DEBUG("Start downloading.");
234
235   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
236   sendHandshakeToAllPeers();
237   XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname());
238
239   void* data = nullptr;
240   while (simgrid::s4u::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
241     if (comm_received == nullptr) {
242       comm_received = mailbox_->get_async(&data);
243     }
244     if (comm_received->test()) {
245       message = static_cast<Message*>(data);
246       handleMessage();
247       delete message;
248       comm_received = nullptr;
249     } else {
250       // We don't execute the choke algorithm if we don't already have a piece
251       if (simgrid::s4u::Engine::get_clock() >= next_choked_update && countPieces(bitfield_) > 0) {
252         updateChokedPeers();
253         next_choked_update += UPDATE_CHOKED_INTERVAL;
254       } else {
255         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
256       }
257     }
258   }
259   if (hasFinished())
260     XBT_DEBUG("%d becomes a seeder", id);
261 }
262
263 void Peer::seed()
264 {
265   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
266   XBT_DEBUG("Start seeding.");
267   // start the main seed loop
268   void* data = nullptr;
269   while (simgrid::s4u::Engine::get_clock() < deadline) {
270     if (comm_received == nullptr) {
271       comm_received = mailbox_->get_async(&data);
272     }
273     if (comm_received->test()) {
274       message = static_cast<Message*>(data);
275       handleMessage();
276       delete message;
277       comm_received = nullptr;
278     } else {
279       if (simgrid::s4u::Engine::get_clock() >= next_choked_update) {
280         updateChokedPeers();
281         // TODO: Change the choked peer algorithm when seeding.
282         next_choked_update += UPDATE_CHOKED_INTERVAL;
283       } else {
284         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
285       }
286     }
287   }
288 }
289
290 void Peer::updateActivePeersSet(Connection* remote_peer)
291 {
292   if (remote_peer->interested && not remote_peer->choked_upload)
293     active_peers.insert(remote_peer);
294   else
295     active_peers.erase(remote_peer);
296 }
297
298 void Peer::handleMessage()
299 {
300   const char* type_names[10] = {"HANDSHAKE", "CHOKE",    "UNCHOKE", "INTERESTED", "NOTINTERESTED",
301                                 "HAVE",      "BITFIELD", "REQUEST", "PIECE",      "CANCEL"};
302
303   XBT_DEBUG("Received a %s message from %s", type_names[message->type], message->return_mailbox->get_cname());
304
305   auto known_peer         = connected_peers.find(message->peer_id);
306   Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : known_peer->second;
307   xbt_assert(remote_peer != nullptr || message->type == MESSAGE_HANDSHAKE,
308              "The impossible did happened: A not-in-our-list peer sent us a message.");
309
310   switch (message->type) {
311     case MESSAGE_HANDSHAKE:
312       // Check if the peer is in our connection list.
313       if (remote_peer == nullptr) {
314         XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
315         connected_peers[message->peer_id] = new Connection(message->peer_id);
316         sendMessage(message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
317       }
318       // Send our bitfield to the peer
319       sendBitfield(message->return_mailbox);
320       break;
321     case MESSAGE_BITFIELD:
322       // Update the pieces list
323       updatePiecesCountFromBitfield(message->bitfield);
324       // Store the bitfield
325       remote_peer->bitfield = message->bitfield;
326       xbt_assert(not remote_peer->am_interested, "Should not be interested at first");
327       if (isInterestedBy(remote_peer)) {
328         remote_peer->am_interested = true;
329         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
330       }
331       break;
332     case MESSAGE_INTERESTED:
333       // Update the interested state of the peer.
334       remote_peer->interested = true;
335       updateActivePeersSet(remote_peer);
336       break;
337     case MESSAGE_NOTINTERESTED:
338       remote_peer->interested = false;
339       updateActivePeersSet(remote_peer);
340       break;
341     case MESSAGE_UNCHOKE:
342       xbt_assert(remote_peer->choked_download);
343       remote_peer->choked_download = false;
344       // Send requests to the peer, since it has unchoked us
345       if (remote_peer->am_interested)
346         requestNewPieceTo(remote_peer);
347       break;
348     case MESSAGE_CHOKE:
349       xbt_assert(not remote_peer->choked_download);
350       remote_peer->choked_download = true;
351       if (remote_peer->current_piece != -1)
352         removeCurrentPiece(remote_peer, remote_peer->current_piece);
353       break;
354     case MESSAGE_HAVE:
355       XBT_DEBUG("\t for piece %d", message->piece);
356       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
357                  "Wrong HAVE message received");
358       remote_peer->bitfield = remote_peer->bitfield | (1U << static_cast<unsigned int>(message->piece));
359       pieces_count[message->piece]++;
360       // If the piece is in our pieces, we tell the peer that we are interested.
361       if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
362         remote_peer->am_interested = true;
363         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
364         if (not remote_peer->choked_download)
365           requestNewPieceTo(remote_peer);
366       }
367       break;
368     case MESSAGE_REQUEST:
369       xbt_assert(remote_peer->interested);
370       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
371                  "Wrong HAVE message received");
372       if (not remote_peer->choked_upload) {
373         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
374                   message->block_index + message->block_length);
375         if (not hasNotPiece(message->piece)) {
376           sendPiece(message->return_mailbox, message->piece, message->block_index, message->block_length);
377         }
378       } else {
379         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
380       }
381       break;
382     case MESSAGE_PIECE:
383       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
384                 message->block_index + message->block_length);
385       xbt_assert(not remote_peer->choked_download);
386       xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
387                  "Can't received a piece if I'm not interested without end-game mode!"
388                  "piece (%d) bitfield (%u) remote bitfield (%u)",
389                  message->piece, bitfield_, remote_peer->bitfield);
390       xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
391       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
392                  "Wrong piece received");
393       // TODO: Execute a computation.
394       if (hasNotPiece(static_cast<unsigned int>(message->piece))) {
395         updateBitfieldBlocks(message->piece, message->block_index, message->block_length);
396         if (hasCompletedPiece(static_cast<unsigned int>(message->piece))) {
397           // Removing the piece from our piece list
398           removeCurrentPiece(remote_peer, message->piece);
399           // Setting the fact that we have the piece
400           bitfield_ = bitfield_ | (1U << static_cast<unsigned int>(message->piece));
401           XBT_DEBUG("My status is now %s", getStatus().c_str());
402           // Sending the information to all the peers we are connected to
403           sendHaveToAllPeers(message->piece);
404           // sending UNINTERESTED to peers that do not have what we want.
405           updateInterestedAfterReceive();
406         } else {                                      // piece not completed
407           sendRequestTo(remote_peer, message->piece); // ask for the next block
408         }
409       } else {
410         XBT_DEBUG("However, we already have it");
411         xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
412         requestNewPieceTo(remote_peer);
413       }
414       break;
415     case MESSAGE_CANCEL:
416       break;
417     default:
418       THROW_IMPOSSIBLE;
419   }
420   // Update the peer speed.
421   if (remote_peer) {
422     remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::get_clock() - begin_receive_time));
423   }
424   begin_receive_time = simgrid::s4u::Engine::get_clock();
425 }
426
427 /** Selects the appropriate piece to download and requests it to the remote_peer */
428 void Peer::requestNewPieceTo(Connection* remote_peer)
429 {
430   int piece = selectPieceToDownload(remote_peer);
431   if (piece != -1) {
432     current_pieces |= (1U << (unsigned int)piece);
433     sendRequestTo(remote_peer, piece);
434   }
435 }
436
437 void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piece)
438 {
439   current_pieces &= ~(1U << current_piece);
440   remote_peer->current_piece = -1;
441 }
442
443 /** @brief Return the piece to be downloaded
444  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
445  * If a piece is partially downloaded, this piece will be selected prioritarily
446  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
447  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
448  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
449  * @param remote_peer: information about the connection
450  * @return the piece to download if possible. -1 otherwise
451  */
452 int Peer::selectPieceToDownload(Connection* remote_peer)
453 {
454   int piece = partiallyDownloadedPiece(remote_peer);
455   // strict priority policy
456   if (piece != -1)
457     return piece;
458
459   // end game mode
460   if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) {
461 #if ENABLE_END_GAME_MODE == 0
462     return -1;
463 #endif
464     int nb_interesting_pieces = 0;
465     // compute the number of interesting pieces
466     for (unsigned int i = 0; i < FILE_PIECES; i++)
467       if (hasNotPiece(i) && remote_peer->hasPiece(i))
468         nb_interesting_pieces++;
469
470     xbt_assert(nb_interesting_pieces != 0);
471     // get a random interesting piece
472     int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
473     int current_index      = 0;
474     for (unsigned int i = 0; i < FILE_PIECES; i++) {
475       if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
476         if (random_piece_index == current_index) {
477           piece = i;
478           break;
479         }
480         current_index++;
481       }
482     }
483     xbt_assert(piece != -1);
484     return piece;
485   }
486   // Random first policy
487   if (countPieces(bitfield_) < 4 && isInterestedByFree(remote_peer)) {
488     int nb_interesting_pieces = 0;
489     // compute the number of interesting pieces
490     for (unsigned int i = 0; i < FILE_PIECES; i++)
491       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
492         nb_interesting_pieces++;
493     xbt_assert(nb_interesting_pieces != 0);
494     // get a random interesting piece
495     int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
496     int current_index      = 0;
497     for (unsigned int i = 0; i < FILE_PIECES; i++) {
498       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
499         if (random_piece_index == current_index) {
500           piece = i;
501           break;
502         }
503         current_index++;
504       }
505     }
506     xbt_assert(piece != -1);
507     return piece;
508   } else { // Rarest first policy
509     short min         = SHRT_MAX;
510     int nb_min_pieces = 0;
511     int current_index = 0;
512     // compute the smallest number of copies of available pieces
513     for (unsigned int i = 0; i < FILE_PIECES; i++) {
514       if (pieces_count[i] < min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
515         min = pieces_count[i];
516     }
517
518     xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
519     // compute the number of rarest pieces
520     for (unsigned int i = 0; i < FILE_PIECES; i++)
521       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
522         nb_min_pieces++;
523
524     xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer));
525     // get a random rarest piece
526     int random_rarest_index = RngStream_RandInt(stream, 0, nb_min_pieces - 1);
527     for (unsigned int i = 0; i < FILE_PIECES; i++)
528       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
529         if (random_rarest_index == current_index) {
530           piece = i;
531           break;
532         }
533         current_index++;
534       }
535
536     xbt_assert(piece != -1 || not isInterestedByFree(remote_peer));
537     return piece;
538   }
539 }
540
541 void Peer::updateChokedPeers()
542 {
543   if (nbInterestedPeers() == 0)
544     return;
545   XBT_DEBUG("(%d) update_choked peers %zu active peers", id, active_peers.size());
546   // update the current round
547   round_                  = (round_ + 1) % 3;
548   Connection* chosen_peer = nullptr;
549   // select first active peer and remove it from the set
550   Connection* choked_peer;
551   if (active_peers.empty()) {
552     choked_peer = nullptr;
553   } else {
554     choked_peer = *active_peers.begin();
555     active_peers.erase(choked_peer);
556   }
557
558   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
559   if (hasFinished()) {
560     Connection* remote_peer;
561     double unchoke_time = simgrid::s4u::Engine::get_clock() + 1;
562     for (auto const& kv : connected_peers) {
563       remote_peer = kv.second;
564       if (remote_peer->last_unchoke < unchoke_time && remote_peer->interested && remote_peer->choked_upload) {
565         unchoke_time = remote_peer->last_unchoke;
566         chosen_peer  = remote_peer;
567       }
568     }
569   } else {
570     // Random optimistic unchoking
571     if (round_ == 0) {
572       int j = 0;
573       do {
574         // We choose a random peer to unchoke.
575         std::unordered_map<int, Connection*>::iterator chosen_peer_it = connected_peers.begin();
576         std::advance(chosen_peer_it, RngStream_RandInt(stream, 0, connected_peers.size() - 1));
577         chosen_peer = chosen_peer_it->second;
578         if (chosen_peer == nullptr)
579           THROWF(unknown_error, 0, "A peer should have be selected at this point");
580         else if (not chosen_peer->interested || not chosen_peer->choked_upload)
581           chosen_peer = nullptr;
582         else
583           XBT_DEBUG("Nothing to do, keep going");
584         j++;
585       } while (chosen_peer == nullptr && j < MAXIMUM_PEERS);
586     } else {
587       // Use the "fastest download" policy.
588       double fastest_speed = 0.0;
589       for (auto const& kv : connected_peers) {
590         Connection* remote_peer = kv.second;
591         if (remote_peer->peer_speed > fastest_speed && remote_peer->choked_upload && remote_peer->interested) {
592           chosen_peer   = remote_peer;
593           fastest_speed = remote_peer->peer_speed;
594         }
595       }
596     }
597   }
598
599   if (chosen_peer != nullptr)
600     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", id, chosen_peer->id,
601               chosen_peer->interested, chosen_peer->choked_upload);
602
603   if (choked_peer != chosen_peer) {
604     if (choked_peer != nullptr) {
605       xbt_assert(not choked_peer->choked_upload, "Tries to choked a choked peer");
606       choked_peer->choked_upload = true;
607       updateActivePeersSet(choked_peer);
608       XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
609       sendMessage(choked_peer->mailbox_, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
610     }
611     if (chosen_peer != nullptr) {
612       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
613       chosen_peer->choked_upload = false;
614       active_peers.insert(chosen_peer);
615       chosen_peer->last_unchoke = simgrid::s4u::Engine::get_clock();
616       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
617       updateActivePeersSet(chosen_peer);
618       sendMessage(chosen_peer->mailbox_, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
619     }
620   }
621 }
622
623 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
624 void Peer::updateInterestedAfterReceive()
625 {
626   for (auto const& kv : connected_peers) {
627     Connection* remote_peer = kv.second;
628     if (remote_peer->am_interested) {
629       bool interested = false;
630       // Check if the peer still has a piece we want.
631       for (unsigned int i = 0; i < FILE_PIECES; i++)
632         if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
633           interested = true;
634           break;
635         }
636
637       if (not interested) { // no more piece to download from connection
638         remote_peer->am_interested = false;
639         sendMessage(remote_peer->mailbox_, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
640       }
641     }
642   }
643 }
644
645 void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
646 {
647   xbt_assert((piece >= 0 && static_cast<unsigned int>(piece) <= FILE_PIECES), "Wrong piece.");
648   xbt_assert((block_index >= 0 && static_cast<unsigned int>(block_index) <= PIECES_BLOCKS), "Wrong block : %d.",
649              block_index);
650   for (int i = block_index; i < (block_index + block_length); i++)
651     bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
652 }
653
654 bool Peer::hasCompletedPiece(unsigned int piece)
655 {
656   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
657     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
658       return false;
659   return true;
660 }
661
662 int Peer::getFirstMissingBlockFrom(int piece)
663 {
664   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
665     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
666       return i;
667   return -1;
668 }
669
670 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
671 int Peer::partiallyDownloadedPiece(Connection* remote_peer)
672 {
673   for (unsigned int i = 0; i < FILE_PIECES; i++)
674     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
675       return i;
676   return -1;
677 }