Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of github.com:simgrid/simgrid into s_type_cleanup
[simgrid.git] / examples / s4u / app-bittorrent / s4u_peer.cpp
1 /* Copyright (c) 2012-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <climits>
7 #include <xbt/ex.hpp>
8
9 #include "s4u_peer.hpp"
10 #include "s4u_tracker.hpp"
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
13
14 /*
15  * User parameters for transferred file data. For the test, the default values are :
16  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
17  */
18 #define FILE_PIECES 10UL
19 #define PIECES_BLOCKS 5UL
20 #define BLOCK_SIZE 16384
21
22 /** Number of blocks asked by each request */
23 #define BLOCKS_REQUESTED 2
24
25 #define ENABLE_END_GAME_MODE 1
26 #define SLEEP_DURATION 1
27 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
28
29 Peer::Peer(std::vector<std::string> args)
30 {
31   // Check arguments
32   xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
33   try {
34     id       = std::stoi(args[1]);
35     mailbox_ = simgrid::s4u::Mailbox::byName(std::to_string(id));
36   } catch (std::invalid_argument& ia) {
37     throw std::invalid_argument(std::string("Invalid ID:") + args[1].c_str());
38   }
39
40   try {
41     deadline = std::stod(args[2]);
42   } catch (std::invalid_argument& ia) {
43     throw std::invalid_argument(std::string("Invalid deadline:") + args[2].c_str());
44   }
45   xbt_assert(deadline > 0, "Wrong deadline supplied");
46
47   stream = simgrid::s4u::this_actor::getHost()->extension<HostBittorrent>()->getStream();
48
49   if (args.size() == 4 && args[3] == "1") {
50     bitfield_       = (1U << FILE_PIECES) - 1U;
51     bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
52   }
53   pieces_count = new short[FILE_PIECES]{0};
54
55   XBT_INFO("Hi, I'm joining the network with id %d", id);
56 }
57
58 Peer::~Peer()
59 {
60   for (auto peer : connected_peers)
61     delete peer.second;
62   delete[] pieces_count;
63 }
64
65 /** Peer main function */
66 void Peer::operator()()
67 {
68   // Getting peer data from the tracker.
69   if (getPeersFromTracker()) {
70     XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
71     begin_receive_time = simgrid::s4u::Engine::getClock();
72     mailbox_->setReceiver(simgrid::s4u::Actor::self());
73     if (hasFinished()) {
74       sendHandshakeToAllPeers();
75     } else {
76       leech();
77     }
78     seed();
79   } else {
80     XBT_INFO("Couldn't contact the tracker.");
81   }
82
83   XBT_INFO("Here is my current status: %s", getStatus().c_str());
84 }
85
86 bool Peer::getPeersFromTracker()
87 {
88   simgrid::s4u::MailboxPtr tracker_mailbox = simgrid::s4u::Mailbox::byName(TRACKER_MAILBOX);
89   // Build the task to send to the tracker
90   TrackerQuery* peer_request = new TrackerQuery(id, mailbox_);
91   try {
92     XBT_DEBUG("Sending a peer request to the tracker.");
93     tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
94   } catch (xbt_ex& e) {
95     if (e.category == timeout_error) {
96       XBT_DEBUG("Timeout expired when requesting peers to tracker");
97       delete peer_request;
98       return false;
99     }
100   }
101
102   try {
103     TrackerAnswer* answer = static_cast<TrackerAnswer*>(mailbox_->get(GET_PEERS_TIMEOUT));
104     // Add the peers the tracker gave us to our peer list.
105     for (auto peer_id : *answer->getPeers())
106       if (id != peer_id)
107         connected_peers[peer_id] = new Connection(peer_id);
108     delete answer;
109   } catch (xbt_ex& e) {
110     if (e.category == timeout_error) {
111       XBT_DEBUG("Timeout expired when requesting peers to tracker");
112       return false;
113     }
114   }
115   return true;
116 }
117
118 void Peer::sendHandshakeToAllPeers()
119 {
120   for (auto kv : connected_peers) {
121     Connection* remote_peer = kv.second;
122     Message* handshake      = new Message(MESSAGE_HANDSHAKE, id, mailbox_);
123     remote_peer->mailbox_->put_init(handshake, MESSAGE_HANDSHAKE_SIZE)->detach();
124     XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer->id);
125   }
126 }
127
128 void Peer::sendMessage(simgrid::s4u::MailboxPtr mailbox, e_message_type type, uint64_t size)
129 {
130   const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"};
131   XBT_DEBUG("Sending %s to %s", type_names[type], mailbox->getName());
132   mailbox->put_init(new Message(type, id, bitfield_, mailbox_), size)->detach();
133 }
134
135 void Peer::sendBitfield(simgrid::s4u::MailboxPtr mailbox)
136 {
137   XBT_DEBUG("Sending a BITFIELD to %s", mailbox->getName());
138   mailbox
139       ->put_init(new Message(MESSAGE_BITFIELD, id, bitfield_, mailbox_),
140                  MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES))
141       ->detach();
142 }
143
144 void Peer::sendPiece(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length)
145 {
146   xbt_assert(not hasNotPiece(piece), "Tried to send a unavailable piece.");
147   XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->getName());
148   mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach();
149 }
150
151 void Peer::sendHaveToAllPeers(unsigned int piece)
152 {
153   XBT_DEBUG("Sending HAVE message to all my peers");
154   for (auto kv : connected_peers) {
155     Connection* remote_peer = kv.second;
156     remote_peer->mailbox_->put_init(new Message(MESSAGE_HAVE, id, mailbox_, piece), MESSAGE_HAVE_SIZE)->detach();
157   }
158 }
159
160 void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
161 {
162   remote_peer->current_piece = piece;
163   xbt_assert(remote_peer->hasPiece(piece));
164   int block_index = getFirstMissingBlockFrom(piece);
165   if (block_index != -1) {
166     int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
167     XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", remote_peer->mailbox_->getName(), piece, block_index,
168               block_length);
169     remote_peer->mailbox_
170         ->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE)
171         ->detach();
172   }
173 }
174
175 std::string Peer::getStatus()
176 {
177   std::string res = std::string("");
178   for (int i = FILE_PIECES - 1; i >= 0; i--)
179     res = std::string((bitfield_ & (1U << i)) ? "1" : "0") + res;
180   return res;
181 }
182
183 bool Peer::hasFinished()
184 {
185   return bitfield_ == (1U << FILE_PIECES) - 1U;
186 }
187
188 /** Indicates if the remote peer has a piece not stored by the local peer */
189 bool Peer::isInterestedBy(Connection* remote_peer)
190 {
191   return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
192 }
193
194 bool Peer::isInterestedByFree(Connection* remote_peer)
195 {
196   for (unsigned int i = 0; i < FILE_PIECES; i++)
197     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
198       return true;
199   return false;
200 }
201
202 void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
203 {
204   for (unsigned int i = 0; i < FILE_PIECES; i++)
205     if (bitfield & (1U << i))
206       pieces_count[i]++;
207 }
208
209 unsigned int Peer::countPieces(unsigned int bitfield)
210 {
211   unsigned int count = 0U;
212   unsigned int n     = bitfield;
213   while (n) {
214     count += n & 1U;
215     n >>= 1U;
216   }
217   return count;
218 }
219
220 int Peer::nbInterestedPeers()
221 {
222   int nb = 0;
223   for (auto kv : connected_peers)
224     if (kv.second->interested)
225       nb++;
226   return nb;
227 }
228
229 void Peer::leech()
230 {
231   double next_choked_update = simgrid::s4u::Engine::getClock() + UPDATE_CHOKED_INTERVAL;
232   XBT_DEBUG("Start downloading.");
233
234   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
235   sendHandshakeToAllPeers();
236   XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->getName());
237
238   void* data = nullptr;
239   while (simgrid::s4u::Engine::getClock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
240     if (comm_received == nullptr) {
241       comm_received = mailbox_->get_async(&data);
242     }
243     if (comm_received->test()) {
244       message = static_cast<Message*>(data);
245       handleMessage();
246       delete message;
247       comm_received = nullptr;
248     } else {
249       // We don't execute the choke algorithm if we don't already have a piece
250       if (simgrid::s4u::Engine::getClock() >= next_choked_update && countPieces(bitfield_) > 0) {
251         updateChokedPeers();
252         next_choked_update += UPDATE_CHOKED_INTERVAL;
253       } else {
254         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
255       }
256     }
257   }
258   if (hasFinished())
259     XBT_DEBUG("%d becomes a seeder", id);
260 }
261
262 void Peer::seed()
263 {
264   double next_choked_update = simgrid::s4u::Engine::getClock() + UPDATE_CHOKED_INTERVAL;
265   XBT_DEBUG("Start seeding.");
266   // start the main seed loop
267   void* data = nullptr;
268   while (simgrid::s4u::Engine::getClock() < deadline) {
269     if (comm_received == nullptr) {
270       comm_received = mailbox_->get_async(&data);
271     }
272     if (comm_received->test()) {
273       message = static_cast<Message*>(data);
274       handleMessage();
275       delete message;
276       comm_received = nullptr;
277     } else {
278       if (simgrid::s4u::Engine::getClock() >= next_choked_update) {
279         updateChokedPeers();
280         // TODO: Change the choked peer algorithm when seeding.
281         next_choked_update += UPDATE_CHOKED_INTERVAL;
282       } else {
283         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
284       }
285     }
286   }
287 }
288
289 void Peer::updateActivePeersSet(Connection* remote_peer)
290 {
291   if (remote_peer->interested && not remote_peer->choked_upload)
292     active_peers.insert(remote_peer);
293   else
294     active_peers.erase(remote_peer);
295 }
296
297 void Peer::handleMessage()
298 {
299   const char* type_names[10] = {"HANDSHAKE", "CHOKE",    "UNCHOKE", "INTERESTED", "NOTINTERESTED",
300                                 "HAVE",      "BITFIELD", "REQUEST", "PIECE",      "CANCEL"};
301
302   XBT_DEBUG("Received a %s message from %s", type_names[message->type], message->return_mailbox->getName());
303
304   auto known_peer         = connected_peers.find(message->peer_id);
305   Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : known_peer->second;
306   xbt_assert(remote_peer != nullptr || message->type == MESSAGE_HANDSHAKE,
307              "The impossible did happened: A not-in-our-list peer sent us a message.");
308
309   switch (message->type) {
310     case MESSAGE_HANDSHAKE:
311       // Check if the peer is in our connection list.
312       if (remote_peer == nullptr) {
313         XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
314         connected_peers[message->peer_id] = new Connection(message->peer_id);
315         sendMessage(message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
316       }
317       // Send our bitfield to the peer
318       sendBitfield(message->return_mailbox);
319       break;
320     case MESSAGE_BITFIELD:
321       // Update the pieces list
322       updatePiecesCountFromBitfield(message->bitfield);
323       // Store the bitfield
324       remote_peer->bitfield = message->bitfield;
325       xbt_assert(not remote_peer->am_interested, "Should not be interested at first");
326       if (isInterestedBy(remote_peer)) {
327         remote_peer->am_interested = true;
328         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
329       }
330       break;
331     case MESSAGE_INTERESTED:
332       // Update the interested state of the peer.
333       remote_peer->interested = true;
334       updateActivePeersSet(remote_peer);
335       break;
336     case MESSAGE_NOTINTERESTED:
337       remote_peer->interested = false;
338       updateActivePeersSet(remote_peer);
339       break;
340     case MESSAGE_UNCHOKE:
341       xbt_assert(remote_peer->choked_download);
342       remote_peer->choked_download = false;
343       // Send requests to the peer, since it has unchoked us
344       if (remote_peer->am_interested)
345         requestNewPieceTo(remote_peer);
346       break;
347     case MESSAGE_CHOKE:
348       xbt_assert(not remote_peer->choked_download);
349       remote_peer->choked_download = true;
350       if (remote_peer->current_piece != -1)
351         removeCurrentPiece(remote_peer, remote_peer->current_piece);
352       break;
353     case MESSAGE_HAVE:
354       XBT_DEBUG("\t for piece %d", message->piece);
355       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
356                  "Wrong HAVE message received");
357       remote_peer->bitfield = remote_peer->bitfield | (1U << static_cast<unsigned int>(message->piece));
358       pieces_count[message->piece]++;
359       // If the piece is in our pieces, we tell the peer that we are interested.
360       if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
361         remote_peer->am_interested = true;
362         sendMessage(message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
363         if (not remote_peer->choked_download)
364           requestNewPieceTo(remote_peer);
365       }
366       break;
367     case MESSAGE_REQUEST:
368       xbt_assert(remote_peer->interested);
369       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
370                  "Wrong HAVE message received");
371       if (not remote_peer->choked_upload) {
372         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
373                   message->block_index + message->block_length);
374         if (not hasNotPiece(message->piece)) {
375           sendPiece(message->return_mailbox, message->piece, message->block_index, message->block_length);
376         }
377       } else {
378         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
379       }
380       break;
381     case MESSAGE_PIECE:
382       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
383                 message->block_index + message->block_length);
384       xbt_assert(not remote_peer->choked_download);
385       xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
386                  "Can't received a piece if I'm not interested without end-game mode!"
387                  "piece (%d) bitfield (%u) remote bitfield (%u)",
388                  message->piece, bitfield_, remote_peer->bitfield);
389       xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
390       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
391                  "Wrong piece received");
392       // TODO: Execute a computation.
393       if (hasNotPiece(static_cast<unsigned int>(message->piece))) {
394         updateBitfieldBlocks(message->piece, message->block_index, message->block_length);
395         if (hasCompletedPiece(static_cast<unsigned int>(message->piece))) {
396           // Removing the piece from our piece list
397           removeCurrentPiece(remote_peer, message->piece);
398           // Setting the fact that we have the piece
399           bitfield_ = bitfield_ | (1U << static_cast<unsigned int>(message->piece));
400           XBT_DEBUG("My status is now %s", getStatus().c_str());
401           // Sending the information to all the peers we are connected to
402           sendHaveToAllPeers(message->piece);
403           // sending UNINTERESTED to peers that do not have what we want.
404           updateInterestedAfterReceive();
405         } else {                                      // piece not completed
406           sendRequestTo(remote_peer, message->piece); // ask for the next block
407         }
408       } else {
409         XBT_DEBUG("However, we already have it");
410         xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
411         requestNewPieceTo(remote_peer);
412       }
413       break;
414     case MESSAGE_CANCEL:
415       break;
416     default:
417       THROW_IMPOSSIBLE;
418   }
419   // Update the peer speed.
420   if (remote_peer) {
421     remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::getClock() - begin_receive_time));
422   }
423   begin_receive_time = simgrid::s4u::Engine::getClock();
424 }
425
426 /** Selects the appropriate piece to download and requests it to the remote_peer */
427 void Peer::requestNewPieceTo(Connection* remote_peer)
428 {
429   int piece = selectPieceToDownload(remote_peer);
430   if (piece != -1) {
431     current_pieces |= (1U << (unsigned int)piece);
432     sendRequestTo(remote_peer, piece);
433   }
434 }
435
436 void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piece)
437 {
438   current_pieces &= ~(1U << current_piece);
439   remote_peer->current_piece = -1;
440 }
441
442 /** @brief Return the piece to be downloaded
443  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
444  * If a piece is partially downloaded, this piece will be selected prioritarily
445  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
446  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
447  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
448  * @param remote_peer: information about the connection
449  * @return the piece to download if possible. -1 otherwise
450  */
451 int Peer::selectPieceToDownload(Connection* remote_peer)
452 {
453   int piece = partiallyDownloadedPiece(remote_peer);
454   // strict priority policy
455   if (piece != -1)
456     return piece;
457
458   // end game mode
459   if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) {
460 #if ENABLE_END_GAME_MODE == 0
461     return -1;
462 #endif
463     int nb_interesting_pieces = 0;
464     // compute the number of interesting pieces
465     for (unsigned int i = 0; i < FILE_PIECES; i++)
466       if (hasNotPiece(i) && remote_peer->hasPiece(i))
467         nb_interesting_pieces++;
468
469     xbt_assert(nb_interesting_pieces != 0);
470     // get a random interesting piece
471     int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
472     int current_index      = 0;
473     for (unsigned int i = 0; i < FILE_PIECES; i++) {
474       if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
475         if (random_piece_index == current_index) {
476           piece = i;
477           break;
478         }
479         current_index++;
480       }
481     }
482     xbt_assert(piece != -1);
483     return piece;
484   }
485   // Random first policy
486   if (countPieces(bitfield_) < 4 && isInterestedByFree(remote_peer)) {
487     int nb_interesting_pieces = 0;
488     // compute the number of interesting pieces
489     for (unsigned int i = 0; i < FILE_PIECES; i++)
490       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
491         nb_interesting_pieces++;
492     xbt_assert(nb_interesting_pieces != 0);
493     // get a random interesting piece
494     int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
495     int current_index      = 0;
496     for (unsigned int i = 0; i < FILE_PIECES; i++) {
497       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
498         if (random_piece_index == current_index) {
499           piece = i;
500           break;
501         }
502         current_index++;
503       }
504     }
505     xbt_assert(piece != -1);
506     return piece;
507   } else { // Rarest first policy
508     short min         = SHRT_MAX;
509     int nb_min_pieces = 0;
510     int current_index = 0;
511     // compute the smallest number of copies of available pieces
512     for (unsigned int i = 0; i < FILE_PIECES; i++) {
513       if (pieces_count[i] < min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
514         min = pieces_count[i];
515     }
516
517     xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
518     // compute the number of rarest pieces
519     for (unsigned int i = 0; i < FILE_PIECES; i++)
520       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
521         nb_min_pieces++;
522
523     xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer));
524     // get a random rarest piece
525     int random_rarest_index = RngStream_RandInt(stream, 0, nb_min_pieces - 1);
526     for (unsigned int i = 0; i < FILE_PIECES; i++)
527       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
528         if (random_rarest_index == current_index) {
529           piece = i;
530           break;
531         }
532         current_index++;
533       }
534
535     xbt_assert(piece != -1 || not isInterestedByFree(remote_peer));
536     return piece;
537   }
538 }
539
540 void Peer::updateChokedPeers()
541 {
542   if (nbInterestedPeers() == 0)
543     return;
544   XBT_DEBUG("(%d) update_choked peers %zu active peers", id, active_peers.size());
545   // update the current round
546   round_                  = (round_ + 1) % 3;
547   Connection* chosen_peer = nullptr;
548   // select first active peer and remove it from the set
549   Connection* choked_peer = *(active_peers.begin());
550   active_peers.erase(choked_peer);
551
552   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
553   if (hasFinished()) {
554     Connection* remote_peer;
555     double unchoke_time = simgrid::s4u::Engine::getClock() + 1;
556     for (auto kv : connected_peers) {
557       remote_peer = kv.second;
558       if (remote_peer->last_unchoke < unchoke_time && remote_peer->interested && remote_peer->choked_upload) {
559         unchoke_time = remote_peer->last_unchoke;
560         chosen_peer  = remote_peer;
561       }
562     }
563   } else {
564     // Random optimistic unchoking
565     if (round_ == 0) {
566       int j = 0;
567       do {
568         // We choose a random peer to unchoke.
569         std::unordered_map<int, Connection*>::iterator chosen_peer_it = connected_peers.begin();
570         std::advance(chosen_peer_it, RngStream_RandInt(stream, 0, connected_peers.size() - 1));
571         chosen_peer = chosen_peer_it->second;
572         if (chosen_peer == nullptr)
573           THROWF(unknown_error, 0, "A peer should have be selected at this point");
574         else if (not chosen_peer->interested || not chosen_peer->choked_upload)
575           chosen_peer = nullptr;
576         else
577           XBT_DEBUG("Nothing to do, keep going");
578         j++;
579       } while (chosen_peer == nullptr && j < MAXIMUM_PEERS);
580     } else {
581       // Use the "fastest download" policy.
582       double fastest_speed = 0.0;
583       for (auto kv : connected_peers) {
584         Connection* remote_peer = kv.second;
585         if (remote_peer->peer_speed > fastest_speed && remote_peer->choked_upload && remote_peer->interested) {
586           chosen_peer   = remote_peer;
587           fastest_speed = remote_peer->peer_speed;
588         }
589       }
590     }
591   }
592
593   if (chosen_peer != nullptr)
594     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", id, chosen_peer->id,
595               chosen_peer->interested, chosen_peer->choked_upload);
596
597   if (choked_peer != chosen_peer) {
598     if (choked_peer != nullptr) {
599       xbt_assert(not choked_peer->choked_upload, "Tries to choked a choked peer");
600       choked_peer->choked_upload = true;
601       updateActivePeersSet(choked_peer);
602       XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
603       sendMessage(choked_peer->mailbox_, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
604     }
605     if (chosen_peer != nullptr) {
606       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
607       chosen_peer->choked_upload = false;
608       active_peers.insert(chosen_peer);
609       chosen_peer->last_unchoke = simgrid::s4u::Engine::getClock();
610       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
611       updateActivePeersSet(chosen_peer);
612       sendMessage(chosen_peer->mailbox_, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
613     }
614   }
615 }
616
617 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
618 void Peer::updateInterestedAfterReceive()
619 {
620   for (auto kv : connected_peers) {
621     Connection* remote_peer = kv.second;
622     if (remote_peer->am_interested) {
623       bool interested = false;
624       // Check if the peer still has a piece we want.
625       for (unsigned int i = 0; i < FILE_PIECES; i++)
626         if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
627           interested = true;
628           break;
629         }
630
631       if (not interested) { // no more piece to download from connection
632         remote_peer->am_interested = false;
633         sendMessage(remote_peer->mailbox_, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
634       }
635     }
636   }
637 }
638
639 void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
640 {
641   xbt_assert((piece >= 0 && static_cast<unsigned int>(piece) <= FILE_PIECES), "Wrong piece.");
642   xbt_assert((block_index >= 0 && static_cast<unsigned int>(block_index) <= PIECES_BLOCKS), "Wrong block : %d.",
643              block_index);
644   for (int i = block_index; i < (block_index + block_length); i++)
645     bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
646 }
647
648 bool Peer::hasCompletedPiece(unsigned int piece)
649 {
650   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
651     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
652       return false;
653   return true;
654 }
655
656 int Peer::getFirstMissingBlockFrom(int piece)
657 {
658   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
659     if (not(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
660       return i;
661   return -1;
662 }
663
664 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
665 int Peer::partiallyDownloadedPiece(Connection* remote_peer)
666 {
667   for (unsigned int i = 0; i < FILE_PIECES; i++)
668     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
669       return i;
670   return -1;
671 }