XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
+//TODO: Let users change this
+/*
+ * File transferred data
+ *
+ * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
+ *
+ * FILE_SIZE is written out as the product of the three values that follow it;
+ * keep the four definitions in sync when changing any of them.
+ */
+static int FILE_SIZE = 10 * 5 * 16384; /* total file size, in bytes */
+static int FILE_PIECES = 10; /* number of pieces in the file */
+static int PIECES_BLOCKS = 5; /* number of blocks in one piece */
+static int BLOCK_SIZE = 16384; /* size of one block, in bytes */
+static int BLOCKS_REQUESTED = 2; /* presumably the number of blocks asked for per request -- TODO confirm */
/**
* Peer main function
handle_message(peer, peer->task_received);
}
} else {
+ handle_pending_sends(peer);
if (peer->current_piece != -1) {
send_interested_to_peers(peer);
} else {
msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
if (status == MSG_OK) {
tracker_task_data_t data = MSG_task_get_data(task_received);
- int i, peer_id;
+ unsigned i;
+ int peer_id;
//Add the peers the tracker gave us to our peer list.
xbt_dynar_foreach(data->peers, i, peer_id) {
if (peer_id != peer->id)
peer->comm_received = NULL;
peer->round = 0;
+
+ peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
}
/**
xbt_dict_free(&peer->peers);
xbt_dict_free(&peer->active_peers);
xbt_dynar_free(&peer->current_pieces);
+ xbt_dynar_free(&peer->pending_sends);
xbt_free(peer->pieces_count);
xbt_free(peer->bitfield);
xbt_free(peer->bitfield_blocks);
return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
}
+/**
+ * Handle pending sends and remove those which are done.
+ *
+ * Polls the peer's pending asynchronous sends with MSG_comm_testany() until
+ * it reports -1. Every finished communication is taken out of the
+ * pending_sends dynar and destroyed.
+ *
+ * @param peer Peer data
+ */
+void handle_pending_sends(peer_t peer)
+{
+  int index;
+
+  while ((index = MSG_comm_testany(peer->pending_sends)) != -1) {
+    /* xbt_dynar_remove_at() copies the removed element into comm_send, so no
+     * separate read of the slot is needed beforehand. */
+    msg_comm_t comm_send = NULL;
+    xbt_dynar_remove_at(peer->pending_sends, index, &comm_send);
+
+    msg_error_t status = MSG_comm_get_status(comm_send);
+    XBT_DEBUG("Communication %p is finished with status %d, dynar size is now %lu",
+              (void *) comm_send, (int) status, xbt_dynar_length(peer->pending_sends));
+
+    /* Fetch the task before the communication handle is destroyed. */
+    msg_task_t task = MSG_comm_get_task(comm_send);
+    MSG_comm_destroy(comm_send);
+
+    /* Only a failed send releases the task here; presumably the receiver owns
+     * it after a successful send -- TODO confirm against the receive path. */
+    if (status != MSG_OK) {
+      task_message_free(task);
+    }
+  }
+}
+
/**
* Handle a received message sent by another peer
* @param peer Peer data
if (piece_complete(peer, message->index)) {
peer->pieces_requested--;
//Removing the piece from our piece list
- int piece_index = -1, i, piece;
+ unsigned i;
+ int piece_index = -1, piece;
xbt_dynar_foreach(peer->current_pieces, i, piece) {
if (piece == message->index) {
piece_index = i;
}
}
break;
+ case MESSAGE_CANCEL:
+ break;
}
//Update the peer speed.
if (remote_peer) {
MSG_comm_destroy(peer->comm_received);
peer->comm_received = NULL;
if (status == MSG_OK) {
- message_t message = MSG_task_get_data(peer->task_received);
+ MSG_task_get_data(peer->task_received);
handle_message(peer, peer->task_received);
if (peer->current_piece != -1) {
finished = 1;
}
xbt_dynar_push_as(peer->current_pieces, int, peer->current_piece);
XBT_DEBUG("New interested piece: %d", peer->current_piece);
- xbt_assert((peer->current_piece >= 0
- && peer->current_piece < FILE_PIECES,
- "Peer want to retrieve a piece that doesn't exist."));
+ xbt_assert((peer->current_piece >= 0 && peer->current_piece < FILE_PIECES),
+ "Peer want to retrieve a piece that doesn't exist.");
}
/**
{
//update the current round
peer->round = (peer->round + 1) % 3;
- int i;
char *key;
connection_t peer_choosed = NULL;
//remove a peer from the list
char *key;
xbt_dict_cursor_t cursor;
connection_t connection;
- int interested, cpt, piece;
+ unsigned cpt;
+ int interested, piece;
xbt_dict_foreach(peer->peers, cursor, key, connection) {
interested = 0;
if (connection->am_interested) {
*/
void send_requests_to_peer(peer_t peer, connection_t remote_peer)
{
- int i, piece, block_index, block_length;
+ unsigned i;
+ int piece, block_index, block_length;
xbt_dynar_foreach(peer->current_pieces, i, piece) {
if (remote_peer->bitfield && remote_peer->bitfield[piece] == '1') {
block_index = get_first_block(peer, piece);
connection->am_interested = 1;
msg_task_t task =
task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
- peer->id);
+ peer->id, task_message_size(MESSAGE_INTERESTED));
MSG_task_dsend(task, connection->mailbox, task_message_free);
XBT_DEBUG("Send INTERESTED to %s", connection->mailbox);
}
{
msg_task_t task =
task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
- peer->id);
+ peer->id, task_message_size(MESSAGE_INTERESTED));
MSG_task_dsend(task, mailbox, task_message_free);
XBT_DEBUG("Sending INTERESTED to %s", mailbox);
{
msg_task_t task =
task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox,
- peer->id);
+ peer->id, task_message_size(MESSAGE_NOTINTERESTED));
MSG_task_dsend(task, mailbox, task_message_free);
XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
msg_task_t task =
task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
- peer->id);
+ peer->id, task_message_size(MESSAGE_HANDSHAKE));
MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
}
{
msg_task_t task =
task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
- peer->id);
+ peer->id, task_message_size(MESSAGE_HANDSHAKE));
MSG_task_dsend(task, mailbox, task_message_free);
XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
}
{
XBT_DEBUG("Sending a CHOKE to %s", mailbox);
msg_task_t task =
- task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, peer->id);
+ task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox,
+ peer->id, task_message_size(MESSAGE_CHOKE));
MSG_task_dsend(task, mailbox, task_message_free);
}
XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
msg_task_t task =
task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox,
- peer->id);
+ peer->id, task_message_size(MESSAGE_UNCHOKE));
MSG_task_dsend(task, mailbox, task_message_free);
}
xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
msg_task_t task =
task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,
- peer->id, piece);
+ peer->id, piece, task_message_size(MESSAGE_HAVE));
MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
}
}
XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
msg_task_t task =
task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
- peer->bitfield);
- MSG_task_dsend(task, mailbox, task_message_free);
+ peer->bitfield, FILE_PIECES);
+ //Async send and append to pending sends
+ msg_comm_t comm = MSG_task_isend(task, mailbox);
+ xbt_dynar_push(peer->pending_sends, &comm);
}
/**
"Tried to send a piece that we doesn't have.");
msg_task_t task =
task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
- stalled, block_index, block_length);
+ stalled, block_index, block_length, BLOCK_SIZE);
MSG_task_dsend(task, mailbox, task_message_free);
}
int in_current_pieces(peer_t peer, int piece)
{
- int is_in = 0, i, peer_piece;
+ unsigned i;
+ int is_in = 0, peer_piece;
xbt_dynar_foreach(peer->current_pieces, i, peer_piece) {
if (peer_piece == piece) {
is_in = 1;