1 /* Copyright (c) 2012-2021. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "bittorrent-peer.h"
8 #include <simgrid/forward.h>
11 #include <stdio.h> /* snprintf */
13 XBT_LOG_NEW_DEFAULT_CATEGORY(bittorrent_peers, "Messages specific for the peers");
16 * User parameters for transferred file data. For the test, the default values are :
17 * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
19 #define FILE_PIECES 10UL
20 #define PIECES_BLOCKS 5UL
21 #define BLOCK_SIZE 16384
23 /** Number of blocks asked by each request */
24 #define BLOCKS_REQUESTED 2UL
26 #define SLEEP_DURATION 1
27 #define BITS_TO_BYTES(x) (((x) / 8 + (x) % 8) ? 1 : 0)
29 const char* const message_type_names[10] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED",
30 "HAVE", "BITFIELD", "REQUEST", "PIECE", "CANCEL"};
33 #define MIN(a, b) ((a) < (b) ? (a) : (b))
36 static peer_t peer_init(int id, int seed)
38 peer_t peer = xbt_new(s_peer_t, 1);
41 char mailbox_name[MAILBOX_SIZE];
42 snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
43 peer->mailbox = sg_mailbox_by_name(mailbox_name);
45 peer->connected_peers = xbt_dict_new_homogeneous(NULL);
46 peer->active_peers = xbt_dict_new_homogeneous(NULL);
49 peer->bitfield = (1U << FILE_PIECES) - 1U;
50 peer->bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
53 peer->bitfield_blocks = 0;
56 peer->current_pieces = 0;
57 peer->pieces_count = xbt_new0(short, FILE_PIECES);
58 peer->comm_received = NULL;
64 static void peer_free(peer_t peer)
67 connection_t connection;
68 xbt_dict_cursor_t cursor;
69 xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
72 xbt_dict_free(&peer->connected_peers);
73 xbt_dict_free(&peer->active_peers);
74 xbt_free(peer->pieces_count);
78 /** Peer main function */
79 void peer(int argc, char* argv[])
82 xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
85 peer_t peer = peer_init((int)xbt_str_parse_int(argv[1], "Invalid ID: %s"), argc == 4 ? 1 : 0);
88 peer->deadline = xbt_str_parse_double(argv[2], "Invalid deadline: %s");
89 xbt_assert(peer->deadline > 0, "Wrong deadline supplied");
91 char* status = xbt_malloc0(FILE_PIECES + 1);
92 get_status(&status, peer->bitfield);
94 XBT_INFO("Hi, I'm joining the network with id %d", peer->id);
96 // Getting peer data from the tracker.
97 if (get_peers_from_tracker(peer)) {
98 XBT_DEBUG("Got %d peers from the tracker. Current status is: %s", xbt_dict_length(peer->connected_peers), status);
99 peer->begin_receive_time = simgrid_get_clock();
100 sg_mailbox_set_receiver(sg_mailbox_get_name(peer->mailbox));
102 if (has_finished(peer->bitfield)) {
103 send_handshake_to_all_peers(peer);
109 XBT_INFO("Couldn't contact the tracker.");
112 get_status(&status, peer->bitfield);
113 XBT_INFO("Here is my current status: %s", status);
114 if (peer->comm_received) {
115 sg_comm_unref(peer->comm_received);
122 /** @brief Retrieves the peer list from the tracker */
123 int get_peers_from_tracker(const_peer_t peer)
125 sg_mailbox_t tracker_mailbox = sg_mailbox_by_name(TRACKER_MAILBOX);
127 // Build the task to send to the tracker
128 tracker_query_t peer_request = tracker_query_new(peer->id, peer->mailbox);
130 XBT_DEBUG("Sending a peer request to the tracker.");
131 sg_comm_t request = sg_mailbox_put_async(tracker_mailbox, peer_request, TRACKER_COMM_SIZE);
132 sg_error_t res = sg_comm_wait_for(request, GET_PEERS_TIMEOUT);
134 if (res == SG_ERROR_TIMEOUT) {
135 XBT_DEBUG("Timeout expired when requesting peers to tracker");
136 xbt_free(peer_request);
140 void* message = NULL;
141 sg_comm_t comm_received = sg_mailbox_get_async(peer->mailbox, &message);
142 res = sg_comm_wait_for(comm_received, GET_PEERS_TIMEOUT);
144 const_tracker_answer_t ta = (const_tracker_answer_t)message;
145 // Add the peers the tracker gave us to our peer list.
148 // Add the peers the tracker gave us to our peer list.
149 xbt_dynar_foreach (ta->peers, i, peer_id) {
150 if (peer_id != peer->id)
151 xbt_dict_set_ext(peer->connected_peers, (char*)&peer_id, sizeof(int), connection_new(peer_id));
153 tracker_answer_free(message);
154 } else if (res == SG_ERROR_TIMEOUT) {
155 XBT_DEBUG("Timeout expired when requesting peers to tracker");
156 tracker_answer_free(message);
163 /** @brief Send a handshake message to all the peers the peer has. */
164 void send_handshake_to_all_peers(const_peer_t peer)
166 connection_t remote_peer;
167 xbt_dict_cursor_t cursor = NULL;
169 xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
170 message_t handshake = message_new(MESSAGE_HANDSHAKE, peer->id, peer->mailbox);
171 sg_comm_t comm = sg_mailbox_put_init(remote_peer->mailbox, handshake, MESSAGE_HANDSHAKE_SIZE);
172 sg_comm_detach(comm, NULL);
173 XBT_DEBUG("Sending a HANDSHAKE to %s", sg_mailbox_get_name(remote_peer->mailbox));
177 void send_message(const_peer_t peer, sg_mailbox_t mailbox, e_message_type type, uint64_t size)
179 XBT_DEBUG("Sending %s to %s", message_type_names[type], sg_mailbox_get_name(mailbox));
180 message_t message = message_other_new(type, peer->id, peer->mailbox, peer->bitfield);
181 sg_comm_t comm = sg_mailbox_put_init(mailbox, message, size);
182 sg_comm_detach(comm, NULL);
185 /** @brief Send a bitfield message to all the peers the peer has */
186 void send_bitfield(const_peer_t peer, sg_mailbox_t mailbox)
188 XBT_DEBUG("Sending a BITFIELD to %s", sg_mailbox_get_name(mailbox));
189 message_t message = message_other_new(MESSAGE_BITFIELD, peer->id, peer->mailbox, peer->bitfield);
190 sg_comm_t comm = sg_mailbox_put_init(mailbox, message, MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES));
191 sg_comm_detach(comm, NULL);
194 /** Send a "piece" message to a pair, containing a piece of the file */
195 void send_piece(const_peer_t peer, sg_mailbox_t mailbox, int piece, int block_index, int block_length)
197 XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, block_length, sg_mailbox_get_name(mailbox));
198 xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
199 xbt_assert(!peer_has_not_piece(peer, piece), "Tried to send a piece that we doesn't have.");
200 message_t message = message_piece_new(peer->id, peer->mailbox, piece, block_index, block_length);
201 sg_comm_t comm = sg_mailbox_put_init(mailbox, message, BLOCK_SIZE);
202 sg_comm_detach(comm, NULL);
205 /** Send a "HAVE" message to all peers we are connected to */
206 void send_have_to_all_peers(const_peer_t peer, int piece)
208 XBT_DEBUG("Sending HAVE message to all my peers");
209 connection_t remote_peer;
210 xbt_dict_cursor_t cursor = NULL;
212 xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
213 message_t message = message_index_new(MESSAGE_HAVE, peer->id, peer->mailbox, piece);
214 sg_comm_t comm = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_HAVE_SIZE);
215 sg_comm_detach(comm, NULL);
219 /** @brief Send request messages to a peer that have unchoked us */
220 void send_request_to_peer(const_peer_t peer, connection_t remote_peer, int piece)
222 remote_peer->current_piece = piece;
223 xbt_assert(connection_has_piece(remote_peer, piece));
224 int block_index = get_first_missing_block_from(peer, piece);
225 if (block_index != -1) {
226 int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
227 XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", sg_mailbox_get_name(remote_peer->mailbox), piece,
228 block_index, block_length);
229 message_t message = message_request_new(peer->id, peer->mailbox, piece, block_index, block_length);
230 sg_comm_t comm = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_REQUEST_SIZE);
231 sg_comm_detach(comm, NULL);
235 void get_status(char** status, unsigned int bitfield)
237 for (int i = FILE_PIECES - 1; i >= 0; i--)
238 (*status)[i] = (bitfield & (1U << i)) ? '1' : '0';
239 (*status)[FILE_PIECES] = '\0';
242 int has_finished(unsigned int bitfield)
244 return bitfield == (1U << FILE_PIECES) - 1U;
247 /** Indicates if the remote peer has a piece not stored by the local peer */
248 int is_interested(const_peer_t peer, const_connection_t remote_peer)
250 return remote_peer->bitfield & (peer->bitfield ^ ((1 << FILE_PIECES) - 1));
253 /** Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer */
254 int is_interested_and_free(const_peer_t peer, const_connection_t remote_peer)
256 for (int i = 0; i < FILE_PIECES; i++)
257 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i))
262 /** @brief Updates the list of who has a piece from a bitfield */
263 void update_pieces_count_from_bitfield(const_peer_t peer, unsigned int bitfield)
265 for (int i = 0; i < FILE_PIECES; i++)
266 if (bitfield & (1U << i))
267 peer->pieces_count[i]++;
270 int count_pieces(unsigned int bitfield)
273 unsigned int n = bitfield;
281 int nb_interested_peers(const_peer_t peer)
283 xbt_dict_cursor_t cursor = NULL;
285 connection_t connection;
287 xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
288 if (connection->interested)
294 /** @brief Peer main loop when it is leeching. */
295 void leech(peer_t peer)
297 double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
298 XBT_DEBUG("Start downloading.");
300 /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
301 send_handshake_to_all_peers(peer);
302 XBT_DEBUG("Starting main leech loop");
305 while (simgrid_get_clock() < peer->deadline && count_pieces(peer->bitfield) < FILE_PIECES) {
306 if (peer->comm_received == NULL)
307 peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
309 if (sg_comm_test(peer->comm_received)) {
310 peer->message = (message_t)data;
311 handle_message(peer, peer->message);
312 xbt_free(peer->message);
313 peer->comm_received = NULL;
315 // We don't execute the choke algorithm if we don't already have a piece
316 if (simgrid_get_clock() >= next_choked_update && count_pieces(peer->bitfield) > 0) {
317 update_choked_peers(peer);
318 next_choked_update += UPDATE_CHOKED_INTERVAL;
320 sg_actor_sleep_for(SLEEP_DURATION);
324 if (has_finished(peer->bitfield))
325 XBT_DEBUG("%d becomes a seeder", peer->id);
328 /** @brief Peer main loop when it is seeding */
329 void seed(peer_t peer)
331 double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
332 XBT_DEBUG("Start seeding.");
333 // start the main seed loop
335 while (simgrid_get_clock() < peer->deadline) {
336 if (peer->comm_received == NULL)
337 peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
339 if (sg_comm_test(peer->comm_received)) {
340 peer->message = (message_t)data;
341 handle_message(peer, peer->message);
342 xbt_free(peer->message);
343 peer->comm_received = NULL;
345 if (simgrid_get_clock() >= next_choked_update) {
346 update_choked_peers(peer);
347 // TODO: Change the choked peer algorithm when seeding.
348 next_choked_update += UPDATE_CHOKED_INTERVAL;
350 sg_actor_sleep_for(SLEEP_DURATION);
356 void update_active_peers_set(const s_peer_t* peer, connection_t remote_peer)
358 if ((remote_peer->interested != 0) && (remote_peer->choked_upload == 0)) {
359 // add in the active peers set
360 xbt_dict_set_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int), remote_peer);
361 } else if (xbt_dict_get_or_null_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int))) {
362 xbt_dict_remove_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int));
366 /** @brief Handle a received message sent by another peer */
367 void handle_message(peer_t peer, message_t message)
369 XBT_DEBUG("Received a %s message from %s", message_type_names[message->type],
370 sg_mailbox_get_name(message->return_mailbox));
372 connection_t remote_peer = xbt_dict_get_or_null_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int));
373 xbt_assert(remote_peer != NULL || message->type == MESSAGE_HANDSHAKE,
374 "The impossible did happened: A not-in-our-list peer sent us a message.");
376 switch (message->type) {
377 case MESSAGE_HANDSHAKE:
378 // Check if the peer is in our connection list.
379 if (remote_peer == 0) {
380 xbt_dict_set_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int),
381 connection_new(message->peer_id));
382 send_message(peer, message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
384 // Send our bitfield to the peer
385 send_bitfield(peer, message->return_mailbox);
387 case MESSAGE_BITFIELD:
388 // Update the pieces list
389 update_pieces_count_from_bitfield(peer, message->bitfield);
390 // Store the bitfield
391 remote_peer->bitfield = message->bitfield;
392 xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
393 if (is_interested(peer, remote_peer)) {
394 remote_peer->am_interested = 1;
395 send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
398 case MESSAGE_INTERESTED:
399 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
400 // Update the interested state of the peer.
401 remote_peer->interested = 1;
402 update_active_peers_set(peer, remote_peer);
404 case MESSAGE_NOTINTERESTED:
405 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
406 remote_peer->interested = 0;
407 update_active_peers_set(peer, remote_peer);
409 case MESSAGE_UNCHOKE:
410 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
411 xbt_assert(remote_peer->choked_download);
412 remote_peer->choked_download = 0;
413 // Send requests to the peer, since it has unchoked us
414 if (remote_peer->am_interested)
415 request_new_piece_to_peer(peer, remote_peer);
418 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
419 xbt_assert(!remote_peer->choked_download);
420 remote_peer->choked_download = 1;
421 if (remote_peer->current_piece != -1)
422 remove_current_piece(peer, remote_peer, remote_peer->current_piece);
425 XBT_DEBUG("\t for piece %d", message->piece);
426 xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong HAVE message received");
427 remote_peer->bitfield = remote_peer->bitfield | (1U << message->piece);
428 peer->pieces_count[message->piece]++;
429 // If the piece is in our pieces, we tell the peer that we are interested.
430 if ((remote_peer->am_interested == 0) && peer_has_not_piece(peer, message->piece)) {
431 remote_peer->am_interested = 1;
432 send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
433 if (remote_peer->choked_download == 0)
434 request_new_piece_to_peer(peer, remote_peer);
437 case MESSAGE_REQUEST:
438 xbt_assert(remote_peer->interested);
439 xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong request received");
440 if (remote_peer->choked_upload == 0) {
441 XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
442 message->block_index + message->block_length);
443 if (!peer_has_not_piece(peer, message->piece)) {
444 send_piece(peer, message->return_mailbox, message->piece, message->block_index, message->block_length);
447 XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
451 XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
452 message->block_index + message->block_length);
453 xbt_assert(!remote_peer->choked_download);
454 xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
455 xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong piece received");
456 // TODO: Execute à computation.
457 if (peer_has_not_piece(peer, message->piece)) {
458 update_bitfield_blocks(peer, message->piece, message->block_index, message->block_length);
459 if (piece_complete(peer, message->piece)) {
460 // Removing the piece from our piece list
461 remove_current_piece(peer, remote_peer, message->piece);
462 // Setting the fact that we have the piece
463 peer->bitfield = peer->bitfield | (1U << message->piece);
464 char* status = xbt_malloc0(FILE_PIECES + 1);
465 get_status(&status, peer->bitfield);
466 XBT_DEBUG("My status is now %s", status);
468 // Sending the information to all the peers we are connected to
469 send_have_to_all_peers(peer, message->piece);
470 // sending UNINTERESTED to peers that do not have what we want.
471 update_interested_after_receive(peer);
472 } else { // piece not completed
473 send_request_to_peer(peer, remote_peer, message->piece); // ask for the next block
476 XBT_DEBUG("However, we already have it");
477 request_new_piece_to_peer(peer, remote_peer);
485 // Update the peer speed.
487 connection_add_speed_value(remote_peer, 1.0 / (simgrid_get_clock() - peer->begin_receive_time));
489 peer->begin_receive_time = simgrid_get_clock();
492 /** Selects the appropriate piece to download and requests it to the remote_peer */
493 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
495 int piece = select_piece_to_download(peer, remote_peer);
497 peer->current_pieces |= (1U << (unsigned int)piece);
498 send_request_to_peer(peer, remote_peer, piece);
502 /** remove current_piece from the list of currently downloaded pieces. */
503 void remove_current_piece(peer_t peer, connection_t remote_peer, unsigned int current_piece)
505 peer->current_pieces &= ~(1U << current_piece);
506 remote_peer->current_piece = -1;
509 /** @brief Return the piece to be downloaded
510 * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
511 * If a piece is partially downloaded, this piece will be selected prioritarily
512 * If the peer has strictly less than 4 pieces, he chooses a piece at random.
513 * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
514 * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
515 * @param peer: local peer
516 * @param remote_peer: information about the connection
517 * @return the piece to download if possible. -1 otherwise
519 int select_piece_to_download(const_peer_t peer, const_connection_t remote_peer)
521 int piece = partially_downloaded_piece(peer, remote_peer);
522 // strict priority policy
527 if (count_pieces(peer->current_pieces) >= (FILE_PIECES - count_pieces(peer->bitfield)) &&
528 (is_interested(peer, remote_peer) != 0)) {
529 int nb_interesting_pieces = 0;
530 // compute the number of interesting pieces
531 for (int i = 0; i < FILE_PIECES; i++) {
532 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
533 nb_interesting_pieces++;
536 xbt_assert(nb_interesting_pieces != 0);
537 // get a random interesting piece
538 int random_piece_index = rand() % nb_interesting_pieces;
539 int current_index = 0;
540 for (int i = 0; i < FILE_PIECES; i++) {
541 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
542 if (random_piece_index == current_index) {
549 xbt_assert(piece != -1);
552 // Random first policy
553 if (count_pieces(peer->bitfield) < 4 && (is_interested_and_free(peer, remote_peer) != 0)) {
554 int nb_interesting_pieces = 0;
555 // compute the number of interesting pieces
556 for (int i = 0; i < FILE_PIECES; i++) {
557 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
558 peer_is_not_downloading_piece(peer, i)) {
559 nb_interesting_pieces++;
562 xbt_assert(nb_interesting_pieces != 0);
563 // get a random interesting piece
564 int random_piece_index = rand() % nb_interesting_pieces;
565 int current_index = 0;
566 for (int i = 0; i < FILE_PIECES; i++) {
567 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
568 peer_is_not_downloading_piece(peer, i)) {
569 if (random_piece_index == current_index) {
576 xbt_assert(piece != -1);
578 } else { // Rarest first policy
579 short min = SHRT_MAX;
580 int nb_min_pieces = 0;
581 int current_index = 0;
582 // compute the smallest number of copies of available pieces
583 for (int i = 0; i < FILE_PIECES; i++) {
584 if (peer->pieces_count[i] < min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
585 peer_is_not_downloading_piece(peer, i))
586 min = peer->pieces_count[i];
588 xbt_assert(min != SHRT_MAX || (is_interested_and_free(peer, remote_peer) == 0));
589 // compute the number of rarest pieces
590 for (int i = 0; i < FILE_PIECES; i++) {
591 if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
592 peer_is_not_downloading_piece(peer, i))
595 xbt_assert(nb_min_pieces != 0 || (is_interested_and_free(peer, remote_peer) == 0));
596 // get a random rarest piece
597 int random_rarest_index = 0;
598 if (nb_min_pieces > 0) {
599 random_rarest_index = rand() % nb_min_pieces;
601 for (int i = 0; i < FILE_PIECES; i++) {
602 if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
603 peer_is_not_downloading_piece(peer, i)) {
604 if (random_rarest_index == current_index) {
611 xbt_assert(piece != -1 || (is_interested_and_free(peer, remote_peer) == 0));
616 /** Update the list of current choked and unchoked peers, using the choke algorithm */
617 void update_choked_peers(peer_t peer)
619 if (nb_interested_peers(peer) == 0)
621 XBT_DEBUG("(%d) update_choked peers %u active peers", peer->id, xbt_dict_size(peer->active_peers));
622 // update the current round
623 peer->round = (peer->round + 1) % 3;
625 char* choked_key = NULL;
626 connection_t chosen_peer = NULL;
627 connection_t choked_peer = NULL;
628 // remove a peer from the list
629 xbt_dict_cursor_t cursor = NULL;
630 xbt_dict_cursor_first(peer->active_peers, &cursor);
631 if (!xbt_dict_is_empty(peer->active_peers)) {
632 choked_key = xbt_dict_cursor_get_key(cursor);
633 choked_peer = xbt_dict_cursor_get_data(cursor);
635 xbt_dict_cursor_free(&cursor);
637 /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
638 if (has_finished(peer->bitfield)) {
639 connection_t connection;
640 double unchoke_time = simgrid_get_clock() + 1;
642 xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
643 if (connection->last_unchoke < unchoke_time && (connection->interested != 0) &&
644 (connection->choked_upload != 0)) {
645 unchoke_time = connection->last_unchoke;
646 chosen_peer = connection;
650 // Random optimistic unchoking
651 if (peer->round == 0) {
654 // We choose a random peer to unchoke.
656 if (xbt_dict_length(peer->connected_peers) > 0) {
657 id_chosen = rand() % xbt_dict_length(peer->connected_peers);
660 connection_t connection;
661 xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
662 if (i == id_chosen) {
663 chosen_peer = connection;
668 xbt_dict_cursor_free(&cursor);
669 xbt_assert(chosen_peer != NULL, "A peer should have been selected at this point");
670 if ((chosen_peer->interested == 0) || (chosen_peer->choked_upload == 0))
673 XBT_DEBUG("Nothing to do, keep going");
675 } while (chosen_peer == NULL && j < MAXIMUM_PEERS);
677 // Use the "fastest download" policy.
678 connection_t connection;
679 double fastest_speed = 0.0;
680 xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
681 if (connection->peer_speed > fastest_speed && (connection->choked_upload != 0) &&
682 (connection->interested != 0)) {
683 chosen_peer = connection;
684 fastest_speed = connection->peer_speed;
690 if (chosen_peer != NULL)
691 XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", peer->id, chosen_peer->id,
692 chosen_peer->interested, chosen_peer->choked_upload);
694 if (choked_peer != chosen_peer) {
695 if (choked_peer != NULL) {
696 xbt_assert((!choked_peer->choked_upload), "Tries to choked a choked peer");
697 choked_peer->choked_upload = 1;
698 xbt_assert((*((int*)choked_key) == choked_peer->id));
699 update_active_peers_set(peer, choked_peer);
700 XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, choked_peer->id);
701 send_message(peer, choked_peer->mailbox, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
703 if (chosen_peer != NULL) {
704 xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
705 chosen_peer->choked_upload = 0;
706 xbt_dict_set_ext(peer->active_peers, (char*)&chosen_peer->id, sizeof(int), chosen_peer);
707 chosen_peer->last_unchoke = simgrid_get_clock();
708 XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, chosen_peer->id);
709 update_active_peers_set(peer, chosen_peer);
710 send_message(peer, chosen_peer->mailbox, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
715 /** Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want. */
716 void update_interested_after_receive(const_peer_t peer)
719 xbt_dict_cursor_t cursor;
720 connection_t connection;
721 xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
722 if (connection->am_interested != 0) {
724 // Check if the peer still has a piece we want.
725 for (int i = 0; i < FILE_PIECES; i++) {
726 if (peer_has_not_piece(peer, i) && connection_has_piece(connection, i)) {
731 if (!interested) { // no more piece to download from connection
732 connection->am_interested = 0;
733 send_message(peer, connection->mailbox, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
739 void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_length)
741 xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
742 xbt_assert((block_index >= 0 && block_index <= PIECES_BLOCKS), "Wrong block : %d.", block_index);
743 for (int i = block_index; i < (block_index + block_length); i++) {
744 peer->bitfield_blocks |= (1ULL << (unsigned int)(index * PIECES_BLOCKS + i));
748 /** Returns if a peer has completed the download of a piece */
749 int piece_complete(const_peer_t peer, int index)
751 for (int i = 0; i < PIECES_BLOCKS; i++) {
752 if (!(peer->bitfield_blocks & 1ULL << (index * PIECES_BLOCKS + i))) {
759 /** Returns the first block that a peer doesn't have in a piece. If the peer has all blocks of the piece, returns -1. */
760 int get_first_missing_block_from(const_peer_t peer, int piece)
762 for (int i = 0; i < PIECES_BLOCKS; i++) {
763 if (!(peer->bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) {
770 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
771 int partially_downloaded_piece(const_peer_t peer, const_connection_t remote_peer)
773 for (int i = 0; i < FILE_PIECES; i++) {
774 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i) &&
775 get_first_missing_block_from(peer, i) > 0)
781 int peer_has_not_piece(const_peer_t peer, unsigned int piece)
783 return !(peer->bitfield & 1U << piece);
786 /** Check that a piece is not currently being download by the peer. */
787 int peer_is_not_downloading_piece(const_peer_t peer, unsigned int piece)
789 return !(peer->current_pieces & 1U << piece);
792 /***************** Connection internal functions ***********************/
793 connection_t connection_new(int id)
795 connection_t connection = xbt_new(s_connection_t, 1);
796 char mailbox_name[MAILBOX_SIZE];
797 snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
799 connection->mailbox = sg_mailbox_by_name(mailbox_name);
800 connection->bitfield = 0;
801 connection->peer_speed = 0;
802 connection->last_unchoke = 0;
803 connection->current_piece = -1;
804 connection->am_interested = 0;
805 connection->interested = 0;
806 connection->choked_upload = 1;
807 connection->choked_download = 1;
812 void connection_add_speed_value(connection_t connection, double speed)
814 connection->peer_speed = connection->peer_speed * 0.6 + speed * 0.4;
817 int connection_has_piece(const_connection_t connection, unsigned int piece)
819 return (connection->bitfield & 1U << piece);
822 /***************** Messages creation functions ***********************/
823 /** @brief Build a new empty message */
824 message_t message_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox)
826 message_t message = xbt_new(s_message_t, 1);
827 message->peer_id = peer_id;
828 message->return_mailbox = return_mailbox;
829 message->type = type;
833 /** Builds a message containing an index. */
834 message_t message_index_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, int index)
836 message_t message = message_new(type, peer_id, return_mailbox);
837 message->piece = index;
841 message_t message_other_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, unsigned int bitfield)
843 message_t message = message_new(type, peer_id, return_mailbox);
844 message->bitfield = bitfield;
848 message_t message_request_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
850 message_t message = message_index_new(MESSAGE_REQUEST, peer_id, return_mailbox, piece);
851 message->block_index = block_index;
852 message->block_length = block_length;
856 message_t message_piece_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
858 message_t message = message_index_new(MESSAGE_PIECE, peer_id, return_mailbox, piece);
859 message->block_index = block_index;
860 message->block_length = block_length;