1 /* Copyright (c) 2012-2020. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "bittorrent-peer.h"
8 #include <simgrid/forward.h>
11 #include <stdio.h> /* snprintf */
13 XBT_LOG_NEW_DEFAULT_CATEGORY(bittorrent_peers, "Messages specific for the peers");
16 * User parameters for transferred file data. For the test, the default values are :
17 * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
// Number of pieces the transferred file is split into.
19 #define FILE_PIECES 10UL
// Number of blocks making up one piece.
20 #define PIECES_BLOCKS 5UL
// Size of one block, in bytes; also used as the simulated size of a PIECE message (see send_piece()).
21 #define BLOCK_SIZE 16384
23 /** Number of blocks asked by each request */
24 #define BLOCKS_REQUESTED 2UL
// Duration passed to sg_actor_sleep_for() at each iteration of the leech() and seed() loops.
26 #define SLEEP_DURATION 1
/**
 * Number of bytes needed to store x bits, i.e. x / 8 rounded up.
 * The conditional only covers the remainder test: a partially filled byte adds exactly one byte
 * to the count of full bytes. The argument is evaluated twice, so it must have no side effects.
 */
#define BITS_TO_BYTES(x) ((x) / 8 + (((x) % 8) ? 1 : 0))
// Smaller of a and b. Both arguments may be evaluated twice, so they must be free of side effects.
30 #define MIN(a, b) ((a) < (b) ? (a) : (b))
/*
 * Allocates a peer structure and fills in its mailbox, its two connection dictionaries and its
 * piece bookkeeping.
 * NOTE(review): some lines of this function are not visible in this excerpt, so the way `seed`
 * selects between the assignments below, and the returned value, should be confirmed.
 */
33 static peer_t peer_init(int id, int seed)
35 peer_t peer = xbt_new(s_peer_t, 1);
// The peer's mailbox is looked up under the decimal text of its id.
38 char mailbox_name[MAILBOX_SIZE];
39 snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
40 peer->mailbox = sg_mailbox_by_name(mailbox_name);
// Both dictionaries are created with a NULL free function.
42 peer->connected_peers = xbt_dict_new_homogeneous(NULL);
43 peer->active_peers = xbt_dict_new_homogeneous(NULL);
// One bit set per piece, then one bit set per block: everything is marked as owned.
46 peer->bitfield = (1U << FILE_PIECES) - 1U;
47 peer->bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
// presumably the non-seed case, where no block is owned yet -- TODO confirm
50 peer->bitfield_blocks = 0;
// No piece is being downloaded; pieces_count holds FILE_PIECES zero-initialized counters.
53 peer->current_pieces = 0;
54 peer->pieces_count = xbt_new0(short, FILE_PIECES);
55 peer->comm_received = NULL;
/*
 * Releases the two dictionaries and the pieces_count array of a peer.
 * NOTE(review): the body of the loop over connected_peers and the declaration of `key` are not
 * visible in this excerpt; presumably each connection is released there -- confirm.
 */
61 static void peer_free(peer_t peer)
64 connection_t connection;
65 xbt_dict_cursor_t cursor;
66 xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
69 xbt_dict_free(&peer->connected_peers);
70 xbt_dict_free(&peer->active_peers);
71 xbt_free(peer->pieces_count);
/*
 * Actor entry point. argv[1] is the peer id, argv[2] the deadline; a fourth argument (argc == 4)
 * makes peer_init() receive seed = 1.
 * NOTE(review): the lines that follow the has_finished() test and close the branches are not
 * visible in this excerpt; presumably the peer then runs seed() or leech() -- confirm.
 */
