From 37f24fd67e04254218bd7b43dcb7650506776868 Mon Sep 17 00:00:00 2001 From: Maximiliano Geier Date: Wed, 14 Nov 2012 11:35:40 +0100 Subject: [PATCH] BitTorrent: changed the way the PIECE message is handled so that it uses MSG_task_isend insted of MSG_task_dsend, and queues pending comms accordingly --- examples/msg/bittorrent/peer.c | 31 ++++++++++++++++++++++++++++++- examples/msg/bittorrent/peer.h | 3 +++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/examples/msg/bittorrent/peer.c b/examples/msg/bittorrent/peer.c index f770a85b32..026be29d45 100644 --- a/examples/msg/bittorrent/peer.c +++ b/examples/msg/bittorrent/peer.c @@ -99,6 +99,7 @@ void leech_loop(peer_t peer, double deadline) handle_message(peer, peer->task_received); } } else { + handle_pending_sends(peer); if (peer->current_piece != -1) { send_interested_to_peers(peer); } else { @@ -242,6 +243,8 @@ void peer_init(peer_t peer, int id, int seed) peer->comm_received = NULL; peer->round = 0; + + peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL); } /** @@ -258,6 +261,7 @@ void peer_free(peer_t peer) xbt_dict_free(&peer->peers); xbt_dict_free(&peer->active_peers); xbt_dynar_free(&peer->current_pieces); + xbt_dynar_free(&peer->pending_sends); xbt_free(peer->pieces_count); xbt_free(peer->bitfield); xbt_free(peer->bitfield_blocks); @@ -274,6 +278,29 @@ int has_finished(char *bitfield) return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0); } +/** + * Handle pending sends and remove those which are done + * @param peer Peer data + */ +void handle_pending_sends(peer_t peer) +{ + int index; + + while ((index = MSG_comm_testany(peer->pending_sends)) != -1) { + msg_comm_t comm_send = xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t); + int status = MSG_comm_get_status(comm_send); + xbt_dynar_remove_at(peer->pending_sends, index, &comm_send); + XBT_DEBUG("Communication %p is finished with status %d, dynar size is now %lu", comm_send, status, xbt_dynar_length(peer->pending_sends)); + + msg_task_t task = MSG_comm_get_task(comm_send); + MSG_comm_destroy(comm_send); + + if (status != MSG_OK) { + task_message_free(task); + } + } +} + /** * Handle a received message sent by another peer * @param peer Peer data @@ -853,7 +880,9 @@ void send_bitfield(peer_t peer, const char *mailbox) msg_task_t task = task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id, peer->bitfield, FILE_PIECES); - MSG_task_dsend(task, mailbox, task_message_free); + //Async send and append to pending sends + msg_comm_t comm = MSG_task_isend(task, mailbox); + xbt_dynar_push(peer->pending_sends, &comm); } /** diff --git a/examples/msg/bittorrent/peer.h b/examples/msg/bittorrent/peer.h index 8fd1997601..b4687745cc 100644 --- a/examples/msg/bittorrent/peer.h +++ b/examples/msg/bittorrent/peer.h @@ -42,6 +42,8 @@ typedef struct s_peer { RngStream stream; //RngStream for double begin_receive_time; //time when the receiving communication has begun, useful for calculating host speed. + + xbt_dynar_t pending_sends; // list of sends being delivered } s_peer_t, *peer_t; /** @@ -58,6 +60,7 @@ void peer_free(peer_t peer); int has_finished(char *bitfield); +void handle_pending_sends(peer_t peer); void handle_message(peer_t peer, msg_task_t task); void wait_for_pieces(peer_t peer, double deadline); -- 2.20.1