X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/13f9ae4ca658a16c0b9d76fb918032f1c8b9841a..d155fd69fa99c97b3a9c86bb7f2e472c2e7332df:/examples/msg/bittorrent/peer.c diff --git a/examples/msg/bittorrent/peer.c b/examples/msg/bittorrent/peer.c index baa8bc1672..632b19e0a0 100644 --- a/examples/msg/bittorrent/peer.c +++ b/examples/msg/bittorrent/peer.c @@ -48,6 +48,7 @@ int peer(int argc, char *argv[]) XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers)); XBT_DEBUG("Here is my current status: %s", peer.bitfield); peer.begin_receive_time = MSG_get_clock(); + MSG_mailbox_set_async(peer.mailbox); if (has_finished(peer.bitfield)) { peer.pieces = FILE_PIECES; send_handshake_all(&peer); @@ -99,6 +100,7 @@ void leech_loop(peer_t peer, double deadline) handle_message(peer, peer->task_received); } } else { + handle_pending_sends(peer); if (peer->current_piece != -1) { send_interested_to_peers(peer); } else { @@ -238,10 +240,12 @@ void peer_init(peer_t peer, int id, int seed) peer->current_pieces = xbt_dynar_new(sizeof(int), NULL); peer->current_piece = -1; - peer->stream = RngStream_CreateStream(""); + peer->stream = MSG_host_get_data(MSG_host_self()); peer->comm_received = NULL; peer->round = 0; + + peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL); } /** @@ -258,11 +262,10 @@ void peer_free(peer_t peer) xbt_dict_free(&peer->peers); xbt_dict_free(&peer->active_peers); xbt_dynar_free(&peer->current_pieces); + xbt_dynar_free(&peer->pending_sends); xbt_free(peer->pieces_count); xbt_free(peer->bitfield); xbt_free(peer->bitfield_blocks); - - RngStream_DeleteStream(&peer->stream); } /** @@ -274,6 +277,29 @@ int has_finished(char *bitfield) return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0); } +/** + * Handle pending sends and remove those which are done + * @param peer Peer data + */ +void handle_pending_sends(peer_t peer) +{ + int index; + + while ((index = MSG_comm_testany(peer->pending_sends)) != -1) { + msg_comm_t comm_send = xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t); + int status = MSG_comm_get_status(comm_send); + xbt_dynar_remove_at(peer->pending_sends, index, &comm_send); + XBT_DEBUG("Communication %p is finished with status %d, dynar size is now %lu", comm_send, status, xbt_dynar_length(peer->pending_sends)); + + msg_task_t task = MSG_comm_get_task(comm_send); + MSG_comm_destroy(comm_send); + + if (status != MSG_OK) { + task_message_free(task); + } + } +} + /** * Handle a received message sent by another peer * @param peer Peer data @@ -428,6 +454,8 @@ void handle_message(peer_t peer, msg_task_t task) } } break; + case MESSAGE_CANCEL: + break; } //Update the peer speed. if (remote_peer) { @@ -544,7 +572,7 @@ void update_choked_peers(peer_t peer) //remove a peer from the list xbt_dict_cursor_t cursor = NULL; xbt_dict_cursor_first(peer->active_peers, &cursor); - if (xbt_dict_length(peer->active_peers) > 0) { + if (!xbt_dict_is_empty(peer->active_peers)) { key = xbt_dict_cursor_get_key(cursor); connection_t peer_choked = xbt_dict_cursor_get_data(cursor); if (peer_choked) { @@ -851,7 +879,9 @@ void send_bitfield(peer_t peer, const char *mailbox) msg_task_t task = task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id, peer->bitfield, FILE_PIECES); - MSG_task_dsend(task, mailbox, task_message_free); + //Async send and append to pending sends + msg_comm_t comm = MSG_task_isend(task, mailbox); + xbt_dynar_push(peer->pending_sends, &comm); } /**