-/* Copyright (c) 2012. The SimGrid Team.
+/* Copyright (c) 2012-2015. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
+
#include "peer.h"
#include "tracker.h"
#include "connection.h"
#include "messages.h"
-#include <msg/msg.h>
+#include <simgrid/msg.h>
#include <xbt/RngStream.h>
#include <limits.h>
#define FILE_PIECES 10
#define PIECES_BLOCKS 5
#define BLOCK_SIZE 16384
+#define ENABLE_END_GAME_MODE 1
/**
* Number of blocks asked by each request
#define BLOCKS_REQUESTED 2
-static const int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
+static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
-void request_new_piece_to_peer(peer_t peer, connection_t remote_peer);
-void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece);
-void remove_current_piece(peer_t peer, connection_t remote_peer,
- int current_piece);
+#define SLEEP_DURATION 1
/**
* Peer main function
seed_loop(&peer, deadline);
} else {
leech_loop(&peer, deadline);
-// XBT_INFO("%d becomes a seeder", peer.id);
seed_loop(&peer, deadline);
}
} else {
* (since it couldn't have gotten more than 50 peers)
*/
send_handshake_all(peer);
- //Wait for at least one "bitfield" message.
-// wait_for_pieces(peer, deadline);
XBT_DEBUG("Starting main leech loop");
while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
if (peer->comm_received == NULL) {
-// XBT_INFO("irecv");
peer->task_received = NULL;
peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
}
if (MSG_comm_test(peer->comm_received)) {
-// XBT_INFO("comm_test OK");
msg_error_t status = MSG_comm_get_status(peer->comm_received);
MSG_comm_destroy(peer->comm_received);
peer->comm_received = NULL;
handle_message(peer, peer->task_received);
}
} else {
- handle_pending_sends(peer);
//We don't execute the choke algorithm if we don't already have a piece
if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
update_choked_peers(peer);
next_choked_update += UPDATE_CHOKED_INTERVAL;
} else {
- MSG_process_sleep(1);
+ MSG_process_sleep(SLEEP_DURATION);
}
}
}
+ if (peer->pieces == FILE_PIECES)
+ XBT_DEBUG("%d becomes a seeder", peer->id);
+
}
/**
//TODO: Change the choked peer algorithm when seeding.
next_choked_update += UPDATE_CHOKED_INTERVAL;
} else {
- MSG_process_sleep(1);
+ MSG_process_sleep(SLEEP_DURATION);
}
}
}
peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
- peer->stream = RngStream_CreateStream("");
+ peer->stream =
+ (RngStream)MSG_host_get_property_value(MSG_host_self(), "stream");
peer->comm_received = NULL;
peer->round = 0;
- peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
}
/**
xbt_dict_free(&peer->peers);
xbt_dict_free(&peer->active_peers);
xbt_dynar_free(&peer->current_pieces);
- xbt_dynar_free(&peer->pending_sends);
xbt_free(peer->pieces_count);
xbt_free(peer->bitfield);
xbt_free(peer->bitfield_blocks);
-
- RngStream_DeleteStream(&peer->stream);
}
/**
return nb;
}
-/**
- * Handle pending sends and remove those which are done
- * @param peer Peer data
- */
-void handle_pending_sends(peer_t peer)
-{
- int index;
-
- while ((index = MSG_comm_testany(peer->pending_sends)) != -1) {
- msg_comm_t comm_send =
- xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t);
- int status = MSG_comm_get_status(comm_send);
- xbt_dynar_remove_at(peer->pending_sends, index, &comm_send);
- XBT_DEBUG
- ("Communication %p is finished with status %d, dynar size is now %lu",
- comm_send, status, xbt_dynar_length(peer->pending_sends));
-
- msg_task_t task = MSG_comm_get_task(comm_send);
- MSG_comm_destroy(comm_send);
-
- if (status != MSG_OK) {
- task_message_free(task);
- }
- }
-}
void update_active_peers_set(peer_t peer, connection_t remote_peer)
{
"A non-in-our-list peer has sent us a message. WTH ?");
XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
message->issuer_host_name);
+ xbt_assert(remote_peer->choked_download, "WTF !!!");
remote_peer->choked_download = 0;
//Send requests to the peer, since it has unchoked us
if (remote_peer->am_interested)
"A non-in-our-list peer has sent us a message. WTH ?");
XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
message->issuer_host_name);
+ xbt_assert(!remote_peer->choked_download, "WTF !!!");
remote_peer->choked_download = 1;
remove_current_piece(peer, remote_peer, remote_peer->current_piece);
break;
}
break;
case MESSAGE_REQUEST:
+ xbt_assert(remote_peer->interested, "WTF !!!");
+
xbt_assert((message->index >= 0
&& message->index < FILE_PIECES), "Wrong request received");
if (!remote_peer->choked_upload) {
XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
- message->mailbox, message->issuer_host_name, message->peer_id,
+ message->mailbox, message->issuer_host_name, message->index,
message->block_index,
message->block_index + message->block_length);
if (peer->bitfield[message->index] == '1') {
- send_piece(peer, message->mailbox, message->index, 0,
+ send_piece(peer, message->mailbox, message->index,
message->block_index, message->block_length);
}
} else {
}
break;
case MESSAGE_PIECE:
+ XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
+ message->block_index,
+ message->block_index + message->block_length,
+ message->mailbox, message->issuer_host_name);
+ xbt_assert(!remote_peer->choked_download, "WTF !!!");
+ xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE, "Can't received a piece if I'm not interested wihtout end-game mode! piece (%d) bitfield(%s) remote bitfield(%s)", message->index, peer->bitfield, remote_peer->bitfield);
+ xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
xbt_assert((message->index >= 0
&& message->index < FILE_PIECES), "Wrong piece received");
//TODO: Execute à computation.
- if (message->stalled) {
- XBT_DEBUG("The received piece %d from %s (%s) is STALLED",
- message->index, message->mailbox, message->issuer_host_name);
- } else {
- XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
- message->block_index,
- message->block_index + message->block_length,
- message->mailbox, message->issuer_host_name);
if (peer->bitfield[message->index] == '0') {
update_bitfield_blocks(peer, message->index, message->block_index,
message->block_length);
}
} else {
XBT_DEBUG("However, we already have it");
+ xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
request_new_piece_to_peer(peer, remote_peer);
}
- }
break;
case MESSAGE_CANCEL:
XBT_DEBUG("The received CANCEL from %s (%s)",
task_message_free(task);
}
+/**
+ * Selects the appropriate piece to download and requests it to the remote_peer
+ */
void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
{
int piece = select_piece_to_download(peer, remote_peer);
}
}
+/**
+ * remove current_piece from the list of currently downloaded pieces.
+ */
void remove_current_piece(peer_t peer, connection_t remote_peer,
int current_piece)
{
- int piece_index = -1, piece, i;
+ int piece_index = -1, piece;
+ unsigned int i;
xbt_dynar_foreach(peer->current_pieces, i, piece) {
if (piece == current_piece) {
piece_index = i;
* If the peer has more than pieces, he downloads the pieces that are the less
* replicated (rarest policy).
* If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
-
* @param peer: local peer
* @param remote_peer: information about the connection
* @return the piece to download if possible. -1 otherwise
// end game mode
if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces)
&& is_interested(peer, remote_peer)) {
+ if(!ENABLE_END_GAME_MODE)
+ return -1;
int i;
int nb_interesting_pieces = 0;
int random_piece_index, current_index = 0;
xbt_dict_size(peer->active_peers));
//update the current round
peer->round = (peer->round + 1) % 3;
- char *key, *key_choked;
+ char *key, *key_choked=NULL;
connection_t peer_choosed = NULL;
connection_t peer_choked = NULL;
//remove a peer from the list
char *key;
xbt_dict_cursor_t cursor;
connection_t connection;
- unsigned cpt;
- int interested, piece;
+ int interested;
xbt_dict_foreach(peer->peers, cursor, key, connection) {
interested = 0;
if (connection->am_interested) {
void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
{
remote_peer->current_piece = piece;
- unsigned i;
int block_index, block_length;
xbt_assert(remote_peer->bitfield, "bitfield not received");
xbt_assert(remote_peer->bitfield[piece] == '1', "WTF !!!");
msg_task_t task =
task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
peer->bitfield, FILE_PIECES);
- //Async send and append to pending sends
- msg_comm_t comm = MSG_task_isend(task, mailbox);
- xbt_dynar_push(peer->pending_sends, &comm);
+ MSG_task_dsend(task, mailbox, task_message_free);
}
/**
/**
* Send a "piece" message to a pair, containing a piece of the file
*/
-void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
+void send_piece(peer_t peer, const char *mailbox, int piece,
int block_index, int block_length)
{
XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index,
"Tried to send a piece that we doesn't have.");
msg_task_t task =
task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
- stalled, block_index, block_length, BLOCK_SIZE);
+ block_index, block_length, BLOCK_SIZE);
MSG_task_dsend(task, mailbox, task_message_free);
}