-void handle_message(peer_t peer, msg_task_t task) {
- message_t message = MSG_task_get_data(task);
- connection_t remote_peer;
- remote_peer = xbt_dict_get_or_null_ext(peer->peers, (char*)&message->peer_id, sizeof(int));
- switch (message->type) {
-
- case MESSAGE_HANDSHAKE:
- XBT_DEBUG("Received a HANDSHAKE from %s (%s)",message->mailbox, message->issuer_host_name);
- //Check if the peer is in our connection list.
- if (!remote_peer) {
- xbt_dict_set_ext(peer->peers,(char*)&message->peer_id,sizeof(int),connection_new(message->peer_id),NULL);
- send_handshake(peer,message->mailbox);
- }
- //Send our bitfield to the peer
- send_bitfield(peer,message->mailbox);
- break;
- case MESSAGE_BITFIELD:
- XBT_DEBUG("Recieved a BITFIELD message from %s (%s)",message->mailbox,message->issuer_host_name);
- //Update the pieces list
- update_pieces_count_from_bitfield(peer,message->bitfield);
- //Store the bitfield
- remote_peer->bitfield = xbt_strdup(message->bitfield);
- //Update the current piece
- if (peer->current_piece == -1 && peer->pieces < FILE_PIECES) {
- update_current_piece(peer);
- }
- break;
- case MESSAGE_INTERESTED:
- XBT_DEBUG("Recieved an INTERESTED message from %s (%s)",message->mailbox,message->issuer_host_name);
- xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
- //Update the interested state of the peer.
- remote_peer->interested = 1;
- break;
- case MESSAGE_NOTINTERESTED:
- XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)", message->mailbox, message->issuer_host_name);
- xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
- remote_peer->interested = 0;
- break;
- case MESSAGE_UNCHOKE:
- xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
- XBT_DEBUG("Received a UNCHOKE message from %s (%s)",message->mailbox,message->issuer_host_name);
- remote_peer->choked_download = 0;
- xbt_dict_set_ext(peer->active_peers,(char*)&message->peer_id,sizeof(int),remote_peer,NULL);
- //Send requests to the peer, since it has unchoked us
- send_requests_to_peer(peer,remote_peer);
- break;
- case MESSAGE_CHOKE:
- xbt_assert((remote_peer != NULL), "A non-in-our-list peer has sent us a message. WTH ?");
- XBT_DEBUG("Received a CHOKE message from %s (%s)",message->mailbox,message->issuer_host_name);
- remote_peer->choked_download = 1;
- xbt_ex_t e;
- TRY {
- xbt_dict_remove_ext(peer->active_peers,(char*)&message->peer_id,sizeof(int));
- }
- CATCH(e) {
- xbt_ex_free(e);
- }
- break;
- case MESSAGE_HAVE:
- XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d",message->mailbox, message->issuer_host_name, message->index);
- xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong HAVE message received");
- if (remote_peer->bitfield == NULL)
- return;
- remote_peer->bitfield[message->index] = '1';
- peer->pieces_count[message->index]++;
- //If the piece is in our pieces, we tell the peer that we are interested.
- if (!remote_peer->am_interested && in_current_pieces(peer, message->index)) {
- remote_peer->am_interested = 1;
- send_interested(peer,remote_peer->mailbox);
- }
- break;
- case MESSAGE_REQUEST:
- xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong request received");
- if (!remote_peer->choked_upload) {
- XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",message->mailbox,message->issuer_host_name,message->peer_id, message->block_index,message->block_index + message->block_length);
- if (peer->bitfield[message->index] == '1') {
- send_piece(peer, message->mailbox, message->index, 0, message->block_index, message->block_length);
- }
- }
- else {
- XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.",message->mailbox,message->issuer_host_name,message->peer_id);
- }
- break;
- case MESSAGE_PIECE:
- xbt_assert((message->index >= 0 && message->index < FILE_PIECES), "Wrong piece received");
- //TODO: Execute à computation.
- if (message->stalled) {
- XBT_DEBUG("The received piece %d from %s (%s) is STALLED",message->index, message->mailbox, message->issuer_host_name);
- }
- else {
- XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)",message->index, message->block_index, message->block_index + message->block_length, message->mailbox, message->issuer_host_name);
- if (peer->bitfield[message->index] == '0') {
- update_bitfield_blocks(peer,message->index,message->block_index,message->block_length);
- if (piece_complete(peer,message->index)) {
- peer->pieces_requested--;
- //Removing the piece from our piece list
- int piece_index = -1, i, piece;
- xbt_dynar_foreach(peer->current_pieces, i, piece) {
- if (piece == message->index) {
- piece_index = i;
- break;
- }
- }
- xbt_assert(piece_index != -1, "Received an incorrect piece");
- xbt_dynar_remove_at(peer->current_pieces,piece_index,NULL);
- //Setting the fact that we have the piece
- peer->bitfield[message->index] = '1';
- peer->pieces++;
- XBT_DEBUG("My status is now %s",peer->bitfield);
- //Sending the information to all the peers we are connected to
- send_have(peer,message->index);
- //sending UNINTERSTED to peers that doesn't have what we want.
- update_interested_after_receive(peer);
- }
- }
- else {
- XBT_DEBUG("However, we already have it");
- }
- }
- break;
- }
- //Update the peer speed.
- if (remote_peer) {
- connection_add_speed_value(remote_peer, 1.0 / ( MSG_get_clock() - peer->begin_receive_time));
- }
- peer->begin_receive_time = MSG_get_clock();
-
- task_message_free(task);
+void handle_message(peer_t peer, msg_task_t task)
+{
+ message_t message = MSG_task_get_data(task);
+ connection_t remote_peer;
+ remote_peer =
+ xbt_dict_get_or_null_ext(peer->peers, (char *) &message->peer_id,
+ sizeof(int));
+ switch (message->type) {
+
+ case MESSAGE_HANDSHAKE:
+ XBT_DEBUG("Received a HANDSHAKE from %s (%s)", message->mailbox,
+ message->issuer_host_name);
+ //Check if the peer is in our connection list.
+ if (!remote_peer) {
+ xbt_dict_set_ext(peer->peers, (char *) &message->peer_id, sizeof(int),
+ connection_new(message->peer_id), NULL);
+ send_handshake(peer, message->mailbox);
+ }
+ //Send our bitfield to the peer
+ send_bitfield(peer, message->mailbox);
+ break;
+ case MESSAGE_BITFIELD:
+ XBT_DEBUG("Recieved a BITFIELD message from %s (%s)", message->mailbox,
+ message->issuer_host_name);
+ //Update the pieces list
+ update_pieces_count_from_bitfield(peer, message->bitfield);
+ //Store the bitfield
+ remote_peer->bitfield = xbt_strdup(message->bitfield);
+ //Update the current piece
+ if (peer->current_piece == -1 && peer->pieces < FILE_PIECES) {
+ update_current_piece(peer);
+ }
+ break;
+ case MESSAGE_INTERESTED:
+ XBT_DEBUG("Recieved an INTERESTED message from %s (%s)", message->mailbox,
+ message->issuer_host_name);
+ xbt_assert((remote_peer != NULL),
+ "A non-in-our-list peer has sent us a message. WTH ?");
+ //Update the interested state of the peer.
+ remote_peer->interested = 1;
+ break;
+ case MESSAGE_NOTINTERESTED:
+ XBT_DEBUG("Received a NOTINTERESTED message from %s (%s)", message->mailbox,
+ message->issuer_host_name);
+ xbt_assert((remote_peer != NULL),
+ "A non-in-our-list peer has sent us a message. WTH ?");
+ remote_peer->interested = 0;
+ break;
+ case MESSAGE_UNCHOKE:
+ xbt_assert((remote_peer != NULL),
+ "A non-in-our-list peer has sent us a message. WTH ?");
+ XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
+ message->issuer_host_name);
+ remote_peer->choked_download = 0;
+ xbt_dict_set_ext(peer->active_peers, (char *) &message->peer_id,
+ sizeof(int), remote_peer, NULL);
+ //Send requests to the peer, since it has unchoked us
+ send_requests_to_peer(peer, remote_peer);
+ break;
+ case MESSAGE_CHOKE:
+ xbt_assert((remote_peer != NULL),
+ "A non-in-our-list peer has sent us a message. WTH ?");
+ XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
+ message->issuer_host_name);
+ remote_peer->choked_download = 1;
+ xbt_ex_t e;
+ TRY {
+ xbt_dict_remove_ext(peer->active_peers, (char *) &message->peer_id,
+ sizeof(int));
+ }
+ CATCH(e) {
+ xbt_ex_free(e);
+ }
+ break;
+ case MESSAGE_HAVE:
+ XBT_DEBUG("Received a HAVE message from %s (%s) of piece %d",
+ message->mailbox, message->issuer_host_name, message->index);
+ xbt_assert((message->index >= 0
+ && message->index < FILE_PIECES),
+ "Wrong HAVE message received");
+ if (remote_peer->bitfield == NULL)
+ return;
+ remote_peer->bitfield[message->index] = '1';
+ peer->pieces_count[message->index]++;
+ //If the piece is in our pieces, we tell the peer that we are interested.
+ if (!remote_peer->am_interested && in_current_pieces(peer, message->index)) {
+ remote_peer->am_interested = 1;
+ send_interested(peer, remote_peer->mailbox);
+ }
+ break;
+ case MESSAGE_REQUEST:
+ xbt_assert((message->index >= 0
+ && message->index < FILE_PIECES), "Wrong request received");
+ if (!remote_peer->choked_upload) {
+ XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
+ message->mailbox, message->issuer_host_name, message->peer_id,
+ message->block_index,
+ message->block_index + message->block_length);
+ if (peer->bitfield[message->index] == '1') {
+ send_piece(peer, message->mailbox, message->index, 0,
+ message->block_index, message->block_length);
+ }
+ } else {
+ XBT_DEBUG("Received a REQUEST from %s (%s) for %d but he is choked.",
+ message->mailbox, message->issuer_host_name, message->peer_id);
+ }
+ break;
+ case MESSAGE_PIECE:
+ xbt_assert((message->index >= 0
+ && message->index < FILE_PIECES), "Wrong piece received");
+ //TODO: Execute à computation.
+ if (message->stalled) {
+ XBT_DEBUG("The received piece %d from %s (%s) is STALLED", message->index,
+ message->mailbox, message->issuer_host_name);
+ } else {
+ XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
+ message->block_index,
+ message->block_index + message->block_length, message->mailbox,
+ message->issuer_host_name);
+ if (peer->bitfield[message->index] == '0') {
+ update_bitfield_blocks(peer, message->index, message->block_index,
+ message->block_length);
+ if (piece_complete(peer, message->index)) {
+ peer->pieces_requested--;
+ //Removing the piece from our piece list
+ int piece_index = -1, i, piece;
+ xbt_dynar_foreach(peer->current_pieces, i, piece) {
+ if (piece == message->index) {
+ piece_index = i;
+ break;
+ }
+ }
+ xbt_assert(piece_index != -1, "Received an incorrect piece");
+ xbt_dynar_remove_at(peer->current_pieces, piece_index, NULL);
+ //Setting the fact that we have the piece
+ peer->bitfield[message->index] = '1';
+ peer->pieces++;
+ XBT_DEBUG("My status is now %s", peer->bitfield);
+ //Sending the information to all the peers we are connected to
+ send_have(peer, message->index);
+ //sending UNINTERSTED to peers that doesn't have what we want.
+ update_interested_after_receive(peer);
+ }
+ } else {
+ XBT_DEBUG("However, we already have it");
+ }
+ }
+ break;
+ }
+ //Update the peer speed.
+ if (remote_peer) {
+ connection_add_speed_value(remote_peer,
+ 1.0 / (MSG_get_clock() -
+ peer->begin_receive_time));
+ }
+ peer->begin_receive_time = MSG_get_clock();
+
+ task_message_free(task);