* under the terms of the license (GNU LGPL) which comes with this package. */
#include "peer.h"
-#include "tracker.h"
#include "connection.h"
#include "messages.h"
+#include "tracker.h"
+#include <limits.h>
#include <simgrid/msg.h>
#include <xbt/RngStream.h>
-#include <limits.h>
XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
*/
#define FILE_PIECES 10UL
#define PIECES_BLOCKS 5UL
-#define BLOCK_SIZE 16384
+#define BLOCK_SIZE 16384
static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
/** Number of blocks asked by each request */
#define ENABLE_END_GAME_MODE 1
#define SLEEP_DURATION 1
-int count_pieces(unsigned int bitfield){
- int count=0;
+int count_pieces(unsigned int bitfield)
+{
+ int count = 0;
unsigned int n = bitfield;
- while (n){
+ while (n) {
count += n & 1U;
- n >>= 1U ;
+ n >>= 1U;
}
return count;
}
-int peer_has_not_piece(peer_t peer, unsigned int piece){
- return !(peer->bitfield & 1U<<piece);
+int peer_has_not_piece(peer_t peer, unsigned int piece)
+{
+ return !(peer->bitfield & 1U << piece);
}
/** Check that a piece is not currently being download by the peer. */
-int peer_is_not_downloading_piece(peer_t peer, unsigned int piece){
- return !(peer->current_pieces & 1U<<piece);
+int peer_is_not_downloading_piece(peer_t peer, unsigned int piece)
+{
+ return !(peer->current_pieces & 1U << piece);
}
-void get_status(char **status, unsigned int bitfield){
- for(int i=FILE_PIECES-1; i>=0; i--)
+void get_status(char** status, unsigned int bitfield)
+{
+ for (int i = FILE_PIECES - 1; i >= 0; i--)
(*status)[i] = (bitfield & (1U << i)) ? '1' : '0';
(*status)[FILE_PIECES] = '\0';
}
/** Peer main function */
-int peer(int argc, char *argv[])
+int peer(int argc, char* argv[])
{
- //Check arguments
+ // Check arguments
xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
- //Build peer object
- peer_t peer = peer_init(xbt_str_parse_int(argv[1],"Invalid ID: %s"), argc==4 ? 1:0);
+ // Build peer object
+ peer_t peer = peer_init(xbt_str_parse_int(argv[1], "Invalid ID: %s"), argc == 4 ? 1 : 0);
- //Retrieve deadline
- double deadline = xbt_str_parse_double(argv[2],"Invalid deadline: %s");
+ // Retrieve deadline
+ double deadline = xbt_str_parse_double(argv[2], "Invalid deadline: %s");
xbt_assert(deadline > 0, "Wrong deadline supplied");
- char *status = xbt_malloc0(FILE_PIECES+1);
+ char* status = xbt_malloc0(FILE_PIECES + 1);
get_status(&status, peer->bitfield);
XBT_INFO("Hi, I'm joining the network with id %d", peer->id);
- //Getting peer data from the tracker.
+ // Getting peer data from the tracker.
if (get_peers_data(peer)) {
XBT_DEBUG("Got %d peers from the tracker. Current status is: %s", xbt_dict_length(peer->peers), status);
peer->begin_receive_time = MSG_get_clock();
handle_message(peer, peer->task_received);
}
} else {
- //We don't execute the choke algorithm if we don't already have a piece
+ // We don't execute the choke algorithm if we don't already have a piece
if (MSG_get_clock() >= next_choked_update && count_pieces(peer->bitfield) > 0) {
update_choked_peers(peer);
next_choked_update += UPDATE_CHOKED_INTERVAL;
{
double next_choked_update = MSG_get_clock() + UPDATE_CHOKED_INTERVAL;
XBT_DEBUG("Start seeding.");
- //start the main seed loop
+ // start the main seed loop
while (MSG_get_clock() < deadline) {
if (peer->comm_received == NULL) {
peer->task_received = NULL;
} else {
if (MSG_get_clock() >= next_choked_update) {
update_choked_peers(peer);
- //TODO: Change the choked peer algorithm when seeding.
+ // TODO: Change the choked peer algorithm when seeding.
next_choked_update += UPDATE_CHOKED_INTERVAL;
} else {
MSG_process_sleep(SLEEP_DURATION);
*/
int get_peers_data(peer_t peer)
{
- int success = 0;
+ int success = 0;
double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT;
- //Build the task to send to the tracker
- tracker_task_data_t data = tracker_task_data_new(MSG_host_get_name(MSG_host_self()), peer->mailbox_tracker,
- peer->id, 0, 0, FILE_SIZE);
+ // Build the task to send to the tracker
+ tracker_task_data_t data =
+ tracker_task_data_new(MSG_host_get_name(MSG_host_self()), peer->mailbox_tracker, peer->id, 0, 0, FILE_SIZE);
msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data);
while ((success == 0) && MSG_get_clock() < timeout) {
XBT_DEBUG("Sending a peer request to the tracker.");
}
}
- success = 0;
+ success = 0;
msg_task_t task_received = NULL;
while ((success == 0) && MSG_get_clock() < timeout) {
msg_comm_t comm_received = MSG_task_irecv(&task_received, peer->mailbox_tracker);
- msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
+ msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
if (status == MSG_OK) {
tracker_task_data_t data = MSG_task_get_data(task_received);
unsigned i;
int peer_id;
- //Add the peers the tracker gave us to our peer list.
- xbt_dynar_foreach(data->peers, i, peer_id) {
+ // Add the peers the tracker gave us to our peer list.
+ xbt_dynar_foreach (data->peers, i, peer_id) {
if (peer_id != peer->id)
- xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int), connection_new(peer_id), NULL);
+ xbt_dict_set_ext(peer->peers, (char*)&peer_id, sizeof(int), connection_new(peer_id), NULL);
}
success = 1;
- //free the communication and the task
+ // free the communication and the task
MSG_comm_destroy(comm_received);
tracker_task_data_free(data);
MSG_task_destroy(task_received);
*/
peer_t peer_init(int id, int seed)
{
- peer_t peer = xbt_new(s_peer_t,1);
- peer->id = id;
+ peer_t peer = xbt_new(s_peer_t, 1);
+ peer->id = id;
peer->hostname = MSG_host_get_name(MSG_host_self());
- snprintf(peer->mailbox,MAILBOX_SIZE-1, "%d", id);
- snprintf(peer->mailbox_tracker,MAILBOX_SIZE-1, "tracker_%d", id);
+ snprintf(peer->mailbox, MAILBOX_SIZE - 1, "%d", id);
+ snprintf(peer->mailbox_tracker, MAILBOX_SIZE - 1, "tracker_%d", id);
peer->peers = xbt_dict_new_homogeneous(NULL);
peer->active_peers = xbt_dict_new_homogeneous(NULL);
if (seed) {
- peer->bitfield = (1U<<FILE_PIECES)-1U;
- peer->bitfield_blocks = (1ULL<<(FILE_PIECES * PIECES_BLOCKS))-1ULL ;
+ peer->bitfield = (1U << FILE_PIECES) - 1U;
+ peer->bitfield_blocks = (1ULL << (FILE_PIECES * PIECES_BLOCKS)) - 1ULL;
} else {
- peer->bitfield = 0;
+ peer->bitfield = 0;
peer->bitfield_blocks = 0;
}
/** Destroys a poor peer object. */
void peer_free(peer_t peer)
{
- char *key;
+ char* key;
connection_t connection;
xbt_dict_cursor_t cursor;
- xbt_dict_foreach(peer->peers, cursor, key, connection) {
+ xbt_dict_foreach (peer->peers, cursor, key, connection) {
connection_free(connection);
}
xbt_dict_free(&peer->peers);
*/
int has_finished(unsigned int bitfield)
{
- return bitfield == (1U<<FILE_PIECES)-1U;
+ return bitfield == (1U << FILE_PIECES) - 1U;
}
int nb_interested_peers(peer_t peer)
{
xbt_dict_cursor_t cursor = NULL;
- char *key;
+ char* key;
connection_t connection;
int nb = 0;
- xbt_dict_foreach(peer->peers, cursor, key, connection) {
+ xbt_dict_foreach (peer->peers, cursor, key, connection) {
if (connection->interested)
nb++;
}
void update_active_peers_set(peer_t peer, connection_t remote_peer)
{
if ((remote_peer->interested != 0) && (remote_peer->choked_upload == 0)) {
- //add in the active peers set
- xbt_dict_set_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int), remote_peer, NULL);
- } else if (xbt_dict_get_or_null_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int))) {
- xbt_dict_remove_ext(peer->active_peers, (char *) &remote_peer->id, sizeof(int));
+ // add in the active peers set
+ xbt_dict_set_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int), remote_peer, NULL);
+ } else if (xbt_dict_get_or_null_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int))) {
+ xbt_dict_remove_ext(peer->active_peers, (char*)&remote_peer->id, sizeof(int));
}
}
*/
void handle_message(peer_t peer, msg_task_t task)
{
- const char* type_names[10] =
- {"HANDSHAKE","CHOKE","UNCHOKE","INTERESTED","NOTINTERESTED","HAVE","BITFIELD","REQUEST", "PIECE", "CANCEL" };
+ const char* type_names[10] = {"HANDSHAKE", "CHOKE", "UNCHOKE", "INTERESTED", "NOTINTERESTED",
+ "HAVE", "BITFIELD", "REQUEST", "PIECE", "CANCEL"};
message_t message = MSG_task_get_data(task);
XBT_DEBUG("Received a %s message from %s (%s)", type_names[message->type], message->mailbox,
- message->issuer_host_name);
+ message->issuer_host_name);
connection_t remote_peer;
- remote_peer = xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id, sizeof(int));
+ remote_peer = xbt_dict_get_or_null_ext(peer->peers, (char*)&message->peer_id, sizeof(int));
switch (message->type) {
- case MESSAGE_HANDSHAKE:
- //Check if the peer is in our connection list.
- if (remote_peer == 0) {
- xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int), connection_new(message->peer_id), NULL);
- send_handshake(peer, message->mailbox);
- }
- //Send our bitfield to the peer
- send_bitfield(peer, message->mailbox);
- break;
- case MESSAGE_BITFIELD:
- //Update the pieces list
- update_pieces_count_from_bitfield(peer, message->bitfield);
- //Store the bitfield
- remote_peer->bitfield = message->bitfield;
- xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
- if (is_interested(peer, remote_peer)) {
- remote_peer->am_interested = 1;
- send_interested(peer, message->mailbox);
- }
- break;
- case MESSAGE_INTERESTED:
- xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
- //Update the interested state of the peer.
- remote_peer->interested = 1;
- update_active_peers_set(peer, remote_peer);
- break;
- case MESSAGE_NOTINTERESTED:
- xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
- remote_peer->interested = 0;
- update_active_peers_set(peer, remote_peer);
- break;
- case MESSAGE_UNCHOKE:
- xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
- xbt_assert(remote_peer->choked_download);
- remote_peer->choked_download = 0;
- //Send requests to the peer, since it has unchoked us
- if (remote_peer->am_interested)
- request_new_piece_to_peer(peer, remote_peer);
- break;
- case MESSAGE_CHOKE:
- xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
- xbt_assert(!remote_peer->choked_download);
- remote_peer->choked_download = 1;
- if (remote_peer->current_piece != -1)
- remove_current_piece(peer, remote_peer, remote_peer->current_piece);
- break;
- case MESSAGE_HAVE:
- XBT_DEBUG("\t for piece %d", message->index);
- xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong HAVE message received");
- remote_peer->bitfield = remote_peer->bitfield | (1U<<message->index);
- peer->pieces_count[message->index]++;
- //If the piece is in our pieces, we tell the peer that we are interested.
- if ((remote_peer->am_interested == 0) && peer_has_not_piece(peer,message->index)) {
- remote_peer->am_interested = 1;
- send_interested(peer, message->mailbox);
- if (remote_peer->choked_download == 0)
+ case MESSAGE_HANDSHAKE:
+ // Check if the peer is in our connection list.
+ if (remote_peer == 0) {
+ xbt_dict_set_ext(peer->peers, (char*)&message->peer_id, sizeof(int), connection_new(message->peer_id), NULL);
+ send_handshake(peer, message->mailbox);
+ }
+ // Send our bitfield to the peer
+ send_bitfield(peer, message->mailbox);
+ break;
+ case MESSAGE_BITFIELD:
+ // Update the pieces list
+ update_pieces_count_from_bitfield(peer, message->bitfield);
+ // Store the bitfield
+ remote_peer->bitfield = message->bitfield;
+ xbt_assert(!remote_peer->am_interested, "Should not be interested at first");
+ if (is_interested(peer, remote_peer)) {
+ remote_peer->am_interested = 1;
+ send_interested(peer, message->mailbox);
+ }
+ break;
+ case MESSAGE_INTERESTED:
+ xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
+ // Update the interested state of the peer.
+ remote_peer->interested = 1;
+ update_active_peers_set(peer, remote_peer);
+ break;
+ case MESSAGE_NOTINTERESTED:
+ xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
+ remote_peer->interested = 0;
+ update_active_peers_set(peer, remote_peer);
+ break;
+ case MESSAGE_UNCHOKE:
+ xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
+ xbt_assert(remote_peer->choked_download);
+ remote_peer->choked_download = 0;
+ // Send requests to the peer, since it has unchoked us
+ if (remote_peer->am_interested)
request_new_piece_to_peer(peer, remote_peer);
- }
- break;
- case MESSAGE_REQUEST:
- xbt_assert(remote_peer->interested);
- xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong request received");
- if (remote_peer->choked_upload == 0) {
- XBT_DEBUG("\t for piece %d (%d,%d)", message->index, message->block_index,
- message->block_index + message->block_length);
- if (!peer_has_not_piece(peer, message->index)) {
- send_piece(peer, message->mailbox, message->index, message->block_index, message->block_length);
+ break;
+ case MESSAGE_CHOKE:
+ xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
+ xbt_assert(!remote_peer->choked_download);
+ remote_peer->choked_download = 1;
+ if (remote_peer->current_piece != -1)
+ remove_current_piece(peer, remote_peer, remote_peer->current_piece);
+ break;
+ case MESSAGE_HAVE:
+ XBT_DEBUG("\t for piece %d", message->index);
+ xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong HAVE message received");
+ remote_peer->bitfield = remote_peer->bitfield | (1U << message->index);
+ peer->pieces_count[message->index]++;
+ // If the piece is in our pieces, we tell the peer that we are interested.
+ if ((remote_peer->am_interested == 0) && peer_has_not_piece(peer, message->index)) {
+ remote_peer->am_interested = 1;
+ send_interested(peer, message->mailbox);
+ if (remote_peer->choked_download == 0)
+ request_new_piece_to_peer(peer, remote_peer);
}
- } else {
- XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
- }
- break;
- case MESSAGE_PIECE:
- XBT_DEBUG(" \t for piece %d (%d,%d)", message->index, message->block_index,
- message->block_index + message->block_length);
- xbt_assert(!remote_peer->choked_download);
- xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
- "Can't received a piece if I'm not interested wihtout end-game mode!"
- "piece (%d) bitfield(%u) remote bitfield(%u)", message->index, peer->bitfield, remote_peer->bitfield);
- xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
- xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong piece received");
- //TODO: Execute à computation.
- if (peer_has_not_piece(peer,message->index)) {
- update_bitfield_blocks(peer, message->index, message->block_index, message->block_length);
- if (piece_complete(peer, message->index)) {
- //Removing the piece from our piece list
- remove_current_piece(peer, remote_peer, message->index);
- //Setting the fact that we have the piece
- peer->bitfield = peer->bitfield | (1U<<message->index);
- char* status = xbt_malloc0(FILE_PIECES+1);
- get_status(&status, peer->bitfield);
- XBT_DEBUG("My status is now %s", status);
- xbt_free(status);
- //Sending the information to all the peers we are connected to
- send_have(peer, message->index);
- //sending UNINTERESTED to peers that do not have what we want.
- update_interested_after_receive(peer);
- } else { // piece not completed
- send_request_to_peer(peer, remote_peer, message->index); // ask for the next block
+ break;
+ case MESSAGE_REQUEST:
+ xbt_assert(remote_peer->interested);
+ xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong request received");
+ if (remote_peer->choked_upload == 0) {
+ XBT_DEBUG("\t for piece %d (%d,%d)", message->index, message->block_index,
+ message->block_index + message->block_length);
+ if (!peer_has_not_piece(peer, message->index)) {
+ send_piece(peer, message->mailbox, message->index, message->block_index, message->block_length);
+ }
+ } else {
+ XBT_DEBUG("\t for piece %d but he is choked.", message->peer_id);
}
- } else {
- XBT_DEBUG("However, we already have it");
- xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
- request_new_piece_to_peer(peer, remote_peer);
- }
- break;
- case MESSAGE_CANCEL:
- break;
- default:
- THROW_IMPOSSIBLE;
+ break;
+ case MESSAGE_PIECE:
+ XBT_DEBUG(" \t for piece %d (%d,%d)", message->index, message->block_index,
+ message->block_index + message->block_length);
+ xbt_assert(!remote_peer->choked_download);
+ xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE,
+ "Can't received a piece if I'm not interested wihtout end-game mode!"
+ "piece (%d) bitfield(%u) remote bitfield(%u)",
+ message->index, peer->bitfield, remote_peer->bitfield);
+ xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
+ xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong piece received");
+ // TODO: Execute à computation.
+ if (peer_has_not_piece(peer, message->index)) {
+ update_bitfield_blocks(peer, message->index, message->block_index, message->block_length);
+ if (piece_complete(peer, message->index)) {
+ // Removing the piece from our piece list
+ remove_current_piece(peer, remote_peer, message->index);
+ // Setting the fact that we have the piece
+ peer->bitfield = peer->bitfield | (1U << message->index);
+ char* status = xbt_malloc0(FILE_PIECES + 1);
+ get_status(&status, peer->bitfield);
+ XBT_DEBUG("My status is now %s", status);
+ xbt_free(status);
+ // Sending the information to all the peers we are connected to
+ send_have(peer, message->index);
+ // sending UNINTERESTED to peers that do not have what we want.
+ update_interested_after_receive(peer);
+ } else { // piece not completed
+ send_request_to_peer(peer, remote_peer, message->index); // ask for the next block
+ }
+ } else {
+ XBT_DEBUG("However, we already have it");
+ xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
+ request_new_piece_to_peer(peer, remote_peer);
+ }
+ break;
+ case MESSAGE_CANCEL:
+ break;
+ default:
+ THROW_IMPOSSIBLE;
}
- //Update the peer speed.
+ // Update the peer speed.
if (remote_peer) {
connection_add_speed_value(remote_peer, 1.0 / (MSG_get_clock() - peer->begin_receive_time));
}
{
int piece = select_piece_to_download(peer, remote_peer);
if (piece != -1) {
- peer->current_pieces|= (1U << (unsigned int) piece);
+ peer->current_pieces |= (1U << (unsigned int)piece);
send_request_to_peer(peer, remote_peer, piece);
}
}
if (count_pieces(peer->current_pieces) >= (FILE_PIECES - count_pieces(peer->bitfield)) &&
(is_interested(peer, remote_peer) != 0)) {
#if ENABLE_END_GAME_MODE == 0
- return -1;
+ return -1;
#endif
int nb_interesting_pieces = 0;
// compute the number of interesting pieces
for (int i = 0; i < FILE_PIECES; i++) {
- if (peer_has_not_piece(peer,i) && connection_has_piece(remote_peer,i)) {
+ if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
nb_interesting_pieces++;
}
}
xbt_assert(nb_interesting_pieces != 0);
// get a random interesting piece
int random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
- int current_index = 0;
+ int current_index = 0;
for (int i = 0; i < FILE_PIECES; i++) {
- if (peer_has_not_piece(peer,i) && connection_has_piece(remote_peer,i)) {
+ if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i)) {
if (random_piece_index == current_index) {
piece = i;
break;
int nb_interesting_pieces = 0;
// compute the number of interesting pieces
for (int i = 0; i < FILE_PIECES; i++) {
- if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer,i) && peer_is_not_downloading_piece(peer, i)) {
+ if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
+ peer_is_not_downloading_piece(peer, i)) {
nb_interesting_pieces++;
}
}
xbt_assert(nb_interesting_pieces != 0);
// get a random interesting piece
int random_piece_index = RngStream_RandInt(peer->stream, 0, nb_interesting_pieces - 1);
- int current_index = 0;
+ int current_index = 0;
for (int i = 0; i < FILE_PIECES; i++) {
- if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer,i) && peer_is_not_downloading_piece(peer, i)) {
+ if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
+ peer_is_not_downloading_piece(peer, i)) {
if (random_piece_index == current_index) {
piece = i;
break;
}
xbt_assert(piece != -1);
return piece;
- } else { // Rarest first policy
- short min = SHRT_MAX;
+ } else { // Rarest first policy
+ short min = SHRT_MAX;
int nb_min_pieces = 0;
int current_index = 0;
// compute the smallest number of copies of available pieces
for (int i = 0; i < FILE_PIECES; i++) {
- if (peer->pieces_count[i] < min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer,i) &&
+ if (peer->pieces_count[i] < min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
peer_is_not_downloading_piece(peer, i))
min = peer->pieces_count[i];
}
- xbt_assert(min != SHRT_MAX || (is_interested_and_free(peer, remote_peer) ==0));
+ xbt_assert(min != SHRT_MAX || (is_interested_and_free(peer, remote_peer) == 0));
// compute the number of rarest pieces
for (int i = 0; i < FILE_PIECES; i++) {
- if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer,i) &&
+ if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
peer_is_not_downloading_piece(peer, i))
nb_min_pieces++;
}
- xbt_assert(nb_min_pieces != 0 || (is_interested_and_free(peer, remote_peer)==0));
+ xbt_assert(nb_min_pieces != 0 || (is_interested_and_free(peer, remote_peer) == 0));
// get a random rarest piece
int random_rarest_index = RngStream_RandInt(peer->stream, 0, nb_min_pieces - 1);
for (int i = 0; i < FILE_PIECES; i++) {
- if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer,i) &&
+ if (peer->pieces_count[i] == min && peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) &&
peer_is_not_downloading_piece(peer, i)) {
if (random_rarest_index == current_index) {
piece = i;
if (nb_interested_peers(peer) == 0)
return;
XBT_DEBUG("(%d) update_choked peers %u active peers", peer->id, xbt_dict_size(peer->active_peers));
- //update the current round
+ // update the current round
peer->round = (peer->round + 1) % 3;
- char *key;
- char *key_choked=NULL;
+ char* key;
+ char* key_choked = NULL;
connection_t peer_choosed = NULL;
- connection_t peer_choked = NULL;
- //remove a peer from the list
+ connection_t peer_choked = NULL;
+ // remove a peer from the list
xbt_dict_cursor_t cursor = NULL;
xbt_dict_cursor_first(peer->active_peers, &cursor);
if (xbt_dict_length(peer->active_peers) > 0) {
- key_choked = xbt_dict_cursor_get_key(cursor);
+ key_choked = xbt_dict_cursor_get_key(cursor);
peer_choked = xbt_dict_cursor_get_data(cursor);
}
xbt_dict_cursor_free(&cursor);
connection_t connection;
double unchoke_time = MSG_get_clock() + 1;
- xbt_dict_foreach(peer->peers, cursor, key, connection) {
- if (connection->last_unchoke < unchoke_time &&
- (connection->interested != 0) && (connection->choked_upload != 0)) {
+ xbt_dict_foreach (peer->peers, cursor, key, connection) {
+ if (connection->last_unchoke < unchoke_time && (connection->interested != 0) &&
+ (connection->choked_upload != 0)) {
unchoke_time = connection->last_unchoke;
peer_choosed = connection;
}
}
} else {
- //Random optimistic unchoking
+ // Random optimistic unchoking
if (peer->round == 0) {
int j = 0;
do {
- //We choose a random peer to unchoke.
+ // We choose a random peer to unchoke.
int id_chosen = RngStream_RandInt(peer->stream, 0, xbt_dict_length(peer->peers) - 1);
- int i = 0;
+ int i = 0;
connection_t connection;
- xbt_dict_foreach(peer->peers, cursor, key, connection) {
+ xbt_dict_foreach (peer->peers, cursor, key, connection) {
if (i == id_chosen) {
peer_choosed = connection;
break;
j++;
} while (peer_choosed == NULL && j < MAXIMUM_PEERS);
} else {
- //Use the "fastest download" policy.
+ // Use the "fastest download" policy.
connection_t connection;
double fastest_speed = 0.0;
- xbt_dict_foreach(peer->peers, cursor, key, connection) {
- if (connection->peer_speed > fastest_speed &&
- (connection->choked_upload != 0) && (connection->interested != 0)) {
- peer_choosed = connection;
+ xbt_dict_foreach (peer->peers, cursor, key, connection) {
+ if (connection->peer_speed > fastest_speed && (connection->choked_upload != 0) &&
+ (connection->interested != 0)) {
+ peer_choosed = connection;
fastest_speed = connection->peer_speed;
}
}
}
if (peer_choosed != NULL)
- XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ",
- peer->id, peer_choosed->id, peer_choosed->interested, peer_choosed->choked_upload);
+ XBT_DEBUG("(%d) update_choked peers unchoked (%d) ; int (%d) ; choked (%d) ", peer->id, peer_choosed->id,
+ peer_choosed->interested, peer_choosed->choked_upload);
if (peer_choked != peer_choosed) {
if (peer_choked != NULL) {
xbt_assert((!peer_choked->choked_upload), "Tries to choked a choked peer");
peer_choked->choked_upload = 1;
- xbt_assert((*((int *) key_choked) == peer_choked->id));
+ xbt_assert((*((int*)key_choked) == peer_choked->id));
update_active_peers_set(peer, peer_choked);
XBT_DEBUG("(%d) Sending a CHOKE to %d", peer->id, peer_choked->id);
send_choked(peer, peer_choked->mailbox);
if (peer_choosed != NULL) {
xbt_assert((peer_choosed->choked_upload), "Tries to unchoked an unchoked peer");
peer_choosed->choked_upload = 0;
- xbt_dict_set_ext(peer->active_peers, (char *) &peer_choosed->id, sizeof(int), peer_choosed, NULL);
+ xbt_dict_set_ext(peer->active_peers, (char*)&peer_choosed->id, sizeof(int), peer_choosed, NULL);
peer_choosed->last_unchoke = MSG_get_clock();
XBT_DEBUG("(%d) Sending a UNCHOKE to %d", peer->id, peer_choosed->id);
update_active_peers_set(peer, peer_choosed);
*/
void update_interested_after_receive(peer_t peer)
{
- char *key;
+ char* key;
xbt_dict_cursor_t cursor;
connection_t connection;
- xbt_dict_foreach(peer->peers, cursor, key, connection) {
+ xbt_dict_foreach (peer->peers, cursor, key, connection) {
if (connection->am_interested != 0) {
int interested = 0;
- //Check if the peer still has a piece we want.
+ // Check if the peer still has a piece we want.
for (int i = 0; i < FILE_PIECES; i++) {
- if (peer_has_not_piece(peer, i) && connection_has_piece(connection,i)) {
+ if (peer_has_not_piece(peer, i) && connection_has_piece(connection, i)) {
interested = 1;
break;
}
}
- if (!interested) { //no more piece to download from connection
+ if (!interested) { // no more piece to download from connection
connection->am_interested = 0;
send_notinterested(peer, connection->mailbox);
}
xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece.");
xbt_assert((block_index >= 0 && block_index <= PIECES_BLOCKS), "Wrong block : %d.", block_index);
for (int i = block_index; i < (block_index + block_length); i++) {
- peer->bitfield_blocks |= (1ULL<<(unsigned int)(index * PIECES_BLOCKS + i));
+ peer->bitfield_blocks |= (1ULL << (unsigned int)(index * PIECES_BLOCKS + i));
}
}
int piece_complete(peer_t peer, int index)
{
for (int i = 0; i < PIECES_BLOCKS; i++) {
- if (!(peer->bitfield_blocks & 1ULL<<(index * PIECES_BLOCKS + i))) {
+ if (!(peer->bitfield_blocks & 1ULL << (index * PIECES_BLOCKS + i))) {
return 0;
}
}
int get_first_block(peer_t peer, int piece)
{
for (int i = 0; i < PIECES_BLOCKS; i++) {
- if (!(peer->bitfield_blocks & 1ULL<<(piece * PIECES_BLOCKS + i))) {
+ if (!(peer->bitfield_blocks & 1ULL << (piece * PIECES_BLOCKS + i))) {
return i;
}
}
/** Indicates if the remote peer has a piece not stored by the local peer */
int is_interested(peer_t peer, connection_t remote_peer)
{
- return remote_peer->bitfield & (peer->bitfield^((1<<FILE_PIECES)-1));
+ return remote_peer->bitfield & (peer->bitfield ^ ((1 << FILE_PIECES) - 1));
}
/** Indicates if the remote peer has a piece not stored by the local peer nor requested by the local peer */
int is_interested_and_free(peer_t peer, connection_t remote_peer)
{
for (int i = 0; i < FILE_PIECES; i++) {
- if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer,i) && peer_is_not_downloading_piece(peer, i)) {
+ if (peer_has_not_piece(peer, i) && connection_has_piece(remote_peer, i) && peer_is_not_downloading_piece(peer, i)) {
return 1;
}
}
void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
{
remote_peer->current_piece = piece;
- xbt_assert(connection_has_piece(remote_peer,piece));
+ xbt_assert(connection_has_piece(remote_peer, piece));
int block_index = get_first_block(peer, piece);
if (block_index != -1) {
int block_length = MIN(BLOCKS_REQUESTED, PIECES_BLOCKS - block_index);
* @param peer peer data
* @param mailbox destination mailbox
*/
-void send_interested(peer_t peer, const char *mailbox)
+void send_interested(peer_t peer, const char* mailbox)
{
msg_task_t task = task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, peer->id,
task_message_size(MESSAGE_INTERESTED));
* @param peer peer data
* @param mailbox destination mailbox
*/
-void send_notinterested(peer_t peer, const char *mailbox)
+void send_notinterested(peer_t peer, const char* mailbox)
{
msg_task_t task = task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox, peer->id,
task_message_size(MESSAGE_NOTINTERESTED));
{
connection_t remote_peer;
xbt_dict_cursor_t cursor = NULL;
- char *key;
- xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
+ char* key;
+ xbt_dict_foreach (peer->peers, cursor, key, remote_peer) {
msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, peer->id,
task_message_size(MESSAGE_HANDSHAKE));
MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
* @param peer peer data
* @param mailbox mailbox where to we send the message
*/
-void send_handshake(peer_t peer, const char *mailbox)
+void send_handshake(peer_t peer, const char* mailbox)
{
XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, peer->id,
}
/** Send a "choked" message to a peer. */
-void send_choked(peer_t peer, const char *mailbox)
+void send_choked(peer_t peer, const char* mailbox)
{
XBT_DEBUG("Sending a CHOKE to %s", mailbox);
- msg_task_t task = task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, peer->id,
- task_message_size(MESSAGE_CHOKE));
+ msg_task_t task =
+ task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, peer->id, task_message_size(MESSAGE_CHOKE));
MSG_task_dsend(task, mailbox, task_message_free);
}
/** Send a "unchoked" message to a peer */
-void send_unchoked(peer_t peer, const char *mailbox)
+void send_unchoked(peer_t peer, const char* mailbox)
{
XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
- msg_task_t task = task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox, peer->id,
- task_message_size(MESSAGE_UNCHOKE));
+ msg_task_t task =
+ task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox, peer->id, task_message_size(MESSAGE_UNCHOKE));
MSG_task_dsend(task, mailbox, task_message_free);
}
XBT_DEBUG("Sending HAVE message to all my peers");
connection_t remote_peer;
xbt_dict_cursor_t cursor = NULL;
- char *key;
- xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
+ char* key;
+ xbt_dict_foreach (peer->peers, cursor, key, remote_peer) {
msg_task_t task = task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox, peer->id, piece,
task_message_size(MESSAGE_HAVE));
MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
/** @brief Send a bitfield message to all the peers the peer has.
* @param peer peer data
*/
-void send_bitfield(peer_t peer, const char *mailbox)
+void send_bitfield(peer_t peer, const char* mailbox)
{
XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
msg_task_t task = task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id, peer->bitfield, FILE_PIECES);
}
/** Send a "request" message to a pair, containing a request for a piece */
-void send_request(peer_t peer, const char *mailbox, int piece, int block_index, int block_length)
+void send_request(peer_t peer, const char* mailbox, int piece, int block_index, int block_length)
{
XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece, block_index, block_length);
msg_task_t task = task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece, block_index, block_length);
}
/** Send a "piece" message to a pair, containing a piece of the file */
-void send_piece(peer_t peer, const char *mailbox, int piece, int block_index, int block_length)
+void send_piece(peer_t peer, const char* mailbox, int piece, int block_index, int block_length)
{
XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, block_length, mailbox);
xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist.");
- xbt_assert(!peer_has_not_piece(peer,piece), "Tried to send a piece that we doesn't have.");
- msg_task_t task = task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece, block_index, block_length,
- BLOCK_SIZE);
+ xbt_assert(!peer_has_not_piece(peer, piece), "Tried to send a piece that we doesn't have.");
+ msg_task_t task =
+ task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece, block_index, block_length, BLOCK_SIZE);
MSG_task_dsend(task, mailbox, task_message_free);
}