X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/a056c753e9e4cfb9147a8184e4db442ff3b99b43..c83246c94a2ea53cc13a509c8f39da2c3403fc38:/examples/msg/bittorrent/peer.c diff --git a/examples/msg/bittorrent/peer.c b/examples/msg/bittorrent/peer.c index f770a85b32..9219ee17ae 100644 --- a/examples/msg/bittorrent/peer.c +++ b/examples/msg/bittorrent/peer.c @@ -48,6 +48,7 @@ int peer(int argc, char *argv[]) XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers)); XBT_DEBUG("Here is my current status: %s", peer.bitfield); peer.begin_receive_time = MSG_get_clock(); + MSG_mailbox_set_async(peer.mailbox); if (has_finished(peer.bitfield)) { peer.pieces = FILE_PIECES; send_handshake_all(&peer); @@ -89,30 +90,32 @@ void leech_loop(peer_t peer, double deadline) while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) { if (peer->comm_received == NULL) { peer->task_received = NULL; - peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox); + peer->comm_received = + MSG_task_irecv(&peer->task_received, peer->mailbox); } if (MSG_comm_test(peer->comm_received)) { msg_error_t status = MSG_comm_get_status(peer->comm_received); MSG_comm_destroy(peer->comm_received); peer->comm_received = NULL; if (status == MSG_OK) { - handle_message(peer, peer->task_received); + handle_message(peer, peer->task_received); } } else { + handle_pending_sends(peer); if (peer->current_piece != -1) { - send_interested_to_peers(peer); + send_interested_to_peers(peer); } else { - //If the current interested pieces is < MAX - if (peer->pieces_requested < MAX_PIECES) { - update_current_piece(peer); - } + //If the current interested pieces is < MAX + if (peer->pieces_requested < MAX_PIECES) { + update_current_piece(peer); + } } //We don't execute the choke algorithm if we don't already have a piece if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) { - update_choked_peers(peer); - next_choked_update += UPDATE_CHOKED_INTERVAL; + update_choked_peers(peer); + next_choked_update += UPDATE_CHOKED_INTERVAL; } else { - MSG_process_sleep(1); + MSG_process_sleep(1); } } } @@ -131,22 +134,23 @@ void seed_loop(peer_t peer, double deadline) while (MSG_get_clock() < deadline) { if (peer->comm_received == NULL) { peer->task_received = NULL; - peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox); + peer->comm_received = + MSG_task_irecv(&peer->task_received, peer->mailbox); } if (MSG_comm_test(peer->comm_received)) { msg_error_t status = MSG_comm_get_status(peer->comm_received); MSG_comm_destroy(peer->comm_received); peer->comm_received = NULL; if (status == MSG_OK) { - handle_message(peer, peer->task_received); + handle_message(peer, peer->task_received); } } else { if (MSG_get_clock() >= next_choked_update) { - update_choked_peers(peer); - //TODO: Change the choked peer algorithm when seeding. - next_choked_update += UPDATE_CHOKED_INTERVAL; + update_choked_peers(peer); + //TODO: Change the choked peer algorithm when seeding. + next_choked_update += UPDATE_CHOKED_INTERVAL; } else { - MSG_process_sleep(1); + MSG_process_sleep(1); } } } @@ -162,8 +166,8 @@ int get_peers_data(peer_t peer) double timeout = MSG_get_clock() + GET_PEERS_TIMEOUT; //Build the task to send to the tracker tracker_task_data_t data = - tracker_task_data_new(MSG_host_get_name(MSG_host_self()), - peer->mailbox_tracker, peer->id, 0, 0, FILE_SIZE); + tracker_task_data_new(MSG_host_get_name(MSG_host_self()), + peer->mailbox_tracker, peer->id, 0, 0, FILE_SIZE); //Build the task to send. msg_task_t task_send = MSG_task_create(NULL, 0, TRACKER_COMM_SIZE, data); msg_task_t task_received = NULL; @@ -171,8 +175,8 @@ int get_peers_data(peer_t peer) while (!send_success && MSG_get_clock() < timeout) { XBT_DEBUG("Sending a peer request to the tracker."); msg_error_t status = - MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX, - GET_PEERS_TIMEOUT); + MSG_task_send_with_timeout(task_send, TRACKER_MAILBOX, + GET_PEERS_TIMEOUT); if (status == MSG_OK) { send_success = 1; } @@ -186,9 +190,9 @@ int get_peers_data(peer_t peer) int peer_id; //Add the peers the tracker gave us to our peer list. xbt_dynar_foreach(data->peers, i, peer_id) { - if (peer_id != peer->id) - xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int), - connection_new(peer_id), NULL); + if (peer_id != peer->id) + xbt_dict_set_ext(peer->peers, (char *) &peer_id, sizeof(int), + connection_new(peer_id), NULL); } success = 1; //free the communication and the task @@ -222,11 +226,11 @@ void peer_init(peer_t peer, int id, int seed) if (seed) { memset(peer->bitfield, '1', sizeof(char) * (FILE_PIECES + 1)); memset(peer->bitfield_blocks, '1', - sizeof(char) * FILE_PIECES * (PIECES_BLOCKS)); + sizeof(char) * FILE_PIECES * (PIECES_BLOCKS)); } else { memset(peer->bitfield, '0', sizeof(char) * (FILE_PIECES + 1)); memset(peer->bitfield_blocks, '0', - sizeof(char) * FILE_PIECES * (PIECES_BLOCKS)); + sizeof(char) * FILE_PIECES * (PIECES_BLOCKS)); } peer->bitfield[FILE_PIECES] = '\0'; @@ -238,10 +242,12 @@ void peer_init(peer_t peer, int id, int seed) peer->current_pieces = xbt_dynar_new(sizeof(int), NULL); peer->current_piece = -1; - peer->stream = RngStream_CreateStream(""); + peer->stream = MSG_host_get_data(MSG_host_self()); peer->comm_received = NULL; peer->round = 0; + + peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL); } /** @@ -258,11 +264,10 @@ void peer_free(peer_t peer) xbt_dict_free(&peer->peers); xbt_dict_free(&peer->active_peers); xbt_dynar_free(&peer->current_pieces); + xbt_dynar_free(&peer->pending_sends); xbt_free(peer->pieces_count); xbt_free(peer->bitfield); xbt_free(peer->bitfield_blocks); - - RngStream_DeleteStream(&peer->stream); } /** @@ -271,7 +276,34 @@ void peer_free(peer_t peer) */ int has_finished(char *bitfield) { - return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0); + return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == + NULL) ? 1 : 0); +} + +/** + * Handle pending sends and remove those which are done + * @param peer Peer data + */ +void handle_pending_sends(peer_t peer) +{ + int index; + + while ((index = MSG_comm_testany(peer->pending_sends)) != -1) { + msg_comm_t comm_send = + xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t); + int status = MSG_comm_get_status(comm_send); + xbt_dynar_remove_at(peer->pending_sends, index, &comm_send); + XBT_DEBUG + ("Communication %p is finished with status %d, dynar size is now %lu", + comm_send, status, xbt_dynar_length(peer->pending_sends)); + + msg_task_t task = MSG_comm_get_task(comm_send); + MSG_comm_destroy(comm_send); + + if (status != MSG_OK) { + task_message_free(task); + } + } } /** @@ -284,17 +316,17 @@ void handle_message(peer_t peer, msg_task_t task) message_t message = MSG_task_get_data(task); connection_t remote_peer; remote_peer = - xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id, - sizeof(int)); + xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id, + sizeof(int)); switch (message->type) { case MESSAGE_HANDSHAKE: XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox, - message->issuer_host_name); + message->issuer_host_name); //Check if the peer is in our connection list. if (!remote_peer) { xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int), - connection_new(message->peer_id), NULL); + connection_new(message->peer_id), NULL); send_handshake(peer, message->mailbox); } //Send our bitfield to the peer @@ -302,7 +334,7 @@ void handle_message(peer_t peer, msg_task_t task) break; case MESSAGE_BITFIELD: XBT_DEBUG("Recieved a BITFIELD message from %s (%s)", message->mailbox, - message->issuer_host_name); + message->issuer_host_name); //Update the pieces list update_pieces_count_from_bitfield(peer, message->bitfield); //Store the bitfield @@ -314,40 +346,40 @@ void handle_message(peer_t peer, msg_task_t task) break; case MESSAGE_INTERESTED: XBT_DEBUG("Recieved an INTERESTED message from %s (%s)", message->mailbox, - message->issuer_host_name); + message->issuer_host_name); xbt_assert((remote_peer != NULL), - "A non-in-our-list peer has sent us a message. WTH ?"); + "A non-in-our-list peer has sent us a message. WTH ?"); //Update the interested state of the peer. remote_peer->interested = 1; break; case MESSAGE_NOTINTERESTED: - XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)", message->mailbox, - message->issuer_host_name); + XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)", + message->mailbox, message->issuer_host_name); xbt_assert((remote_peer != NULL), - "A non-in-our-list peer has sent us a message. WTH ?"); + "A non-in-our-list peer has sent us a message. WTH ?"); remote_peer->interested = 0; break; case MESSAGE_UNCHOKE: xbt_assert((remote_peer != NULL), - "A non-in-our-list peer has sent us a message. WTH ?"); + "A non-in-our-list peer has sent us a message. WTH ?"); XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox, - message->issuer_host_name); + message->issuer_host_name); remote_peer->choked_download = 0; xbt_dict_set_ext(peer->active_peers, (char *) &message->peer_id, - sizeof(int), remote_peer, NULL); + sizeof(int), remote_peer, NULL); //Send requests to the peer, since it has unchoked us send_requests_to_peer(peer, remote_peer); break; case MESSAGE_CHOKE: xbt_assert((remote_peer != NULL), - "A non-in-our-list peer has sent us a message. WTH ?"); + "A non-in-our-list peer has sent us a message. WTH ?"); XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox, - message->issuer_host_name); + message->issuer_host_name); remote_peer->choked_download = 1; xbt_ex_t e; TRY { xbt_dict_remove_ext(peer->active_peers, (char *) &message->peer_id, - sizeof(int)); + sizeof(int)); } CATCH(e) { xbt_ex_free(e); @@ -355,76 +387,78 @@ void handle_message(peer_t peer, msg_task_t task) break; case MESSAGE_HAVE: XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d", - message->mailbox, message->issuer_host_name, message->index); + message->mailbox, message->issuer_host_name, message->index); xbt_assert((message->index >= 0 - && message->index < FILE_PIECES), - "Wrong HAVE message received"); + && message->index < FILE_PIECES), + "Wrong HAVE message received"); if (remote_peer->bitfield == NULL) return; remote_peer->bitfield[message->index] = '1'; peer->pieces_count[message->index]++; //If the piece is in our pieces, we tell the peer that we are interested. - if (!remote_peer->am_interested && in_current_pieces(peer, message->index)) { + if (!remote_peer->am_interested + && in_current_pieces(peer, message->index)) { remote_peer->am_interested = 1; send_interested(peer, remote_peer->mailbox); } break; case MESSAGE_REQUEST: xbt_assert((message->index >= 0 - && message->index < FILE_PIECES), "Wrong request received"); + && message->index < FILE_PIECES), "Wrong request received"); if (!remote_peer->choked_upload) { XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)", - message->mailbox, message->issuer_host_name, message->peer_id, - message->block_index, - message->block_index + message->block_length); + message->mailbox, message->issuer_host_name, message->peer_id, + message->block_index, + message->block_index + message->block_length); if (peer->bitfield[message->index] == '1') { - send_piece(peer, message->mailbox, message->index, 0, - message->block_index, message->block_length); + send_piece(peer, message->mailbox, message->index, 0, + message->block_index, message->block_length); } } else { XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.", - message->mailbox, message->issuer_host_name, message->peer_id); + message->mailbox, message->issuer_host_name, + message->peer_id); } break; case MESSAGE_PIECE: xbt_assert((message->index >= 0 - && message->index < FILE_PIECES), "Wrong piece received"); + && message->index < FILE_PIECES), "Wrong piece received"); //TODO: Execute à computation. if (message->stalled) { - XBT_DEBUG("The received piece %d from %s (%s) is STALLED", message->index, - message->mailbox, message->issuer_host_name); + XBT_DEBUG("The received piece %d from %s (%s) is STALLED", + message->index, message->mailbox, message->issuer_host_name); } else { XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index, - message->block_index, - message->block_index + message->block_length, message->mailbox, - message->issuer_host_name); + message->block_index, + message->block_index + message->block_length, + message->mailbox, message->issuer_host_name); if (peer->bitfield[message->index] == '0') { - update_bitfield_blocks(peer, message->index, message->block_index, - message->block_length); - if (piece_complete(peer, message->index)) { - peer->pieces_requested--; - //Removing the piece from our piece list - unsigned i; - int piece_index = -1, piece; - xbt_dynar_foreach(peer->current_pieces, i, piece) { - if (piece == message->index) { - piece_index = i; - break; - } - } - xbt_assert(piece_index != -1, "Received an incorrect piece"); - xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL); - //Setting the fact that we have the piece - peer->bitfield[message->index] = '1'; - peer->pieces++; - XBT_DEBUG("My status is now %s", peer->bitfield); - //Sending the information to all the peers we are connected to - send_have(peer, message->index); - //sending UNINTERSTED to peers that doesn't have what we want. - update_interested_after_receive(peer); - } + update_bitfield_blocks(peer, message->index, message->block_index, + message->block_length); + if (piece_complete(peer, message->index)) { + peer->pieces_requested--; + //Removing the piece from our piece list + unsigned i; + int piece_index = -1, piece; + xbt_dynar_foreach(peer->current_pieces, i, piece) { + if (piece == message->index) { + piece_index = i; + break; + } + } + xbt_assert(piece_index != -1, "Received an incorrect piece"); + xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL); + //Setting the fact that we have the piece + peer->bitfield[message->index] = '1'; + peer->pieces++; + XBT_DEBUG("My status is now %s", peer->bitfield); + //Sending the information to all the peers we are connected to + send_have(peer, message->index); + //sending UNINTERSTED to peers that doesn't have what we want. + update_interested_after_receive(peer); + } } else { - XBT_DEBUG("However, we already have it"); + XBT_DEBUG("However, we already have it"); } } break; @@ -434,8 +468,8 @@ void handle_message(peer_t peer, msg_task_t task) //Update the peer speed. if (remote_peer) { connection_add_speed_value(remote_peer, - 1.0 / (MSG_get_clock() - - peer->begin_receive_time)); + 1.0 / (MSG_get_clock() - + peer->begin_receive_time)); } peer->begin_receive_time = MSG_get_clock(); @@ -454,7 +488,8 @@ void wait_for_pieces(peer_t peer, double deadline) while (MSG_get_clock() < deadline && !finished) { if (peer->comm_received == NULL) { peer->task_received = NULL; - peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox); + peer->comm_received = + MSG_task_irecv(&peer->task_received, peer->mailbox); } msg_error_t status = MSG_comm_wait(peer->comm_received, TIMEOUT_MESSAGE); //free the comm already, we don't need it anymore @@ -464,7 +499,7 @@ void wait_for_pieces(peer_t peer, double deadline) MSG_task_get_data(peer->task_received); handle_message(peer, peer->task_received); if (peer->current_piece != -1) { - finished = 1; + finished = 1; } } } @@ -501,27 +536,27 @@ void update_current_piece(peer_t peer) int i = 0; do { peer->current_piece = - RngStream_RandInt(peer->stream, 0, FILE_PIECES - 1);; + RngStream_RandInt(peer->stream, 0, FILE_PIECES - 1);; i++; } while (! - (peer->bitfield[peer->current_piece] == '0' - && !in_current_pieces(peer, peer->current_piece))); + (peer->bitfield[peer->current_piece] == '0' + && !in_current_pieces(peer, peer->current_piece))); } else { //Trivial min algorithm. int i, min_id = -1; short min = -1; for (i = 0; i < FILE_PIECES; i++) { if (peer->bitfield[i] == '0') { - min = peer->pieces_count[i]; - min_id = i; - break; + min = peer->pieces_count[i]; + min_id = i; + break; } } xbt_assert((min > -1), "Couldn't find a minimum"); for (i = 1; i < FILE_PIECES; i++) { if (peer->pieces_count[i] < min && peer->bitfield[i] == '0') { - min = peer->pieces_count[i]; - min_id = i; + min = peer->pieces_count[i]; + min_id = i; } } peer->current_piece = min_id; @@ -529,7 +564,7 @@ void update_current_piece(peer_t peer) xbt_dynar_push_as(peer->current_pieces, int, peer->current_piece); XBT_DEBUG("New interested piece: %d", peer->current_piece); xbt_assert((peer->current_piece >= 0 && peer->current_piece < FILE_PIECES), - "Peer want to retrieve a piece that doesn't exist."); + "Peer want to retrieve a piece that doesn't exist."); } /** @@ -546,7 +581,7 @@ void update_choked_peers(peer_t peer) //remove a peer from the list xbt_dict_cursor_t cursor = NULL; xbt_dict_cursor_first(peer->active_peers, &cursor); - if (xbt_dict_length(peer->active_peers) > 0) { + if (!xbt_dict_is_empty(peer->active_peers)) { key = xbt_dict_cursor_get_key(cursor); connection_t peer_choked = xbt_dict_cursor_get_data(cursor); if (peer_choked) { @@ -557,7 +592,7 @@ void update_choked_peers(peer_t peer) } xbt_dict_cursor_free(&cursor); - /** + /** * If we are currently seeding, we unchoke the peer which has * been unchoke the least time. */ @@ -567,8 +602,8 @@ void update_choked_peers(peer_t peer) xbt_dict_foreach(peer->peers, cursor, key, connection) { if (connection->last_unchoke < unchoke_time && connection->interested) { - unchoke_time = connection->last_unchoke; - peer_choosed = connection; + unchoke_time = connection->last_unchoke; + peer_choosed = connection; } } } else { @@ -576,35 +611,34 @@ void update_choked_peers(peer_t peer) if (peer->round == 0) { int j = 0; do { - //We choose a random peer to unchoke. - int id_chosen = - RngStream_RandInt(peer->stream, 0, - xbt_dict_length(peer->peers) - 1); - int i = 0; - connection_t connection; - xbt_dict_foreach(peer->peers, cursor, key, connection) { - if (i == id_chosen) { - peer_choosed = connection; - break; - } - i++; - } - xbt_dict_cursor_free(&cursor); - if (peer_choosed->interested == 0) { - peer_choosed = NULL; - } - j++; + //We choose a random peer to unchoke. + int id_chosen = RngStream_RandInt(peer->stream, 0, + xbt_dict_length(peer->peers) - 1); + int i = 0; + connection_t connection; + xbt_dict_foreach(peer->peers, cursor, key, connection) { + if (i == id_chosen) { + peer_choosed = connection; + break; + } + i++; + } + xbt_dict_cursor_free(&cursor); + if (peer_choosed->interested == 0) { + peer_choosed = NULL; + } + j++; } while (peer_choosed == NULL && j < MAXIMUM_PAIRS); } else { //Use the "fastest download" policy. connection_t connection; double fastest_speed = 0.0; xbt_dict_foreach(peer->peers, cursor, key, connection) { - if (connection->peer_speed > fastest_speed && connection->choked_upload - && connection->interested) { - peer_choosed = connection; - fastest_speed = connection->peer_speed; - } + if (connection->peer_speed > fastest_speed + && connection->choked_upload && connection->interested) { + peer_choosed = connection; + fastest_speed = connection->peer_speed; + } } } @@ -612,7 +646,7 @@ void update_choked_peers(peer_t peer) if (peer_choosed != NULL) { peer_choosed->choked_upload = 0; xbt_dict_set_ext(peer->active_peers, (char *) &peer_choosed->id, - sizeof(int), peer_choosed, NULL); + sizeof(int), peer_choosed, NULL); peer_choosed->last_unchoke = MSG_get_clock(); send_unchoked(peer, peer_choosed->mailbox); } @@ -636,28 +670,28 @@ void update_interested_after_receive(peer_t peer) if (connection->am_interested) { //Check if the peer still has a piece we want. xbt_dynar_foreach(peer->current_pieces, cpt, piece) { - xbt_assert((piece >= 0), "Wrong piece."); - if (connection->bitfield && connection->bitfield[piece] == '1') { - interested = 1; - break; - } + xbt_assert((piece >= 0), "Wrong piece."); + if (connection->bitfield && connection->bitfield[piece] == '1') { + interested = 1; + break; + } } if (!interested) { - connection->am_interested = 0; - send_notinterested(peer, connection->mailbox); + connection->am_interested = 0; + send_notinterested(peer, connection->mailbox); } } } } void update_bitfield_blocks(peer_t peer, int index, int block_index, - int block_length) + int block_length) { int i; xbt_assert((index >= 0 && index <= FILE_PIECES), "Wrong piece."); xbt_assert((block_index >= 0 - && block_index <= PIECES_BLOCKS), "Wrong block : %d.", - block_index); + && block_index <= PIECES_BLOCKS), "Wrong block : %d.", + block_index); for (i = block_index; i < (block_index + block_length); i++) { peer->bitfield_blocks[index * PIECES_BLOCKS + i] = '1'; } @@ -704,11 +738,11 @@ void send_requests_to_peer(peer_t peer, connection_t remote_peer) if (remote_peer->bitfield && remote_peer->bitfield[piece] == '1') { block_index = get_first_block(peer, piece); if (block_index != -1) { - block_length = PIECES_BLOCKS - block_index; - block_length = min(BLOCKS_REQUESTED, block_length); - send_request(peer, remote_peer->mailbox, piece, block_index, - block_length); - break; + block_length = PIECES_BLOCKS - block_index; + block_length = min(BLOCKS_REQUESTED, block_length); + send_request(peer, remote_peer->mailbox, piece, block_index, + block_length); + break; } } } @@ -724,14 +758,14 @@ void send_interested_to_peers(peer_t peer) xbt_dict_cursor_t cursor = NULL; connection_t connection; xbt_assert((peer->current_piece != -1), - "Tried to send a interested message wheras the current_piece is -1"); + "Tried to send a interested message wheras the current_piece is -1"); xbt_dict_foreach(peer->peers, cursor, key, connection) { if (connection->bitfield - && connection->bitfield[peer->current_piece] == '1') { + && connection->bitfield[peer->current_piece] == '1') { connection->am_interested = 1; msg_task_t task = - task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, - peer->id, task_message_size(MESSAGE_INTERESTED)); + task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, + peer->id, task_message_size(MESSAGE_INTERESTED)); MSG_task_dsend(task, connection->mailbox, task_message_free); XBT_DEBUG("Send INTERESTED to %s", connection->mailbox); } @@ -748,8 +782,8 @@ void send_interested_to_peers(peer_t peer) void send_interested(peer_t peer, const char *mailbox) { msg_task_t task = - task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, - peer->id, task_message_size(MESSAGE_INTERESTED)); + task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, + peer->id, task_message_size(MESSAGE_INTERESTED)); MSG_task_dsend(task, mailbox, task_message_free); XBT_DEBUG("Sending INTERESTED to %s", mailbox); @@ -763,8 +797,8 @@ void send_interested(peer_t peer, const char *mailbox) void send_notinterested(peer_t peer, const char *mailbox) { msg_task_t task = - task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox, - peer->id, task_message_size(MESSAGE_NOTINTERESTED)); + task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox, + peer->id, task_message_size(MESSAGE_NOTINTERESTED)); MSG_task_dsend(task, mailbox, task_message_free); XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox); @@ -781,8 +815,8 @@ void send_handshake_all(peer_t peer) char *key; xbt_dict_foreach(peer->peers, cursor, key, remote_peer) { msg_task_t task = - task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, - peer->id, task_message_size(MESSAGE_HANDSHAKE)); + task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, + peer->id, task_message_size(MESSAGE_HANDSHAKE)); MSG_task_dsend(task, remote_peer->mailbox, task_message_free); XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox); } @@ -796,8 +830,8 @@ void send_handshake_all(peer_t peer) void send_handshake(peer_t peer, const char *mailbox) { msg_task_t task = - task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, - peer->id, task_message_size(MESSAGE_HANDSHAKE)); + task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, + peer->id, task_message_size(MESSAGE_HANDSHAKE)); MSG_task_dsend(task, mailbox, task_message_free); XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox); } @@ -809,8 +843,8 @@ void send_choked(peer_t peer, const char *mailbox) { XBT_DEBUG("Sending a CHOKE to %s", mailbox); msg_task_t task = - task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, - peer->id, task_message_size(MESSAGE_CHOKE)); + task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, + peer->id, task_message_size(MESSAGE_CHOKE)); MSG_task_dsend(task, mailbox, task_message_free); } @@ -821,8 +855,8 @@ void send_unchoked(peer_t peer, const char *mailbox) { XBT_DEBUG("Sending a UNCHOKE to %s", mailbox); msg_task_t task = - task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox, - peer->id, task_message_size(MESSAGE_UNCHOKE)); + task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox, + peer->id, task_message_size(MESSAGE_UNCHOKE)); MSG_task_dsend(task, mailbox, task_message_free); } @@ -837,8 +871,9 @@ void send_have(peer_t peer, int piece) char *key; xbt_dict_foreach(peer->peers, cursor, key, remote_peer) { msg_task_t task = - task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox, - peer->id, piece, task_message_size(MESSAGE_HAVE)); + task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox, + peer->id, piece, + task_message_size(MESSAGE_HAVE)); MSG_task_dsend(task, remote_peer->mailbox, task_message_free); } } @@ -851,22 +886,24 @@ void send_bitfield(peer_t peer, const char *mailbox) { XBT_DEBUG("Sending a BITFIELD to %s", mailbox); msg_task_t task = - task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id, - peer->bitfield, FILE_PIECES); - MSG_task_dsend(task, mailbox, task_message_free); + task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id, + peer->bitfield, FILE_PIECES); + //Async send and append to pending sends + msg_comm_t comm = MSG_task_isend(task, mailbox); + xbt_dynar_push(peer->pending_sends, &comm); } /** * Send a "request" message to a pair, containing a request for a piece */ -void send_request(peer_t peer, const char *mailbox, int piece, int block_index, - int block_length) +void send_request(peer_t peer, const char *mailbox, int piece, + int block_index, int block_length) { XBT_DEBUG("Sending a REQUEST to %s for piece %d (%d,%d)", mailbox, piece, - block_index, block_length); + block_index, block_length); msg_task_t task = - task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece, - block_index, block_length); + task_message_request_new(peer->hostname, peer->mailbox, peer->id, piece, + block_index, block_length); MSG_task_dsend(task, mailbox, task_message_free); } @@ -874,16 +911,16 @@ void send_request(peer_t peer, const char *mailbox, int piece, int block_index, * Send a "piece" message to a pair, containing a piece of the file */ void send_piece(peer_t peer, const char *mailbox, int piece, int stalled, - int block_index, int block_length) + int block_index, int block_length) { XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index, - block_length, mailbox); + block_length, mailbox); xbt_assert(piece >= 0, "Tried to send a piece that doesn't exist."); xbt_assert((peer->bitfield[piece] == '1'), - "Tried to send a piece that we doesn't have."); + "Tried to send a piece that we doesn't have."); msg_task_t task = - task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece, - stalled, block_index, block_length, BLOCK_SIZE); + task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece, + stalled, block_index, block_length, BLOCK_SIZE); MSG_task_dsend(task, mailbox, task_message_free); }