Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' into master
[simgrid.git] / examples / s4u / app-bittorrent / s4u-peer.cpp
1 /* Copyright (c) 2012-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <algorithm>
7 #include <climits>
8
9 #include "s4u-peer.hpp"
10 #include "s4u-tracker.hpp"
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
13
14 /*
15  * User parameters for transferred file data. For the test, the default values are :
16  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
17  */
18 #define FILE_PIECES 10UL
19 #define PIECES_BLOCKS 5UL
20 #define BLOCK_SIZE 16384
21
22 /** Number of blocks asked by each request */
23 #define BLOCKS_REQUESTED 2UL
24
25 #define ENABLE_END_GAME_MODE 1
26 #define SLEEP_DURATION 1
27 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
28
29 Peer::Peer(std::vector<std::string> args)
30 {
31   // Check arguments
32   xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
33   try {
34     id       = std::stoi(args[1]);
35     mailbox_ = simgrid::s4u::Mailbox::by_name(std::to_string(id));
36   } catch (std::invalid_argument& ia) {
37     throw std::invalid_argument(std::string("Invalid ID:") + args[1].c_str());
38   }
39
40   try {
41     deadline = std::stod(args[2]);
42   } catch (std::invalid_argument& ia) {
43     throw std::invalid_argument(std::string("Invalid deadline:") + args[2].c_str());
44   }
45   xbt_assert(deadline > 0, "Wrong deadline supplied");
46
47   stream = simgrid::s4u::this_actor::get_host()->extension<HostBittorrent>()->getStream();
48
49   if (args.size() == 4 && args[3] == "1") {
50     bitfield_       = (1U << FILE_PIECES) - 1U;
51     bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
52   }
53   pieces_count = new short[FILE_PIECES]{0};
54
55   XBT_INFO("Hi, I'm joining the network with id %d", id);
56 }
57
58 Peer::~Peer()
59 {
60   for (auto const& peer : connected_peers)
61     delete peer.second;
62   delete[] pieces_count;
63 }
64
65 /** Peer main function */
66 void Peer::operator()()
67 {
68   // Getting peer data from the tracker.
69   if (getPeersFromTracker()) {
70     XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
71     begin_receive_time = simgrid::s4u::Engine::get_clock();
72     mailbox_->set_receiver(simgrid::s4u::Actor::self());
73     if (hasFinished()) {
74       sendHandshakeToAllPeers();
75     } else {
76       leech();
77     }
78     seed();
79   } else {
80     XBT_INFO("Couldn't contact the tracker.");
81   }
82
83   XBT_INFO("Here is my current status: %s", getStatus().c_str());
84 }
85
86 bool Peer::getPeersFromTracker()
87 {
88   simgrid::s4u::MailboxPtr tracker_mailbox = simgrid::s4u::Mailbox::by_name(TRACKER_MAILBOX);
89   // Build the task to send to the tracker
90   TrackerQuery* peer_request = new TrackerQuery(id, mailbox_);
91   try {
92     XBT_DEBUG("Sending a peer request to the tracker.");
93     tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
94   } catch (simgrid::TimeoutError& e) {
95     XBT_DEBUG("Timeout expired when requesting peers to tracker");
96     delete peer_request;
97     return false;
98   }
99
100   try {
101     TrackerAnswer* answer = static_cast<TrackerAnswer*>(mailbox_->get(GET_PEERS_TIMEOUT));
102     // Add the peers the tracker gave us to our peer list.
103     for (auto const& peer_id : *answer->getPeers())
104       if (id != peer_id)
105         connected_peers[peer_id] = new Connection(peer_id);
106     delete answer;
107   } catch (simgrid::TimeoutError& e) {
108     XBT_DEBUG("Timeout expired when requesting peers to tracker");
109     return false;
110   }
111   return true;
112 }
113
114 void Peer::sendHandshakeToAllPeers()
115 {
116   for (auto const& kv : connected_peers) {
117     Connection* remote_peer = kv.second;
118     Message* handshake      = new Message(MESSAGE_HANDSHAKE, id, mailbox_);
119     remote_peer->mailbox_->put_init(handshake, MESSAGE_HANDSHAKE_SIZE)->detach();
120     XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer->id);
121   }
122 }
123
124 void Peer::sendMessage(simgrid::s4u::MailboxPtr mailbox, e_message_type type, uint64_t size)
125 {
126   const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"};
127   XBT_DEBUG("Sending %s to %s", type_names[type], mailbox->get_cname());
128   mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
129 }
130
131 void Peer::sendBitfield(simgrid::s4u::MailboxPtr mailbox)
132 {
133   XBT_DEBUG("Sending a BITFIELD to %s", mailbox->get_cname());
134   mailbox
135       ->put_init(new Message(MESSAGE_BITFIELD, id, bitfield_, mailbox_),
136                  MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES))
137       ->detach();
138 }
139
140 void Peer::sendPiece(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length)
141 {
142   xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
143   XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->get_cname());
144   mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach();
145 }
146
147 void Peer::sendHaveToAllPeers(unsigned int piece)
148 {
149   XBT_DEBUG("Sending HAVE message to all my peers");
150   for (auto const& kv : connected_peers) {
151     Connection* remote_peer = kv.second;
152     remote_peer->mailbox_->put_init(new Message(MESSAGE_HAVE, id, mailbox_, piece), MESSAGE_HAVE_SIZE)->detach();
153   }
154 }
155
156 void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
157 {
158   remote_peer->current_piece = piece;
159   xbt_assert(remote_peer->hasPiece(piece));
160   int block_index = getFirstMissingBlockFrom(piece);
161   if (block_index != -1) {
162     int block_length = std::min(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
163     XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->get_cname(), piece, block_index,
164               block_length);
165     remote_peer->mailbox_
166         ->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE)
167         ->detach();
168   }
169 }
170
171 std::string Peer::getStatus()
172 {
173   std::string res = std::string("");
174   for (int i = FILE_PIECES - 1; i >= 0; i--)
175     res = std::string((bitfield_ & (1U << i)) ? "1" : "0") + res;
176   return res;
177 }
178
179 bool Peer::hasFinished()
180 {
181   return bitfield_ == (1U << FILE_PIECES) - 1U;
182 }
183
184 /** Indicates if the remote peer has a piece not stored by the local peer */
185 bool Peer::isInterestedBy(Connection* remote_peer)
186 {
187   return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
188 }
189
190 bool Peer::isInterestedByFree(Connection* remote_peer)
191 {
192   for (unsigned int i = 0; i < FILE_PIECES; i++)
193     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
194       return true;
195   return false;
196 }
197
198 void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
199 {
200   for (unsigned int i = 0; i < FILE_PIECES; i++)
201     if (bitfield & (1U << i))
202       pieces_count[i]++;
203 }
204
205 unsigned int Peer::countPieces(unsigned int bitfield)
206 {
207   unsigned int count = 0U;
208   unsigned int n     = bitfield;
209   while (n) {
210     count += n & 1U;
211     n >>= 1U;
212   }
213   return count;
214 }
215
216 int Peer::nbInterestedPeers()
217 {
218   int nb = 0;
219   for (auto const& kv : connected_peers)
220     if (kv.second->interested)
221       nb++;
222   return nb;
223 }
224
225 void Peer::leech()
226 {
227   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
228   XBT_DEBUG("Start downloading.");
229
230   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
231   sendHandshakeToAllPeers();
232   XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->get_cname());
233
234   void* data = nullptr;
235   while (simgrid::s4u::Engine::get_clock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
236     if (comm_received == nullptr) {
237       comm_received = mailbox_->get_async(&data);
238     }
239     if (comm_received->test()) {
240       message = static_cast<Message*>(data);
241       handleMessage();
242       delete message;
243       comm_received = nullptr;
244     } else {
245       // We don't execute the choke algorithm if we don't already have a piece
246       if (simgrid::s4u::Engine::get_clock() >= next_choked_update && countPieces(bitfield_) > 0) {
247         updateChokedPeers();
248         next_choked_update += UPDATE_CHOKED_INTERVAL;
249       } else {
250         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
251       }
252     }
253   }
254   if (hasFinished())
255     XBT_DEBUG("%d becomes a seeder", id);
256 }
257
258 void Peer::seed()
259 {
260   double next_choked_update = simgrid::s4u::Engine::get_clock() + UPDATE_CHOKED_INTERVAL;
261   XBT_DEBUG("Start seeding.");
262   // start the main seed loop
263   void* data = nullptr;
264   while (simgrid::s4u::Engine::get_clock() < deadline) {
265     if (comm_received == nullptr) {
266       comm_received = mailbox_->get_async(&data);
267     }
268     if (comm_received->test()) {
269       message = static_cast<Message*>(data);
270       handleMessage();
271       delete message;
272       comm_received = nullptr;
273     } else {
274       if (simgrid::s4u::Engine::get_clock() >= next_choked_update) {
275         updateChokedPeers();
276         // TODO: Change the choked peer algorithm when seeding.
277         next_choked_update += UPDATE_CHOKED_INTERVAL;
278       } else {
279         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
280       }
281     }
282   }
283 }
284
285 void Peer::updateActivePeersSet(Connection* remote_peer)
286 {
287   if (remote_peer->interested && not remote_peer->choked_upload)
288     active_peers.insert(remote_peer);
289   else
290     active_peers.erase(remote_peer);
291 }
292
293 void Peer::handleMessage()
294 {
295   const char* type_names[10] = {"HANDSHAKE", "CHOKE",    "UNCHOKE", "INTERESTED", "NOTINTERESTED",
296                                 "HAVE",      "BITFIELD", "REQUEST", "PIECE",      "CANCEL"};
297
298   XBT_DEBUG("Received a %s message from %s", type_names[message->type], message->return_mailbox->get_cname());
299
300   auto known_peer         = connected_peers.find(message->peer_id);
301   Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : known_peer->second;
302   xbt_assert(remote_peer != nullptr || message->type == MESSAGE_HANDSHAKE,
303              "The impossible did happened: A not-in-our-list peer sent us a message.");
304
305   switch (message->type) {
306     case MESSAGE_HANDSHAKE:
307       // Check if the peer is in our connection list.
308       if (remote_peer == nullptr) {
309         XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
310         connected_peers[message->peer_id] = new Connection(message->peer_id);
311         sendMessage(message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
312       }
313       // Send our bitfield to the peer
314       sendBitfield(message->return_mailbox);
315       break;
316     case MESSAGE_BITFIELD:
317       // Update the pieces list
318       updatePiecesCountFromBitfield(message->bitfield);
319       // Store the bitfield
320       remote_peer->bitfield = message->bitfield;
321       xbt_assert(not remote_peer->am_interested, "Should not be interested at first");
322       if (isInterestedBy(remote_peer)) {
323         remote_peer->am_interested = true;
324         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
325       }
326       break;
327     case MESSAGE_INTERESTED:
328       // Update the interested state of the peer.
329       remote_peer->interested = true;
330       updateActivePeersSet(remote_peer);
331       break;
332     case MESSAGE_NOTINTERESTED:
333       remote_peer->interested = false;
334       updateActivePeersSet(remote_peer);
335       break;
336     case MESSAGE_UNCHOKE:
337       xbt_assert(remote_peer->choked_download);
338       remote_peer->choked_download = false;
339       // Send requests to the peer, since it has unchoked us
340       if (remote_peer->am_interested)
341         requestNewPieceTo(remote_peer);
342       break;
343     case MESSAGE_CHOKE:
344       xbt_assert(not remote_peer->choked_download);
345       remote_peer->choked_download = true;
346       if (remote_peer->current_piece != -1)
347         removeCurrentPiece(remote_peer, remote_peer->current_piece);
348       break;
349     case MESSAGE_HAVE:
350       XBT_DEBUG("\t for piece %d", message->piece);
351       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
352                  "Wrong HAVE message received");
353       remote_peer->bitfield = remote_peer->bitfield | (1U << static_cast<unsigned int>(message->piece));
354       pieces_count[message->piece]++;
355       // If the piece is in our pieces, we tell the peer that we are interested.
356       if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
357         remote_peer->am_interested = true;
358         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
359         if (not remote_peer->choked_download)
360           requestNewPieceTo(remote_peer);
361       }
362       break;
363     case MESSAGE_REQUEST:
364       xbt_assert(remote_peer->interested);
365       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
366                  "Wrong HAVE message received");
367       if (not remote_peer->choked_upload) {
368         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
369                   message->block_index + message->block_length);
370         if (not hasNotPiece(message->piece)) {
371           sendPiece(message->return_mailbox, message->piece, message->block_index, message->block_length);
372         }
373       } else {
374         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
375       }
376       break;
377     case MESSAGE_PIECE:
378       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
379                 message->block_index + message->block_length);
380       xbt_assert(not remote_peer->choked_download);
381       xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
382                  "Can't received a piece if I'm not interested without end-game mode!"
383                  "piece (%d) bitfield (%u) remote bitfield (%u)",
384                  message->piece, bitfield_, remote_peer->bitfield);
385       xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
386       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
387                  "Wrong piece received");
388       // TODO: Execute a computation.
389       if (hasNotPiece(static_cast<unsigned int>(message->piece))) {
390         updateBitfieldBlocks(message->piece, message->block_index, message->block_length);
391         if (hasCompletedPiece(static_cast<unsigned int>(message->piece))) {
392           // Removing the piece from our piece list
393           removeCurrentPiece(remote_peer, message->piece);
394           // Setting the fact that we have the piece
395           bitfield_ = bitfield_ | (1U << static_cast<unsigned int>(message->piece));
396           XBT_DEBUG("My status is now %s", getStatus().c_str());
397           // Sending the information to all the peers we are connected to
398           sendHaveToAllPeers(message->piece);
399           // sending UNINTERESTED to peers that do not have what we want.
400           updateInterestedAfterReceive();
401         } else {                                      // piece not completed
402           sendRequestTo(remote_peer, message->piece); // ask for the next block
403         }
404       } else {
405         XBT_DEBUG("However, we already have it");
406         xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
407         requestNewPieceTo(remote_peer);
408       }
409       break;
410     case MESSAGE_CANCEL:
411       break;
412     default:
413       THROW_IMPOSSIBLE;
414   }
415   // Update the peer speed.
416   if (remote_peer) {
417     remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::get_clock() - begin_receive_time));
418   }
419   begin_receive_time = simgrid::s4u::Engine::get_clock();
420 }
421
422 /** Selects the appropriate piece to download and requests it to the remote_peer */
423 void Peer::requestNewPieceTo(Connection* remote_peer)
424 {
425   int piece = selectPieceToDownload(remote_peer);
426   if (piece != -1) {
427     current_pieces |= (1U << (unsigned int)piece);
428     sendRequestTo(remote_peer, piece);
429   }
430 }
431
432 void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piece)
433 {
434   current_pieces &= ~(1U << current_piece);
435   remote_peer->current_piece = -1;
436 }
437
438 /** @brief Return the piece to be downloaded
439  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
440  * If a piece is partially downloaded, this piece will be selected prioritarily
441  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
442  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
443  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
444  * @param remote_peer: information about the connection
445  * @return the piece to download if possible. -1 otherwise
446  */
447 int Peer::selectPieceToDownload(Connection* remote_peer)
448 {
449   int piece = partiallyDownloadedPiece(remote_peer);
450   // strict priority policy
451   if (piece != -1)
452     return piece;
453
454   // end game mode
455   if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) {
456 #if ENABLE_END_GAME_MODE == 0
457     return -1;
458 #endif
459     int nb_interesting_pieces = 0;
460     // compute the number of interesting pieces
461     for (unsigned int i = 0; i < FILE_PIECES; i++)
462       if (hasNotPiece(i) && remote_peer->hasPiece(i))
463         nb_interesting_pieces++;
464
465     xbt_assert(nb_interesting_pieces != 0);
466     // get a random interesting piece
467     int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
468     int current_index      = 0;
469     for (unsigned int i = 0; i < FILE_PIECES; i++) {
470       if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
471         if (random_piece_index == current_index) {
472           piece = i;
473           break;
474         }
475         current_index++;
476       }
477     }
478     xbt_assert(piece != -1);
479     return piece;
480   }
481   // Random first policy
482   if (countPieces(bitfield_) < 4 && isInterestedByFree(remote_peer)) {
483     int nb_interesting_pieces = 0;
484     // compute the number of interesting pieces
485     for (unsigned int i = 0; i < FILE_PIECES; i++)
486       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
487         nb_interesting_pieces++;
488     xbt_assert(nb_interesting_pieces != 0);
489     // get a random interesting piece
490     int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
491     int current_index      = 0;
492     for (unsigned int i = 0; i < FILE_PIECES; i++) {
493       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
494         if (random_piece_index == current_index) {
495           piece = i;
496           break;
497         }
498         current_index++;
499       }
500     }
501     xbt_assert(piece != -1);
502     return piece;
503   } else { // Rarest first policy
504     short min         = SHRT_MAX;
505     int nb_min_pieces = 0;
506     int current_index = 0;
507     // compute the smallest number of copies of available pieces
508     for (unsigned int i = 0; i < FILE_PIECES; i++) {
509       if (pieces_count[i] < min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
510         min = pieces_count[i];
511     }
512
513     xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
514     // compute the number of rarest pieces
515     for (unsigned int i = 0; i < FILE_PIECES; i++)
516       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
517         nb_min_pieces++;
518
519     xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer));
520     // get a random rarest piece
521     int random_rarest_index = RngStream_RandInt(stream, 0, nb_min_pieces - 1);
522     for (unsigned int i = 0; i < FILE_PIECES; i++)
523       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
524         if (random_rarest_index == current_index) {
525           piece = i;
526           break;
527         }
528         current_index++;
529       }
530
531     xbt_assert(piece != -1 || not isInterestedByFree(remote_peer));
532     return piece;
533   }
534 }
535
536 void Peer::updateChokedPeers()
537 {
538   if (nbInterestedPeers() == 0)
539     return;
540   XBT_DEBUG("(%d) update_choked peers %zu active peers", id, active_peers.size());
541   // update the current round
542   round_                  = (round_ + 1) % 3;
543   Connection* chosen_peer = nullptr;
544   // select first active peer and remove it from the set
545   Connection* choked_peer;
546   if (active_peers.empty()) {
547     choked_peer = nullptr;
548   } else {
549     choked_peer = *active_peers.begin();
550     active_peers.erase(choked_peer);
551   }
552
553   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
554   if (hasFinished()) {
555     Connection* remote_peer;
556     double unchoke_time = simgrid::s4u::Engine::get_clock() + 1;
557     for (auto const& kv : connected_peers) {
558       remote_peer = kv.second;
559       if (remote_peer->last_unchoke < unchoke_time && remote_peer->interested && remote_peer->choked_upload) {
560         unchoke_time = remote_peer->last_unchoke;
561         chosen_peer  = remote_peer;
562       }
563     }
564   } else {
565     // Random optimistic unchoking
566     if (round_ == 0) {
567       int j = 0;
568       do {
569         // We choose a random peer to unchoke.
570         std::unordered_map<int, Connection*>::iterator chosen_peer_it = connected_peers.begin();
571         std::advance(chosen_peer_it, RngStream_RandInt(stream, 0, connected_peers.size() - 1));
572         chosen_peer = chosen_peer_it->second;
573         if (chosen_peer == nullptr)
574           THROWF(unknown_error, 0, "A peer should have be selected at this point");
575         else if (not chosen_peer->interested || not chosen_peer->choked_upload)
576           chosen_peer = nullptr;
577         else
578           XBT_DEBUG("Nothing to do, keep going");
579         j++;
580       } while (chosen_peer == nullptr && j < MAXIMUM_PEERS);
581     } else {
582       // Use the "fastest download" policy.
583       double fastest_speed = 0.0;
584       for (auto const& kv : connected_peers) {
585         Connection* remote_peer = kv.second;
586         if (remote_peer->peer_speed > fastest_speed && remote_peer->choked_upload && remote_peer->interested) {
587           chosen_peer   = remote_peer;
588           fastest_speed = remote_peer->peer_speed;
589         }
590       }
591     }
592   }
593
594   if (chosen_peer != nullptr)
595     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", id, chosen_peer->id,
596               chosen_peer->interested, chosen_peer->choked_upload);
597
598   if (choked_peer != chosen_peer) {
599     if (choked_peer != nullptr) {
600       xbt_assert(not choked_peer->choked_upload, "Tries to choked a choked peer");
601       choked_peer->choked_upload = true;
602       updateActivePeersSet(choked_peer);
603       XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
604       sendMessage(choked_peer->mailbox_, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
605     }
606     if (chosen_peer != nullptr) {
607       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
608       chosen_peer->choked_upload = false;
609       active_peers.insert(chosen_peer);
610       chosen_peer->last_unchoke = simgrid::s4u::Engine::get_clock();
611       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
612       updateActivePeersSet(chosen_peer);
613       sendMessage(chosen_peer->mailbox_, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
614     }
615   }
616 }
617
618 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
619 void Peer::updateInterestedAfterReceive()
620 {
621   for (auto const& kv : connected_peers) {
622     Connection* remote_peer = kv.second;
623     if (remote_peer->am_interested) {
624       bool interested = false;
625       // Check if the peer still has a piece we want.
626       for (unsigned int i = 0; i < FILE_PIECES; i++)
627         if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
628           interested = true;
629           break;
630         }
631
632       if (not interested) { // no more piece to download from connection
633         remote_peer->am_interested = false;
634         sendMessage(remote_peer->mailbox_, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
635       }
636     }
637   }
638 }
639
640 void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
641 {
642   xbt_assert((piece >= 0 && static_cast<unsigned int>(piece) <= FILE_PIECES), "Wrong piece.");
643   xbt_assert((block_index >= 0 && static_cast<unsigned int>(block_index) <= PIECES_BLOCKS), "Wrong block : %d.",
644              block_index);
645   for (int i = block_index; i < (block_index + block_length); i++)
646     bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
647 }
648
649 bool Peer::hasCompletedPiece(unsigned int piece)
650 {
651   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
652     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
653       return false;
654   return true;
655 }
656
657 int Peer::getFirstMissingBlockFrom(int piece)
658 {
659   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
660     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
661       return i;
662   return -1;
663 }
664
665 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
666 int Peer::partiallyDownloadedPiece(Connection* remote_peer)
667 {
668   for (unsigned int i = 0; i < FILE_PIECES; i++)
669     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
670       return i;
671   return -1;
672 }