75 /** Peer main function */
76 void peer(int argc, char* argv[])
79 xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
82 peer_t peer = peer_init(xbt_str_parse_int(argv[1], "Invalid ID: %s"), argc == 4 ? 1 : 0);
85 peer->deadline = xbt_str_parse_double(argv[2], "Invalid deadline: %s");
86 xbt_assert(peer->deadline > 0, "Wrong deadline supplied");
// Buffer for the textual bitfield: one character per piece plus the terminating NUL.
88 char* status = xbt_malloc0(FILE_PIECES + 1);
89 get_status(&status, peer->bitfield);
91 XBT_INFO("Hi, I'm joining the network with id %d", peer->id);
93 // Getting peer data from the tracker.
94 if (get_peers_from_tracker(peer)) {
95 XBT_DEBUG("Got %d peers from the tracker. Current status is: %s", xbt_dict_length(peer->connected_peers), status);
96 peer->begin_receive_time = simgrid_get_clock();
97 sg_mailbox_set_receiver(sg_mailbox_get_name(peer->mailbox));
// A peer that already owns every piece only has to announce itself to the others.
99 if (has_finished(peer->bitfield)) {
100 send_handshake_to_all_peers(peer);
106 XBT_INFO("Couldn't contact the tracker.");
// Refresh the textual status before reporting it.
109 get_status(&status, peer->bitfield);
110 XBT_INFO("Here is my current status: %s", status);
// Drop the reference on a reception that is still pending, if any.
111 if (peer->comm_received) {
112 sg_comm_unref(peer->comm_received);
/*
 * Sends a query to the tracker mailbox, waits for the answer on the peer's own mailbox and adds
 * every returned peer id (except our own) to peer->connected_peers.
 * Both waits are bounded by GET_PEERS_TIMEOUT.
 * NOTE(review): the return statements and the test guarding the success branch are not visible in
 * this excerpt; peer() uses the result as a boolean -- confirm the exact values.
 */
119 /** @brief Retrieves the peer list from the tracker */
120 int get_peers_from_tracker(const_peer_t peer)
122 sg_mailbox_t tracker_mailbox = sg_mailbox_by_name(TRACKER_MAILBOX);
124 // Build the task to send to the tracker
125 tracker_query_t peer_request = tracker_query_new(peer->id, peer->mailbox);
127 XBT_DEBUG("Sending a peer request to the tracker.");
128 sg_comm_t request = sg_mailbox_put_async(tracker_mailbox, peer_request, TRACKER_COMM_SIZE);
129 sg_error_t res = sg_comm_wait_for(request, GET_PEERS_TIMEOUT);
// The query was not delivered in time, so it is still ours to release.
131 if (res == SG_ERROR_TIMEOUT) {
132 XBT_DEBUG("Timeout expired when requesting peers to tracker");
133 xbt_free(peer_request);
// Wait for the tracker's answer; `message` stays NULL until something is received.
137 void* message = NULL;
138 sg_comm_t comm_received = sg_mailbox_get_async(peer->mailbox, &message);
139 res = sg_comm_wait_for(comm_received, GET_PEERS_TIMEOUT);
141 const_tracker_answer_t ta = (const_tracker_answer_t)message;
142 // Add the peers the tracker gave us to our peer list.
145 // Add the peers the tracker gave us to our peer list.
146 xbt_dynar_foreach (ta->peers, i, peer_id) {
// Connections are keyed by the raw bytes of the remote peer id.
147 if (peer_id != peer->id)
148 xbt_dict_set_ext(peer->connected_peers, (char*)&peer_id, sizeof(int), connection_new(peer_id));
150 tracker_answer_free(message);
151 } else if (res == SG_ERROR_TIMEOUT) {
152 XBT_DEBUG("Timeout expired when requesting peers to tracker");
// NOTE(review): `message` may still be NULL here -- confirm that tracker_answer_free() accepts NULL.
153 tracker_answer_free(message);
160 /** @brief Send a handshake message to all the peers the peer has. */
161 void send_handshake_to_all_peers(const_peer_t peer)
163 connection_t remote_peer;
164 xbt_dict_cursor_t cursor = NULL;
166 xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
167 message_t handshake = message_new(MESSAGE_HANDSHAKE, peer->id, peer->mailbox);
168 sg_comm_t comm = sg_mailbox_put_init(remote_peer->mailbox, handshake, MESSAGE_HANDSHAKE_SIZE);
169 sg_comm_detach(comm, NULL);
170 XBT_DEBUG("Sending a HANDSHAKE to %s", sg_mailbox_get_name(remote_peer->mailbox));
174 void send_message(const_peer_t peer, sg_mailbox_t mailbox, e_message_type type, uint64_t size)
176 const char* type_names[6] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED", "CANCEL"};
177 XBT_DEBUG("Sending %s to %s", type_names[type], sg_mailbox_get_name(mailbox));
178 message_t message = message_other_new(type, peer->id, peer->mailbox, peer->bitfield);
179 sg_comm_t comm = sg_mailbox_put_init(mailbox, message, size);
180 sg_comm_detach(comm, NULL);
183 /** @brief Send a bitfield message to all the peers the peer has */
184 void send_bitfield(const_peer_t peer, sg_mailbox_t mailbox)
186 XBT_DEBUG("Sending a BITFIELD to %s", sg_mailbox_get_name(mailbox));
187 message_t message = message_other_new(MESSAGE_BITFIELD, peer->id, peer->mailbox, peer->bitfield);
188 sg_comm_t comm = sg_mailbox_put_init(mailbox, message, MESSAGE_BITFIELD_SIZE + BITS_TO_BYTES(FILE_PIECES));
189 sg_comm_detach(comm, NULL);
192 /** Send a "piece" message to a pair, containing a piece of the file */
193 void send_piece(const_peer_t peer, sg_mailbox_t mailbox, int piece, int block_index, int block_length)
195 XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, block_length, sg_mailbox_get_name(mailbox));
196 xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
197 xbt_assert(!peer_has_not_piece(peer, piece), "Tried to send a piece that we doesn't have.");
198 message_t message = message_piece_new(peer->id, peer->mailbox, piece, block_index, block_length);
199 sg_comm_t comm = sg_mailbox_put_init(mailbox, message, BLOCK_SIZE);
200 sg_comm_detach(comm, NULL);
203 /** Send a "HAVE" message to all peers we are connected to */
204 void send_have_to_all_peers(const_peer_t peer, int piece)
206 XBT_DEBUG("Sending HAVE message to all my peers");
207 connection_t remote_peer;
208 xbt_dict_cursor_t cursor = NULL;
210 xbt_dict_foreach (peer->connected_peers, cursor, key, remote_peer) {
211 message_t message = message_index_new(MESSAGE_HAVE, peer->id, peer->mailbox, piece);
212 sg_comm_t comm = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_HAVE_SIZE);
213 sg_comm_detach(comm, NULL);
217 /** @brief Send request messages to a peer that have unchoked us */
218 void send_request_to_peer(const_peer_t peer, connection_t remote_peer, int piece)
220 remote_peer->current_piece = piece;
221 xbt_assert(connection_has_piece(remote_peer, piece));
222 int block_index = get_first_missing_block_from(peer, piece);
223 if (block_index != -1) {
224 int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
225 XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", sg_mailbox_get_name(remote_peer->mailbox), piece,
226 block_index, block_length);
227 message_t message = message_request_new(peer->id, peer->mailbox, piece, block_index, block_length);
228 sg_comm_t comm = sg_mailbox_put_init(remote_peer->mailbox, message, MESSAGE_REQUEST_SIZE);
229 sg_comm_detach(comm, NULL);
233 void get_status(char** status, unsigned int bitfield)
235 for (int i = FILE_PIECES - 1; i >= 0; i--)
236 (*status)[i] = (bitfield & (1U << i)) ? '1' : '0';
237 (*status)[FILE_PIECES] = '\0';
240 int has_finished(unsigned int bitfield)
242 return bitfield == (1U << FILE_PIECES) - 1U;
245 /** Indicates if the remote peer has a piece not stored by the local peer */
246 int is_interested(const_peer_t peer, const_connection_t remote_peer)
248 return remote_peer->bitfield & (peer->bitfield ^ ((1 << FILE_PIECES) - 1));
251 /** Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer */
252 int is_interested_and_free(const_peer_t peer, const_connection_t remote_peer)
254 for (int i = 0; i < FILE_PIECES; i++)
255 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i))
260 /** @brief Updates the list of who has a piece from a bitfield */
261 void update_pieces_count_from_bitfield(const_peer_t peer, unsigned int bitfield)
263 for (int i = 0; i < FILE_PIECES; i++)
264 if (bitfield & (1U << i))
265 peer->pieces_count[i]++;
// NOTE(review): only the first statement of the body is visible in this excerpt. Judging by the name
// and by the callers, which compare the result with FILE_PIECES, this presumably returns the number
// of bits set in `bitfield` -- confirm.
268 int count_pieces(unsigned int bitfield)
// Work on a copy of the argument.
271 unsigned int n = bitfield;
// Walks peer->connected_peers looking at the `interested` flag of each connection.
// NOTE(review): the counter and the return statement are not visible in this excerpt; presumably the
// number of connections whose `interested` flag is set is returned -- confirm.
279 int nb_interested_peers(const_peer_t peer)
281 xbt_dict_cursor_t cursor = NULL;
283 connection_t connection;
285 xbt_dict_foreach (peer->connected_peers, cursor, key, connection)
286 if (connection->interested)
/*
 * Runs until the deadline is reached or all FILE_PIECES pieces are owned. Each iteration handles a
 * received message if one is available; otherwise it may refresh the choked peers and sleeps.
 * NOTE(review): the declaration of `data` and the lines closing the branches are not visible in this
 * excerpt, so the exact nesting of the statements below should be confirmed.
 */
292 /** @brief Peer main loop when it is leeching. */
293 void leech(peer_t peer)
295 double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
296 XBT_DEBUG("Start downloading.");
298 /* Send a "handshake" message to all the peers it got (since it couldn't have gotten more than 50 peers) */
299 send_handshake_to_all_peers(peer);
300 XBT_DEBUG("Starting main leech loop");
303 while (simgrid_get_clock() < peer->deadline && count_pieces(peer->bitfield) < FILE_PIECES) {
// Keep exactly one asynchronous reception posted on our mailbox.
304 if (peer->comm_received == NULL)
305 peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
// A message arrived: handle it, release it and let the next iteration post a new reception.
307 if (sg_comm_test(peer->comm_received)) {
308 peer->message = (message_t)data;
309 handle_message(peer, peer->message);
310 xbt_free(peer->message);
311 peer->comm_received = NULL;
313 // We don't execute the choke algorithm if we don't already have a piece
314 if (simgrid_get_clock() >= next_choked_update && count_pieces(peer->bitfield) > 0) {
315 update_choked_peers(peer);
316 next_choked_update += UPDATE_CHOKED_INTERVAL;
318 sg_actor_sleep_for(SLEEP_DURATION);
322 if (has_finished(peer->bitfield))
323 XBT_DEBUG("%d becomes a seeder", peer->id);
/*
 * Runs until the deadline is reached. Each iteration handles a received message if one is available;
 * otherwise it may refresh the choked peers and sleeps.
 * NOTE(review): the declaration of `data` and the lines closing the branches are not visible in this
 * excerpt, so the exact nesting of the statements below should be confirmed.
 */
326 /** @brief Peer main loop when it is seeding */
327 void seed(peer_t peer)
329 double next_choked_update = simgrid_get_clock() + UPDATE_CHOKED_INTERVAL;
330 XBT_DEBUG("Start seeding.");
331 // start the main seed loop
333 while (simgrid_get_clock() < peer->deadline) {
// Keep exactly one asynchronous reception posted on our mailbox.
334 if (peer->comm_received == NULL)
335 peer->comm_received = sg_mailbox_get_async(peer->mailbox, &data);
// A message arrived: handle it, release it and let the next iteration post a new reception.
337 if (sg_comm_test(peer->comm_received)) {
338 peer->message = (message_t)data;
339 handle_message(peer, peer->message);
340 xbt_free(peer->message);
341 peer->comm_received = NULL;
343 if (simgrid_get_clock() >= next_choked_update) {
344 update_choked_peers(peer);
345 // TODO: Change the choked peer algorithm when seeding.
346 next_choked_update += UPDATE_CHOKED_INTERVAL;
348 sg_actor_sleep_for(SLEEP_DURATION);
354 void update_active_peers_set(const s_peer_t* peer, connection_t remote_peer)
356 if ((remote_peer->interested != 0) && (remote_peer->choked_upload == 0)) {
357 // add in the active peers set
358 xbt_dict_set_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int), remote_peer);
359 } else if (xbt_dict_get_or_null_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int))) {
360 xbt_dict_remove_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int));
/*
 * Dispatches on message->type and updates the state kept for the sending peer, looked up in
 * peer->connected_peers by message->peer_id. Only a HANDSHAKE may come from an unknown peer.
 * After the switch, the speed estimate of the connection and peer->begin_receive_time are updated.
 * NOTE(review): several `case` labels, `break` statements and closing braces are not visible in
 * this excerpt; the notes below name the presumed case where the label is missing.
 */
364 /** @brief Handle a received message sent by another peer */
365 void handle_message(peer_t peer, message_t message)
// Names indexed by message->type, for logging only.
367 const char* type_names[10] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED",
368 "HAVE", "BITFIELD", "REQUEST", "PIECE", "CANCEL"};
369 XBT_DEBUG("Received a %s message from %s", type_names[message->type], sg_mailbox_get_name(message->return_mailbox));
371 connection_t remote_peer = xbt_dict_get_or_null_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int));
372 xbt_assert(remote_peer != NULL || message->type == MESSAGE_HANDSHAKE,
373 "The impossible did happened: A not-in-our-list peer sent us a message.");
375 switch (message->type) {
376 case MESSAGE_HANDSHAKE:
377 // Check if the peer is in our connection list.
// Unknown peer: register a connection for it and answer with our own handshake.
378 if (remote_peer == 0) {
379 xbt_dict_set_ext(peer->connected_peers, (char*)&message->peer_id, sizeof(int),
380 connection_new(message->peer_id));
381 send_message(peer, message->return_mailbox, MESSAGE_HANDSHAKE, MESSAGE_HANDSHAKE_SIZE);
383 // Send our bitfield to the peer
384 send_bitfield(peer, message->return_mailbox);
386 case MESSAGE_BITFIELD:
387 // Update the pieces list
388 update_pieces_count_from_bitfield(peer, message->bitfield);
389 // Store the bitfield
390 remote_peer->bitfield = message->bitfield;
391 xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
// Tell the remote peer when it owns at least one piece we miss.
392 if (is_interested(peer, remote_peer)) {
393 remote_peer->am_interested = 1;
394 send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
397 case MESSAGE_INTERESTED:
398 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
399 // Update the interested state of the peer.
400 remote_peer->interested = 1;
401 update_active_peers_set(peer, remote_peer);
403 case MESSAGE_NOTINTERESTED:
404 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
405 remote_peer->interested = 0;
406 update_active_peers_set(peer, remote_peer);
408 case MESSAGE_UNCHOKE:
409 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
410 xbt_assert(remote_peer->choked_download);
411 remote_peer->choked_download = 0;
412 // Send requests to the peer, since it has unchoked us
413 if (remote_peer->am_interested)
414 request_new_piece_to_peer(peer, remote_peer);
// presumably the MESSAGE_CHOKE case (label not visible): the remote peer stops serving us, so the
// piece we were downloading from it, if any, is given up -- TODO confirm
417 xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
418 xbt_assert(!remote_peer->choked_download);
419 remote_peer->choked_download = 1;
420 if (remote_peer->current_piece != -1)
421 remove_current_piece(peer, remote_peer, remote_peer->current_piece);
// presumably the MESSAGE_HAVE case (label not visible): record that the remote peer owns one more
// piece -- TODO confirm
424 XBT_DEBUG("\t for piece %d", message->piece);
425 xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong HAVE message received");
426 remote_peer->bitfield = remote_peer->bitfield | (1U << message->piece);
427 peer->pieces_count[message->piece]++;
// The test below fires when we do NOT own the announced piece and were not interested yet.
428 // If the piece is in our pieces, we tell the peer that we are interested.
429 if ((remote_peer->am_interested == 0) && peer_has_not_piece(peer, message->piece)) {
430 remote_peer->am_interested = 1;
431 send_message(peer, message->return_mailbox, MESSAGE_INTERESTED, MESSAGE_INTERESTED_SIZE);
432 if (remote_peer->choked_download == 0)
433 request_new_piece_to_peer(peer, remote_peer);
436 case MESSAGE_REQUEST:
437 xbt_assert(remote_peer->interested);
438 xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong request received");
// Requests are only served to peers we are not choking, and only for pieces we own.
439 if (remote_peer->choked_upload == 0) {
440 XBT_DEBUG("\t for piece %d (%d,%d)", message->piece, message->block_index,
441 message->block_index + message->block_length);
442 if (!peer_has_not_piece(peer, message->piece)) {
443 send_piece(peer, message->return_mailbox, message->piece, message->block_index, message->block_length);
// NOTE(review): this logs message->peer_id under "piece %d"; message->piece looks intended -- confirm.
446 XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
// presumably the MESSAGE_PIECE case (label not visible): blocks of a piece were received -- TODO confirm
450 XBT_DEBUG(" \t for piece %d (%d,%d)", message->piece, message->block_index,
451 message->block_index + message->block_length);
// The next two assertions check closely related conditions on choked_download.
452 xbt_assert(!remote_peer->choked_download);
453 xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
454 xbt_assert((message->piece >= 0 && message->piece < FILE_PIECES), "Wrong piece received");
455 // TODO: Execute à computation.
456 if (peer_has_not_piece(peer, message->piece)) {
457 update_bitfield_blocks(peer, message->piece, message->block_index, message->block_length);
458 if (piece_complete(peer, message->piece)) {
459 // Removing the piece from our piece list
460 remove_current_piece(peer, remote_peer, message->piece);
461 // Setting the fact that we have the piece
462 peer->bitfield = peer->bitfield | (1U << message->piece);
// Temporary buffer for the textual bitfield; its release is not visible in this excerpt.
463 char* status = xbt_malloc0(FILE_PIECES + 1);
464 get_status(&status, peer->bitfield);
465 XBT_DEBUG("My status is now %s", status);
467 // Sending the information to all the peers we are connected to
468 send_have_to_all_peers(peer, message->piece);
469 // sending UNINTERESTED to peers that do not have what we want.
470 update_interested_after_receive(peer);
471 } else { // piece not completed
472 send_request_to_peer(peer, remote_peer, message->piece); // ask for the next block
475 XBT_DEBUG("However, we already have it");
476 request_new_piece_to_peer(peer, remote_peer);
484 // Update the peer speed.
// NOTE(review): the divisor is the time elapsed since begin_receive_time; if no simulated time has
// elapsed this divides by zero -- confirm that this cannot happen or is acceptable.
486 connection_add_speed_value(remote_peer, 1.0 / (simgrid_get_clock() - peer->begin_receive_time));
488 peer->begin_receive_time = simgrid_get_clock();
491 /** Selects the appropriate piece to download and requests it to the remote_peer */
492 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
494 int piece = select_piece_to_download(peer, remote_peer);
// NOTE(review): a line is missing from this excerpt right here; presumably it skips the two
// statements below when no piece was selected (piece == -1) -- confirm.
// Flag the piece as being downloaded, then ask the remote peer for it.
496 peer->current_pieces |= (1U << (unsigned int)piece);
497 send_request_to_peer(peer, remote_peer, piece);
501 /** remove current_piece from the list of currently downloaded pieces. */
502 void remove_current_piece(peer_t peer, connection_t remote_peer, unsigned int current_piece)
504 peer->current_pieces &= ~(1U << current_piece);
505 remote_peer->current_piece = -1;
/*
 * Summary of the policies visible below, in order:
 *  - a piece returned by partially_downloaded_piece() is looked at first;
 *  - end game: when at least as many pieces are being downloaded as are still missing, a random
 *    piece that we miss and the remote peer owns is picked;
 *  - random first: while fewer than 4 pieces are owned, a random piece that we miss, that the remote
 *    peer owns and that is not being downloaded is picked;
 *  - rarest first: otherwise, a random piece among those with the smallest pieces_count is picked.
 * NOTE(review): the statements assigning `piece`, incrementing the counters and returning are not
 * visible in this excerpt -- confirm them against the full source.
 */
508 /** @brief Return the piece to be downloaded
509 * There are two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
510 * If a piece is partially downloaded, this piece will be selected prioritarily
511 * If the peer has strictly less than 4 pieces, he chooses a piece at random.
512 * If the peer has more than pieces, he downloads the pieces that are the less replicated (rarest policy).
513 * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
514 * @param peer: local peer
515 * @param remote_peer: information about the connection
516 * @return the piece to download if possible. -1 otherwise
518 int select_piece_to_download(const_peer_t peer, const_connection_t remote_peer)
520 int piece = partially_downloaded_piece(peer, remote_peer);
521 // strict priority policy
// End game: every missing piece is already being downloaded and the remote peer still has something for us.
526 if (count_pieces(peer->current_pieces) >= (FILE_PIECES - count_pieces(peer->bitfield)) &&
527 (is_interested(peer, remote_peer) != 0)) {
528 int nb_interesting_pieces = 0;
529 // compute the number of interesting pieces
530 for (int i = 0; i < FILE_PIECES; i++) {
531 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
532 nb_interesting_pieces++;
535 xbt_assert(nb_interesting_pieces != 0);
536 // get a random interesting piece
537 int random_piece_index = rand() % nb_interesting_pieces;
538 int current_index = 0;
// Second pass over the same candidates, stopping at the one whose rank was drawn.
539 for (int i = 0; i < FILE_PIECES; i++) {
540 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
541 if (random_piece_index == current_index) {
548 xbt_assert(piece != -1);
551 // Random first policy
552 if (count_pieces(peer->bitfield) < 4 && (is_interested_and_free(peer, remote_peer) != 0)) {
553 int nb_interesting_pieces = 0;
554 // compute the number of interesting pieces
555 for (int i = 0; i < FILE_PIECES; i++) {
556 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
557 peer_is_not_downloading_piece(peer, i)) {
558 nb_interesting_pieces++;
561 xbt_assert(nb_interesting_pieces != 0);
562 // get a random interesting piece
563 int random_piece_index = rand() % nb_interesting_pieces;
564 int current_index = 0;
// Second pass over the same candidates, stopping at the one whose rank was drawn.
565 for (int i = 0; i < FILE_PIECES; i++) {
566 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
567 peer_is_not_downloading_piece(peer, i)) {
568 if (random_piece_index == current_index) {
575 xbt_assert(piece != -1);
577 } else { // Rarest first policy
// SHRT_MAX marks "no candidate found yet".
578 short min = SHRT_MAX;
579 int nb_min_pieces = 0;
580 int current_index = 0;
581 // compute the smallest number of copies of available pieces
582 for (int i = 0; i < FILE_PIECES; i++) {
583 if (peer->pieces_count[i] < min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
584 peer_is_not_downloading_piece(peer, i))
585 min = peer->pieces_count[i];
587 xbt_assert(min != SHRT_MAX || (is_interested_and_free(peer, remote_peer) == 0));
588 // compute the number of rarest pieces
589 for (int i = 0; i < FILE_PIECES; i++) {
590 if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
591 peer_is_not_downloading_piece(peer, i))
594 xbt_assert(nb_min_pieces != 0 || (is_interested_and_free(peer, remote_peer) == 0));
595 // get a random rarest piece
596 int random_rarest_index = 0;
// Only draw when there is a candidate, so that the modulo never divides by zero.
597 if (nb_min_pieces > 0) {
598 random_rarest_index = rand() % nb_min_pieces;
600 for (int i = 0; i < FILE_PIECES; i++) {
601 if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
602 peer_is_not_downloading_piece(peer, i)) {
603 if (random_rarest_index == current_index) {
610 xbt_assert(piece != -1 || (is_interested_and_free(peer, remote_peer) == 0));
/*
 * Picks at most one peer to choke (the first entry of peer->active_peers) and at most one peer to
 * unchoke, then sends the corresponding CHOKE / UNCHOKE messages when the two differ.
 * The peer to unchoke is chosen by one of three policies, depending on has_finished() and on
 * peer->round, which cycles through 0, 1, 2.
 * NOTE(review): several declarations (`key`, `i`, `j`, `id_chosen`), the `do` keyword, `else`
 * branches and early returns are not visible in this excerpt -- confirm the exact control flow.
 */
615 /** Update the list of current choked and unchoked peers, using the choke algorithm */
616 void update_choked_peers(peer_t peer)
// presumably returns early when nobody is interested (statement not visible) -- TODO confirm
618 if (nb_interested_peers(peer) == 0)
620 XBT_DEBUG("(%d) update_choked peers %u active peers", peer->id, xbt_dict_size(peer->active_peers));
621 // update the current round
622 peer->round = (peer->round + 1) % 3;
624 char* choked_key = NULL;
625 connection_t chosen_peer = NULL;
626 connection_t choked_peer = NULL;
627 // remove a peer from the list
// The candidate for choking is whatever entry the cursor yields first in active_peers.
628 xbt_dict_cursor_t cursor = NULL;
629 xbt_dict_cursor_first(peer->active_peers, &cursor);
630 if (!xbt_dict_is_empty(peer->active_peers)) {
631 choked_key = xbt_dict_cursor_get_key(cursor);
632 choked_peer = xbt_dict_cursor_get_data(cursor);
634 xbt_dict_cursor_free(&cursor);
636 /**If we are currently seeding, we unchoke the peer which has been unchoked the last time.*/
637 if (has_finished(peer->bitfield)) {
638 connection_t connection;
// Start above any possible last_unchoke so that the first eligible connection is always taken.
639 double unchoke_time = simgrid_get_clock() + 1;
// Keep the interested, currently choked connection with the smallest last_unchoke.
641 xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
642 if (connection->last_unchoke < unchoke_time && (connection->interested != 0) &&
643 (connection->choked_upload != 0)) {
644 unchoke_time = connection->last_unchoke;
645 chosen_peer = connection;
649 // Random optimistic unchoking
650 if (peer->round == 0) {
653 // We choose a random peer to unchoke.
655 if (xbt_dict_length(peer->connected_peers) > 0) {
656 id_chosen = rand() % xbt_dict_length(peer->connected_peers);
// Walk the dictionary up to the drawn rank.
659 connection_t connection;
660 xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
661 if (i == id_chosen) {
662 chosen_peer = connection;
667 xbt_dict_cursor_free(&cursor);
668 xbt_assert(chosen_peer != NULL, "A peer should have been selected at this point");
// presumably discards the draw when the peer is not interested or is already unchoked -- TODO confirm
669 if ((chosen_peer->interested == 0) || (chosen_peer->choked_upload == 0))
672 XBT_DEBUG("Nothing to do, keep going");
// Retry until a suitable peer is found or MAXIMUM_PEERS attempts were made.
674 } while (chosen_peer == NULL && j < MAXIMUM_PEERS);
676 // Use the "fastest download" policy.
677 connection_t connection;
678 double fastest_speed = 0.0;
// Keep the interested, currently choked connection with the highest peer_speed (strictly above 0).
679 xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
680 if (connection->peer_speed > fastest_speed && (connection->choked_upload != 0) &&
681 (connection->interested != 0)) {
682 chosen_peer = connection;
683 fastest_speed = connection->peer_speed;
689 if (chosen_peer != NULL)
690 XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", peer->id, chosen_peer->id,
691 chosen_peer->interested, chosen_peer->choked_upload);
// Nothing changes when the peer to choke is also the one to unchoke.
693 if (choked_peer != chosen_peer) {
694 if (choked_peer != NULL) {
695 xbt_assert((!choked_peer->choked_upload), "Tries to choked a choked peer");
696 choked_peer->choked_upload = 1;
697 xbt_assert((*((int*)choked_key) == choked_peer->id));
// With choked_upload set, this call removes the peer from active_peers.
698 update_active_peers_set(peer, choked_peer);
699 XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, choked_peer->id);
700 send_message(peer, choked_peer->mailbox, MESSAGE_CHOKE, MESSAGE_CHOKE_SIZE);
702 if (chosen_peer != NULL) {
703 xbt_assert((chosen_peer->choked_upload), "Tries to unchoked an unchoked peer");
704 chosen_peer->choked_upload = 0;
// The peer is inserted unconditionally here; update_active_peers_set() below then applies its own
// rule (interested and not choked) to the same entry.
705 xbt_dict_set_ext(peer->active_peers, (char*)&chosen_peer->id, sizeof(int), chosen_peer);
706 chosen_peer->last_unchoke = simgrid_get_clock();
707 XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, chosen_peer->id);
708 update_active_peers_set(peer, chosen_peer);
709 send_message(peer, chosen_peer->mailbox, MESSAGE_UNCHOKE, MESSAGE_UNCHOKE_SIZE);
/*
 * For every connection we are currently interested in, checks whether the remote peer still owns a
 * piece we miss; when it does not, am_interested is cleared and a NOTINTERESTED message is sent.
 * NOTE(review): the declarations of `key` and `interested` and the statements setting `interested`
 * are not visible in this excerpt -- confirm.
 */
714 /** Update "interested" state of peers: send "not interested" to peers that don't have any more pieces we want. */
715 void update_interested_after_receive(const_peer_t peer)
718 xbt_dict_cursor_t cursor;
719 connection_t connection;
720 xbt_dict_foreach (peer->connected_peers, cursor, key, connection) {
721 if (connection->am_interested != 0) {
723 // Check if the peer still has a piece we want.
724 for (int i = 0; i < FILE_PIECES; i++) {
725 if (peer_has_not_piece(peer, i) && connection_has_piece(connection, i)) {
730 if (!interested) { // no more piece to download from connection
731 connection->am_interested = 0;
732 send_message(peer, connection->mailbox, MESSAGE_NOTINTERESTED, MESSAGE_NOTINTERESTED_SIZE);
738 void update_bitfield_blocks(peer_t peer, int index, int block_index, int block_length)
740 xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
741 xbt_assert((block_index >= 0 && block_index <= PIECES_BLOCKS), "Wrong block : %d.", block_index);
742 for (int i = block_index; i < (block_index + block_length); i++) {
743 peer->bitfield_blocks |= (1ULL << (unsigned int)(index * PIECES_BLOCKS + i));
747 /** Returns if a peer has completed the download of a piece */
748 int piece_complete(const_peer_t peer, int index)
750 for (int i = 0; i < PIECES_BLOCKS; i++) {
751 if (!(peer->bitfield_blocks & 1ULL << (index * PIECES_BLOCKS + i))) {
758 /** Returns the first block that a peer doesn't have in a piece. If the peer has all blocks of the piece, returns -1. */
759 int get_first_missing_block_from(const_peer_t peer, int piece)
761 for (int i = 0; i < PIECES_BLOCKS; i++) {
762 if (!(peer->bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) {
769 /** Returns a piece that is partially downloaded and stored by the remote peer if any -1 otherwise. */
770 int partially_downloaded_piece(const_peer_t peer, const_connection_t remote_peer)
772 for (int i = 0; i < FILE_PIECES; i++) {
773 if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i) &&
774 get_first_missing_block_from(peer, i) > 0)
780 int peer_has_not_piece(const_peer_t peer, unsigned int piece)
782 return !(peer->bitfield & 1U << piece);
785 /** Check that a piece is not currently being download by the peer. */
786 int peer_is_not_downloading_piece(const_peer_t peer, unsigned int piece)
788 return !(peer->current_pieces & 1U << piece);
791 /***************** Connection internal functions ***********************/
/*
 * Allocates the state kept about one remote peer.
 * NOTE(review): the assignment of the connection's id and the return statement are not visible in
 * this excerpt -- confirm.
 */
792 connection_t connection_new(int id)
794 connection_t connection = xbt_new(s_connection_t, 1);
// The remote peer's mailbox is looked up under the decimal text of its id.
795 char mailbox_name[MAILBOX_SIZE];
796 snprintf(mailbox_name, MAILBOX_SIZE - 1, "%d", id);
798 connection->mailbox = sg_mailbox_by_name(mailbox_name);
// Nothing is known about the remote peer yet: no piece, no speed, no piece being downloaded (-1).
799 connection->bitfield = 0;
800 connection->peer_speed = 0;
801 connection->last_unchoke = 0;
802 connection->current_piece = -1;
// Nobody is interested at first, and both directions start choked.
803 connection->am_interested = 0;
804 connection->interested = 0;
805 connection->choked_upload = 1;
806 connection->choked_download = 1;
811 void connection_add_speed_value(connection_t connection, double speed)
813 connection->peer_speed = connection->peer_speed * 0.6 + speed * 0.4;
816 int connection_has_piece(const_connection_t connection, unsigned int piece)
818 return (connection->bitfield & 1U << piece);
821 /***************** Messages creation functions ***********************/
822 /** @brief Build a new empty message */
823 message_t message_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox)
825 message_t message = xbt_new(s_message_t, 1);
826 message->peer_id = peer_id;
827 message->return_mailbox = return_mailbox;
828 message->type = type;
832 /** Builds a message containing an index. */
833 message_t message_index_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, int index)
835 message_t message = message_new(type, peer_id, return_mailbox);
836 message->piece = index;
840 message_t message_other_new(e_message_type type, int peer_id, sg_mailbox_t return_mailbox, unsigned int bitfield)
842 message_t message = message_new(type, peer_id, return_mailbox);
843 message->bitfield = bitfield;
847 message_t message_request_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
849 message_t message = message_index_new(MESSAGE_REQUEST, peer_id, return_mailbox, piece);
850 message->block_index = block_index;
851 message->block_length = block_length;
// Builds a PIECE message for the given piece and records the block range it carries.
// NOTE(review): the end of this function, including its return statement, is not visible in this
// excerpt -- confirm.
855 message_t message_piece_new(int peer_id, sg_mailbox_t return_mailbox, int piece, int block_index, int block_length)
857 message_t message = message_index_new(MESSAGE_PIECE, peer_id, return_mailbox, piece);
858 message->block_index = block_index;
859 message->block_length = block_length;