XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
XBT_DEBUG("Here is my current status: %s", peer.bitfield);
peer.begin_receive_time = MSG_get_clock();
+ MSG_mailbox_set_async(peer.mailbox);
if (has_finished(peer.bitfield)) {
peer.pieces = FILE_PIECES;
send_handshake_all(&peer);
handle_message(peer, peer->task_received);
}
} else {
+ handle_pending_sends(peer);
if (peer->current_piece != -1) {
send_interested_to_peers(peer);
} else {
peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
peer->current_piece = -1;
- peer->stream = RngStream_CreateStream("");
+ peer->stream = MSG_host_get_data(MSG_host_self());
peer->comm_received = NULL;
peer->round = 0;
+
+ peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
}
/**
xbt_dict_free(&peer->peers);
xbt_dict_free(&peer->active_peers);
xbt_dynar_free(&peer->current_pieces);
+ xbt_dynar_free(&peer->pending_sends);
xbt_free(peer->pieces_count);
xbt_free(peer->bitfield);
xbt_free(peer->bitfield_blocks);
-
- RngStream_DeleteStream(&peer->stream);
}
/**
return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
}
+/**
+ * Reap every pending asynchronous send that has completed.
+ *
+ * Polls peer->pending_sends with MSG_comm_testany() until it reports -1.
+ * Each communication it returns is removed from the dynar and destroyed;
+ * the attached task is released with task_message_free() only when the
+ * communication status is not MSG_OK.
+ * NOTE(review): presumably a successfully delivered task is left alone
+ * because the receiver is responsible for it -- confirm.
+ *
+ * @param peer Peer data
+ */
+void handle_pending_sends(peer_t peer)
+{
+  for (;;) {
+    int done_idx = MSG_comm_testany(peer->pending_sends);
+    if (done_idx == -1) {
+      break;
+    }
+
+    msg_comm_t finished = xbt_dynar_get_as(peer->pending_sends, done_idx, msg_comm_t);
+    /* Read the status before the communication is removed and destroyed. */
+    int comm_status = MSG_comm_get_status(finished);
+    xbt_dynar_remove_at(peer->pending_sends, done_idx, &finished);
+    XBT_DEBUG("Communication %p is finished with status %d, dynar size is now %lu",
+              finished, comm_status, xbt_dynar_length(peer->pending_sends));
+
+    /* Fetch the task first: the comm handle is destroyed on the next line. */
+    msg_task_t sent_task = MSG_comm_get_task(finished);
+    MSG_comm_destroy(finished);
+
+    if (comm_status == MSG_OK) {
+      continue;
+    }
+    task_message_free(sent_task);
+  }
+}
+
/**
* Handle a received message sent by another peer
* @param peer Peer data
}
}
break;
+ case MESSAGE_CANCEL:
+ break;
}
//Update the peer speed.
if (remote_peer) {
//remove a peer from the list
xbt_dict_cursor_t cursor = NULL;
xbt_dict_cursor_first(peer->active_peers, &cursor);
- if (xbt_dict_length(peer->active_peers) > 0) {
+ if (!xbt_dict_is_empty(peer->active_peers)) {
key = xbt_dict_cursor_get_key(cursor);
connection_t peer_choked = xbt_dict_cursor_get_data(cursor);
if (peer_choked) {
msg_task_t task =
task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
peer->bitfield, FILE_PIECES);
- MSG_task_dsend(task, mailbox, task_message_free);
+ //Async send and append to pending sends
+ msg_comm_t comm = MSG_task_isend(task, mailbox);
+ xbt_dynar_push(peer->pending_sends, &comm);
}
/**