Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
mv MSG version of bittorrent to teshsuite
[simgrid.git] / examples / s4u / app-bittorrent / s4u_peer.cpp
1 /* Copyright (c) 2012-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <climits>
7 #include <xbt/ex.hpp>
8
9 #include "s4u_peer.hpp"
10 #include "s4u_tracker.hpp"
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_bt_peer, "Messages specific for the peers");
13
14 /*
15  * User parameters for transferred file data. For the test, the default values are :
16  * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
17  */
18 #define FILE_PIECES 10UL
19 #define PIECES_BLOCKS 5UL
20 #define BLOCK_SIZE 16384
21 static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
22
23 /** Number of blocks asked by each request */
24 #define BLOCKS_REQUESTED 2
25
26 #define ENABLE_END_GAME_MODE 1
27 #define SLEEP_DURATION 1
28 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
29
30 Peer::Peer(std::vector<std::string> args)
31 {
32   // Check arguments
33   xbt_assert(args.size() == 3 || args.size() == 4, "Wrong number of arguments");
34   try {
35     id       = std::stoi(args[1]);
36     mailbox_ = simgrid::s4u::Mailbox::byName(std::to_string(id));
37   } catch (std::invalid_argument& ia) {
38     throw std::invalid_argument(std::string("Invalid ID:") + args[1].c_str());
39   }
40
41   try {
42     deadline = std::stod(args[2]);
43   } catch (std::invalid_argument& ia) {
44     throw std::invalid_argument(std::string("Invalid deadline:") + args[2].c_str());
45   }
46   xbt_assert(deadline > 0, "Wrong deadline supplied");
47
48   stream = simgrid::s4u::this_actor::getHost()->extension<HostBittorrent>()->getStream();
49
50   if (args.size() == 4 && args[3] == "1") {
51     bitfield_       = (1U << FILE_PIECES) - 1U;
52     bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
53   }
54   pieces_count = new short[FILE_PIECES]{0};
55
56   XBT_INFO("Hi, I'm joining the network with id %d", id);
57 }
58
59 Peer::~Peer()
60 {
61   for (auto peer : connected_peers)
62     delete peer.second;
63   delete[] pieces_count;
64 }
65
66 /** Peer main function */
67 void Peer::operator()()
68 {
69   // Getting peer data from the tracker.
70   if (getPeersFromTracker()) {
71     XBT_DEBUG("Got %zu peers from the tracker. Current status is: %s", connected_peers.size(), getStatus().c_str());
72     begin_receive_time = simgrid::s4u::Engine::getClock();
73     mailbox_->setReceiver(simgrid::s4u::Actor::self());
74     if (hasFinished()) {
75       sendHandshakeToAllPeers();
76     } else {
77       leech();
78     }
79     seed();
80   } else {
81     XBT_INFO("Couldn't contact the tracker.");
82   }
83
84   XBT_INFO("Here is my current status: %s", getStatus().c_str());
85 }
86
87 bool Peer::getPeersFromTracker()
88 {
89   simgrid::s4u::MailboxPtr tracker_mailbox = simgrid::s4u::Mailbox::byName(TRACKER_MAILBOX);
90   // Build the task to send to the tracker
91   TrackerQuery* peer_request = new TrackerQuery(id, mailbox_, 0, 0, FILE_SIZE);
92   try {
93     XBT_DEBUG("Sending a peer request to the tracker.");
94     tracker_mailbox->put(peer_request, TRACKER_COMM_SIZE, GET_PEERS_TIMEOUT);
95   } catch (xbt_ex& e) {
96     if (e.category == timeout_error) {
97       XBT_DEBUG("Timeout expired when requesting peers to tracker");
98       delete peer_request;
99       return false;
100     }
101   }
102
103   try {
104     TrackerAnswer* answer = static_cast<TrackerAnswer*>(mailbox_->get(GET_PEERS_TIMEOUT));
105     // Add the peers the tracker gave us to our peer list.
106     for (auto peer_id : *answer->getPeers())
107       if (id != peer_id)
108         connected_peers[peer_id] = new Connection(peer_id);
109     delete answer;
110   } catch (xbt_ex& e) {
111     if (e.category == timeout_error) {
112       XBT_DEBUG("Timeout expired when requesting peers to tracker");
113       return false;
114     }
115   }
116   return true;
117 }
118
119 void Peer::sendHandshakeToAllPeers()
120 {
121   for (auto kv : connected_peers) {
122     Connection* remote_peer = kv.second;
123     Message* handshake      = new Message(MESSAGE_HANDSHAKE, id, mailbox_);
124     remote_peer->mailbox_->put_init(handshake, MESSAGE_HANDSHAKE_SIZE)->detach();
125     XBT_DEBUG("Sending a HANDSHAKE to %d", remote_peer->id);
126   }
127 }
128
129 void Peer::sendHandshake(simgrid::s4u::MailboxPtr mailbox)
130 {
131   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox->getName());
132   mailbox->put_init(new Message(MESSAGE_HANDSHAKE, id, mailbox_), MESSAGE_HANDSHAKE_SIZE)->detach();
133 }
134
135 void Peer::sendBitfield(simgrid::s4u::MailboxPtr mailbox)
136 {
137   XBT_DEBUG("Sending a BITFIELD to %s", mailbox->getName());
138   mailbox
139       ->put_init(new Message(MESSAGE_BITFIELD, id, bitfield_, mailbox_),
140                  MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES))
141       ->detach();
142 }
143
144 void Peer::sendInterested(simgrid::s4u::MailboxPtr mailbox)
145 {
146   XBT_DEBUG("Sending INTERESTED to %s", mailbox->getName());
147   mailbox->put_init(new Message(MESSAGE_INTERESTED, id, bitfield_, mailbox_), MESSAGE_INTERESTED_SIZE)->detach();
148 }
149
150 void Peer::sendNotInterested(simgrid::s4u::MailboxPtr mailbox)
151 {
152   XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox->getName());
153   mailbox->put_init(new Message(MESSAGE_NOTINTERESTED, id, bitfield_, mailbox_), MESSAGE_NOTINTERESTED_SIZE)->detach();
154 }
155
156 void Peer::sendChoked(simgrid::s4u::MailboxPtr mailbox)
157 {
158   XBT_DEBUG("Sending CHOKE to %s", mailbox->getName());
159   mailbox->put_init(new Message(MESSAGE_CHOKE, id, mailbox_), MESSAGE_CHOKE_SIZE)->detach();
160 }
161
162 /** Send a "unchoked" message to a peer */
163 void Peer::sendUnchoked(simgrid::s4u::MailboxPtr mailbox)
164 {
165   XBT_DEBUG("Sending UNCHOKE to %s", mailbox->getName());
166   mailbox->put_init(new Message(MESSAGE_UNCHOKE, id, mailbox_), MESSAGE_UNCHOKE_SIZE)->detach();
167 }
168
169 void Peer::sendPiece(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length)
170 {
171   xbt_assert(!hasNotPiece(piece), "Tried to send a unavailable piece.");
172   XBT_DEBUG("Sending the PIECE %u (%d,%d) to %s", piece, block_index, block_length, mailbox->getName());
173   mailbox->put_init(new Message(MESSAGE_PIECE, id, mailbox_, piece, block_index, block_length), BLOCK_SIZE)->detach();
174 }
175
176 void Peer::sendRequest(simgrid::s4u::MailboxPtr mailbox, unsigned int piece, int block_index, int block_length)
177 {
178   XBT_DEBUG("Sending a REQUEST to %s for piece %u (%d,%d)", mailbox->getName(), piece, block_index, block_length);
179   mailbox->put_init(new Message(MESSAGE_REQUEST, id, mailbox_, piece, block_index, block_length), MESSAGE_REQUEST_SIZE)
180       ->detach();
181 }
182
183 void Peer::sendHaveToAllPeers(unsigned int piece)
184 {
185   XBT_DEBUG("Sending HAVE message to all my peers");
186   for (auto kv : connected_peers) {
187     Connection* remote_peer = kv.second;
188     remote_peer->mailbox_->put_init(new Message(MESSAGE_HAVE, id, mailbox_, piece), MESSAGE_HAVE_SIZE)->detach();
189   }
190 }
191
192 void Peer::sendRequestTo(Connection* remote_peer, unsigned int piece)
193 {
194   remote_peer->current_piece = piece;
195   xbt_assert(remote_peer->hasPiece(piece));
196   int block_index = getFirstMissingBlockFrom(piece);
197   if (block_index != -1) {
198     int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
199     sendRequest(remote_peer->mailbox_, piece, block_index, block_length);
200   }
201 }
202
203 std::string Peer::getStatus()
204 {
205   std::string res = std::string("");
206   for (int i = FILE_PIECES - 1; i >= 0; i--)
207     res = std::string((bitfield_ & (1U << i)) ? "1" : "0") + res;
208   return res;
209 }
210
211 bool Peer::hasFinished()
212 {
213   return bitfield_ == (1U << FILE_PIECES) - 1U;
214 }
215
216 /** Indicates if the remote peer has a piece not stored by the local peer */
217 bool Peer::isInterestedBy(Connection* remote_peer)
218 {
219   return remote_peer->bitfield & (bitfield_ ^ ((1 << FILE_PIECES) - 1));
220 }
221
222 void Peer::updatePiecesCountFromBitfield(unsigned int bitfield)
223 {
224   for (unsigned int i = 0; i < FILE_PIECES; i++)
225     if (bitfield & (1U << i))
226       pieces_count[i]++;
227 }
228
229 unsigned int Peer::countPieces(unsigned int bitfield)
230 {
231   unsigned int count = 0U;
232   unsigned int n     = bitfield;
233   while (n) {
234     count += n & 1U;
235     n >>= 1U;
236   }
237   return count;
238 }
239
240 int Peer::nbInterestedPeers()
241 {
242   int nb = 0;
243   for (auto kv : connected_peers)
244     if (kv.second->interested)
245       nb++;
246   return nb;
247 }
248
249 void Peer::leech()
250 {
251   double next_choked_update = simgrid::s4u::Engine::getClock() + UPDATE_CHOKED_INTERVAL;
252   XBT_DEBUG("Start downloading.");
253
254   /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
255   sendHandshakeToAllPeers();
256   XBT_DEBUG("Starting main leech loop listening on mailbox: %s", mailbox_->getName());
257
258   void* data = nullptr;
259   while (simgrid::s4u::Engine::getClock() < deadline && countPieces(bitfield_) < FILE_PIECES) {
260     if (comm_received == nullptr) {
261       comm_received = mailbox_->get_async(&data);
262     }
263     if (comm_received->test()) {
264       message = static_cast<Message*>(data);
265       handleMessage();
266       delete message;
267       comm_received = nullptr;
268     } else {
269       // We don't execute the choke algorithm if we don't already have a piece
270       if (simgrid::s4u::Engine::getClock() >= next_choked_update && countPieces(bitfield_) > 0) {
271         updateChokedPeers();
272         next_choked_update += UPDATE_CHOKED_INTERVAL;
273       } else {
274         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
275       }
276     }
277   }
278   if (hasFinished())
279     XBT_DEBUG("%d becomes a seeder", id);
280 }
281
282 void Peer::seed()
283 {
284   double next_choked_update = simgrid::s4u::Engine::getClock() + UPDATE_CHOKED_INTERVAL;
285   XBT_DEBUG("Start seeding.");
286   // start the main seed loop
287   void* data = nullptr;
288   while (simgrid::s4u::Engine::getClock() < deadline) {
289     if (comm_received == nullptr) {
290       comm_received = mailbox_->get_async(&data);
291     }
292     if (comm_received->test()) {
293       message = static_cast<Message*>(data);
294       handleMessage();
295       delete message;
296       comm_received = nullptr;
297     } else {
298       if (simgrid::s4u::Engine::getClock() >= next_choked_update) {
299         updateChokedPeers();
300         // TODO: Change the choked peer algorithm when seeding.
301         next_choked_update += UPDATE_CHOKED_INTERVAL;
302       } else {
303         simgrid::s4u::this_actor::sleep_for(SLEEP_DURATION);
304       }
305     }
306   }
307 }
308
309 void Peer::updateActivePeersSet(Connection* remote_peer)
310 {
311   if (remote_peer->interested && not remote_peer->choked_upload) {
312     // add in the active peers set
313     active_peers.insert(remote_peer);
314   } else if (active_peers.find(remote_peer) != active_peers.end()) {
315     active_peers.erase(remote_peer);
316   }
317 }
318
319 void Peer::handleMessage()
320 {
321   const char* type_names[10] = {"HANDSHAKE", "CHOKE",    "UNCHOKE", "INTERESTED", "NOTINTERESTED",
322                                 "HAVE",      "BITFIELD", "REQUEST", "PIECE",      "CANCEL"};
323
324   XBT_DEBUG("Received a %s message from %s", type_names[message->type], message->return_mailbox->getName());
325
326   auto known_peer         = connected_peers.find(message->peer_id);
327   Connection* remote_peer = (known_peer == connected_peers.end()) ? nullptr : known_peer->second;
328   switch (message->type) {
329     case MESSAGE_HANDSHAKE:
330       // Check if the peer is in our connection list.
331       if (remote_peer == nullptr) {
332         XBT_DEBUG("This peer %d was unknown, answer to its handshake", message->peer_id);
333         connected_peers[message->peer_id] = new Connection(message->peer_id);
334         sendHandshake(message->return_mailbox);
335       }
336       // Send our bitfield to the peer
337       sendBitfield(message->return_mailbox);
338       break;
339     case MESSAGE_BITFIELD:
340       // Update the pieces list
341       updatePiecesCountFromBitfield(message->bitfield);
342       // Store the bitfield
343       remote_peer->bitfield = message->bitfield;
344       xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
345       if (isInterestedBy(remote_peer)) {
346         remote_peer->am_interested = true;
347         sendInterested(message->return_mailbox);
348       }
349       break;
350     case MESSAGE_INTERESTED:
351       xbt_assert((remote_peer != nullptr), "The impossible did happened: A not-in-our-list peer sent us a message.");
352       // Update the interested state of the peer.
353       remote_peer->interested = true;
354       updateActivePeersSet(remote_peer);
355       break;
356     case MESSAGE_NOTINTERESTED:
357       xbt_assert((remote_peer != nullptr), "The impossible did happened: A not-in-our-list peer sent us a message.");
358       remote_peer->interested = false;
359       updateActivePeersSet(remote_peer);
360       break;
361     case MESSAGE_UNCHOKE:
362       xbt_assert((remote_peer != nullptr), "The impossible did happened: A not-in-our-list peer sent us a message.");
363       xbt_assert(remote_peer->choked_download);
364       remote_peer->choked_download = false;
365       // Send requests to the peer, since it has unchoked us
366       // if (remote_peer->am_interested)
367       requestNewPieceTo(remote_peer);
368       break;
369     case MESSAGE_CHOKE:
370       xbt_assert((remote_peer != nullptr), "The impossible did happened: A not-in-our-list peer sent us a message.");
371       xbt_assert(not remote_peer->choked_download);
372       remote_peer->choked_download = true;
373       if (remote_peer->current_piece != -1)
374         removeCurrentPiece(remote_peer, remote_peer->current_piece);
375       break;
376     case MESSAGE_HAVE:
377       XBT_DEBUG("\t for piece %d", message->piece);
378       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
379                  "Wrong HAVE message received");
380       remote_peer->bitfield = remote_peer->bitfield | (1U << static_cast<unsigned int>(message->piece));
381       pieces_count[message->piece]++;
382       // If the piece is in our pieces, we tell the peer that we are interested.
383       if (not remote_peer->am_interested && hasNotPiece(message->piece)) {
384         remote_peer->am_interested = true;
385         sendInterested(message->return_mailbox);
386         if (not remote_peer->choked_download)
387           requestNewPieceTo(remote_peer);
388       }
389       break;
390     case MESSAGE_REQUEST:
391       xbt_assert(remote_peer->interested);
392       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
393                  "Wrong HAVE message received");
394       if (not remote_peer->choked_upload) {
395         XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
396                   message->block_index + message->block_length);
397         if (not hasNotPiece(message->piece)) {
398           sendPiece(message->return_mailbox, message->piece, message->block_index, message->block_length);
399         }
400       } else {
401         XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
402       }
403       break;
404     case MESSAGE_PIECE:
405       XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
406                 message->block_index + message->block_length);
407       xbt_assert(not remote_peer->choked_download);
408       xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
409                  "Can't received a piece if I'm not interested wihtout end-game mode!"
410                  "piece (%d) bitfield (%u) remote bitfield (%u)",
411                  message->piece, bitfield_, remote_peer->bitfield);
412       xbt_assert(not remote_peer->choked_download, "Can't received a piece if I'm choked !");
413       xbt_assert((message->piece >= 0 && static_cast<unsigned int>(message->piece) < FILE_PIECES),
414                  "Wrong piece received");
415       // TODO: Execute a computation.
416       if (hasNotPiece(static_cast<unsigned int>(message->piece))) {
417         updateBitfieldBlocks(message->piece, message->block_index, message->block_length);
418         if (hasCompletedPiece(static_cast<unsigned int>(message->piece))) {
419           // Removing the piece from our piece list
420           removeCurrentPiece(remote_peer, message->piece);
421           // Setting the fact that we have the piece
422           bitfield_ = bitfield_ | (1U << static_cast<unsigned int>(message->piece));
423           XBT_DEBUG("My status is now %s", getStatus().c_str());
424           // Sending the information to all the peers we are connected to
425           sendHaveToAllPeers(message->piece);
426           // sending UNINTERESTED to peers that do not have what we want.
427           updateInterestedAfterReceive();
428         } else {                                      // piece not completed
429           sendRequestTo(remote_peer, message->piece); // ask for the next block
430         }
431       } else {
432         XBT_DEBUG("However, we already have it");
433         xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
434         requestNewPieceTo(remote_peer);
435       }
436       break;
437     case MESSAGE_CANCEL:
438       break;
439     default:
440       THROW_IMPOSSIBLE;
441   }
442   // Update the peer speed.
443   if (remote_peer) {
444     remote_peer->addSpeedValue(1.0 / (simgrid::s4u::Engine::getClock() - begin_receive_time));
445   }
446   begin_receive_time = simgrid::s4u::Engine::getClock();
447 }
448
449 /** Selects the appropriate piece to download and requests it to the remote_peer */
450 void Peer::requestNewPieceTo(Connection* remote_peer)
451 {
452   int piece = selectPieceToDownload(remote_peer);
453   if (piece != -1) {
454     current_pieces |= (1U << (unsigned int)piece);
455     sendRequestTo(remote_peer, piece);
456   }
457 }
458
459 void Peer::removeCurrentPiece(Connection* remote_peer, unsigned int current_piece)
460 {
461   current_pieces &= ~(1U << current_piece);
462   remote_peer->current_piece = -1;
463 }
464
465 /** @brief Return the piece to be downloaded
466  * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
467  * If a piece is partially downloaded, this piece will be selected prioritarily
468  * If the peer has strictly less than 4 pieces, he chooses a piece at random.
469  * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
470  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
471  * @param remote_peer: information about the connection
472  * @return the piece to download if possible. -1 otherwise
473  */
474 int Peer::selectPieceToDownload(Connection* remote_peer)
475 {
476   int piece = partiallyDownloadedPiece(remote_peer);
477   // strict priority policy
478   if (piece != -1)
479     return piece;
480
481   // end game mode
482   if (countPieces(current_pieces) >= (FILE_PIECES - countPieces(bitfield_)) && isInterestedBy(remote_peer)) {
483 #if ENABLE_END_GAME_MODE == 0
484     return -1;
485 #endif
486     int nb_interesting_pieces = 0;
487     // compute the number of interesting pieces
488     for (unsigned int i = 0; i < FILE_PIECES; i++)
489       if (hasNotPiece(i) && remote_peer->hasPiece(i))
490         nb_interesting_pieces++;
491
492     xbt_assert(nb_interesting_pieces != 0);
493     // get a random interesting piece
494     int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
495     int current_index      = 0;
496     for (unsigned int i = 0; i < FILE_PIECES; i++) {
497       if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
498         if (random_piece_index == current_index) {
499           piece = i;
500           break;
501         }
502         current_index++;
503       }
504     }
505     xbt_assert(piece != -1);
506     return piece;
507   }
508   // Random first policy
509   if (countPieces(bitfield_) < 4 && isInterestedByFree(remote_peer)) {
510     int nb_interesting_pieces = 0;
511     // compute the number of interesting pieces
512     for (unsigned int i = 0; i < FILE_PIECES; i++)
513       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
514         nb_interesting_pieces++;
515     xbt_assert(nb_interesting_pieces != 0);
516     // get a random interesting piece
517     int random_piece_index = RngStream_RandInt(stream, 0, nb_interesting_pieces - 1);
518     int current_index      = 0;
519     for (unsigned int i = 0; i < FILE_PIECES; i++) {
520       if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
521         if (random_piece_index == current_index) {
522           piece = i;
523           break;
524         }
525         current_index++;
526       }
527     }
528     xbt_assert(piece != -1);
529     return piece;
530   } else { // Rarest first policy
531     short min         = SHRT_MAX;
532     int nb_min_pieces = 0;
533     int current_index = 0;
534     // compute the smallest number of copies of available pieces
535     for (unsigned int i = 0; i < FILE_PIECES; i++) {
536       if (pieces_count[i] < min)
537         if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
538           min = pieces_count[i];
539     }
540
541     xbt_assert(min != SHRT_MAX || not isInterestedByFree(remote_peer));
542     // compute the number of rarest pieces
543     for (unsigned int i = 0; i < FILE_PIECES; i++)
544       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
545         nb_min_pieces++;
546
547     xbt_assert(nb_min_pieces != 0 || not isInterestedByFree(remote_peer));
548     // get a random rarest piece
549     int random_rarest_index = RngStream_RandInt(stream, 0, nb_min_pieces - 1);
550     for (unsigned int i = 0; i < FILE_PIECES; i++)
551       if (pieces_count[i] == min && hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i)) {
552         if (random_rarest_index == current_index) {
553           piece = i;
554           break;
555         }
556         current_index++;
557       }
558
559     xbt_assert(piece != -1 || not isInterestedByFree(remote_peer));
560     return piece;
561   }
562 }
563
564 void Peer::updateChokedPeers()
565 {
566   if (nbInterestedPeers() == 0)
567     return;
568   XBT_DEBUG("(%d) update_choked peers %zu active peers", id, active_peers.size());
569   // update the current round
570   round_                  = (round_ + 1) % 3;
571   Connection* chosen_peer = nullptr;
572   // select first active peer and remove it from the set
573   Connection* choked_peer = *(active_peers.begin());
574   active_peers.erase(choked_peer);
575
576   /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
577   if (hasFinished()) {
578     Connection* remote_peer;
579     double unchoke_time = simgrid::s4u::Engine::getClock() + 1;
580     for (auto kv : connected_peers) {
581       remote_peer = kv.second;
582       if (remote_peer->last_unchoke < unchoke_time && remote_peer->interested && remote_peer->choked_upload) {
583         unchoke_time = remote_peer->last_unchoke;
584         chosen_peer  = remote_peer;
585       }
586     }
587   } else {
588     // Random optimistic unchoking
589     if (round_ == 0) {
590       int j = 0;
591       do {
592         // We choose a random peer to unchoke.
593         std::unordered_map<int, Connection*>::iterator chosen_peer_it = connected_peers.begin();
594         std::advance(chosen_peer_it, RngStream_RandInt(stream, 0, connected_peers.size() - 1));
595         chosen_peer = chosen_peer_it->second;
596         if (chosen_peer == nullptr)
597           THROWF(unknown_error, 0, "A peer should have be selected at this point");
598         else if (not chosen_peer->interested || not chosen_peer->choked_upload)
599           chosen_peer = nullptr;
600         else
601           XBT_DEBUG("Nothing to do, keep going");
602         j++;
603       } while (chosen_peer == nullptr && j < MAXIMUM_PEERS);
604     } else {
605       // Use the "fastest download" policy.
606       double fastest_speed = 0.0;
607       for (auto kv : connected_peers) {
608         Connection* remote_peer = kv.second;
609         if (remote_peer->peer_speed > fastest_speed && remote_peer->choked_upload && remote_peer->interested) {
610           chosen_peer   = remote_peer;
611           fastest_speed = remote_peer->peer_speed;
612         }
613       }
614     }
615   }
616
617   if (chosen_peer != nullptr)
618     XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", id, chosen_peer->id,
619               chosen_peer->interested, chosen_peer->choked_upload);
620
621   if (choked_peer != chosen_peer) {
622     if (choked_peer != nullptr) {
623       xbt_assert((!choked_peer->choked_upload), "Tries to choked a choked peer");
624       choked_peer->choked_upload = true;
625       updateActivePeersSet(choked_peer);
626       XBT_DEBUG("(%d) Sending a CHOKE to %d", id, choked_peer->id);
627       sendChoked(choked_peer->mailbox_);
628     }
629     if (chosen_peer != nullptr) {
630       xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
631       chosen_peer->choked_upload = false;
632       active_peers.insert(chosen_peer);
633       chosen_peer->last_unchoke = simgrid::s4u::Engine::getClock();
634       XBT_DEBUG("(%d) Sending a UNCHOKE to %d", id, chosen_peer->id);
635       updateActivePeersSet(chosen_peer);
636       sendUnchoked(chosen_peer->mailbox_);
637     }
638   }
639 }
640
641 /** @brief Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want.*/
642 void Peer::updateInterestedAfterReceive()
643 {
644   for (auto kv : connected_peers) {
645     Connection* remote_peer = kv.second;
646     if (remote_peer->am_interested) {
647       bool interested = false;
648       // Check if the peer still has a piece we want.
649       for (unsigned int i = 0; i < FILE_PIECES; i++)
650         if (hasNotPiece(i) && remote_peer->hasPiece(i)) {
651           interested = true;
652           break;
653         }
654
655       if (not interested) { // no more piece to download from connection
656         remote_peer->am_interested = false;
657         sendNotInterested(remote_peer->mailbox_);
658       }
659     }
660   }
661 }
662
663 void Peer::updateBitfieldBlocks(int piece, int block_index, int block_length)
664 {
665   xbt_assert((piece >= 0 && static_cast<unsigned int>(piece) <= FILE_PIECES), "Wrong piece.");
666   xbt_assert((block_index >= 0 && static_cast<unsigned int>(block_index) <= PIECES_BLOCKS), "Wrong block : %d.",
667              block_index);
668   for (int i = block_index; i < (block_index + block_length); i++)
669     bitfield_blocks |= (1ULL << static_cast<unsigned int>(piece * PIECES_BLOCKS + i));
670 }
671
672 bool Peer::hasCompletedPiece(unsigned int piece)
673 {
674   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
675     if (!(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
676       return false;
677   return true;
678 }
679
680 int Peer::getFirstMissingBlockFrom(int piece)
681 {
682   for (unsigned int i = 0; i < PIECES_BLOCKS; i++)
683     if (!(bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i)))
684       return i;
685   return -1;
686 }
687
688 bool Peer::isInterestedByFree(Connection* remote_peer)
689 {
690   for (unsigned int i = 0; i < FILE_PIECES; i++)
691     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i))
692       return true;
693   return false;
694 }
695
696 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
697 int Peer::partiallyDownloadedPiece(Connection* remote_peer)
698 {
699   for (unsigned int i = 0; i < FILE_PIECES; i++)
700     if (hasNotPiece(i) && remote_peer->hasPiece(i) && isNotDownloadingPiece(i) && getFirstMissingBlockFrom(i) > 0)
701       return i;
702   return -1;
703 }