Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make generate_memcheck_tests.pl handle command "mkfile".
[simgrid.git] / examples / msg / bittorrent / peer.c
index 70e2a98..026be29 100644 (file)
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
 
+//TODO: Let users change this
+/*
+ * Transferred file parameters
+ *
+ * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
+ *
+ * FILE_SIZE     - total size of the shared file, in bytes
+ * FILE_PIECES   - number of pieces the file is split into
+ * PIECES_BLOCKS - number of blocks in each piece
+ * BLOCK_SIZE    - size of one block, in bytes
+ */
+static int FILE_SIZE = 10 * 5 * 16384;
+static int FILE_PIECES = 10;
 
+static int PIECES_BLOCKS = 5;
+static int BLOCK_SIZE = 16384;
+static int BLOCKS_REQUESTED = 2; // NOTE(review): presumably how many blocks are requested at a time -- confirm against its users
 
 /**
  * Peer main function
@@ -88,6 +99,7 @@ void leech_loop(peer_t peer, double deadline)
         handle_message(peer, peer->task_received);
       }
     } else {
+      handle_pending_sends(peer);
       if (peer->current_piece != -1) {
         send_interested_to_peers(peer);
       } else {
@@ -171,7 +183,8 @@ int get_peers_data(peer_t peer)
     msg_error_t status = MSG_comm_wait(comm_received, GET_PEERS_TIMEOUT);
     if (status == MSG_OK) {
       tracker_task_data_t data = MSG_task_get_data(task_received);
-      int i, peer_id;
+      unsigned i;
+      int peer_id;
       //Add the peers the tracker gave us to our peer list.
       xbt_dynar_foreach(data->peers, i, peer_id) {
         if (peer_id != peer->id)
@@ -230,6 +243,8 @@ void peer_init(peer_t peer, int id, int seed)
   peer->comm_received = NULL;
 
   peer->round = 0;
+
+  peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
 }
 
 /**
@@ -246,6 +261,7 @@ void peer_free(peer_t peer)
   xbt_dict_free(&peer->peers);
   xbt_dict_free(&peer->active_peers);
   xbt_dynar_free(&peer->current_pieces);
+  xbt_dynar_free(&peer->pending_sends);
   xbt_free(peer->pieces_count);
   xbt_free(peer->bitfield);
   xbt_free(peer->bitfield_blocks);
@@ -262,6 +278,29 @@ int has_finished(char *bitfield)
   return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
 }
 
+/**
+ * Handle pending sends and remove those which are done.
+ *
+ * Polls peer->pending_sends with MSG_comm_testany() until it returns -1.
+ * Each communication it reports is removed from the dynar and destroyed;
+ * the attached task is released with task_message_free() only when the
+ * communication status is not MSG_OK.
+ * @param peer Peer data
+ */
+void handle_pending_sends(peer_t peer)
+{
+  int index;
+
+  while ((index = MSG_comm_testany(peer->pending_sends)) != -1) {
+    msg_comm_t comm_send = xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t);
+    int status = MSG_comm_get_status(comm_send); // read before the comm is destroyed below
+    xbt_dynar_remove_at(peer->pending_sends, index, &comm_send);
+    XBT_DEBUG("Communication %p is finished with status %d, dynar size is now %lu", comm_send, status, xbt_dynar_length(peer->pending_sends));
+
+    msg_task_t task = MSG_comm_get_task(comm_send); // fetched before the comm is destroyed
+    MSG_comm_destroy(comm_send);
+
+    if (status != MSG_OK) {
+      task_message_free(task); // NOTE(review): presumably a successfully sent task is freed by its receiver -- confirm
+    }
+  }
+}
+
 /**
  * Handle a received message sent by another peer
  * @param peer Peer data
@@ -392,7 +431,8 @@ void handle_message(peer_t peer, msg_task_t task)
         if (piece_complete(peer, message->index)) {
           peer->pieces_requested--;
           //Removing the piece from our piece list
-          int piece_index = -1, i, piece;
+          unsigned i;
+          int piece_index = -1, piece;
           xbt_dynar_foreach(peer->current_pieces, i, piece) {
             if (piece == message->index) {
               piece_index = i;
@@ -415,6 +455,8 @@ void handle_message(peer_t peer, msg_task_t task)
       }
     }
     break;
+  case MESSAGE_CANCEL:
+    break;
   }
   //Update the peer speed.
   if (remote_peer) {
@@ -446,7 +488,7 @@ void wait_for_pieces(peer_t peer, double deadline)
     MSG_comm_destroy(peer->comm_received);
     peer->comm_received = NULL;
     if (status == MSG_OK) {
-      message_t message = MSG_task_get_data(peer->task_received);
+      MSG_task_get_data(peer->task_received);
       handle_message(peer, peer->task_received);
       if (peer->current_piece != -1) {
         finished = 1;
@@ -513,9 +555,8 @@ void update_current_piece(peer_t peer)
   }
   xbt_dynar_push_as(peer->current_pieces, int, peer->current_piece);
   XBT_DEBUG("New interested piece: %d", peer->current_piece);
-  xbt_assert((peer->current_piece >= 0
-              && peer->current_piece < FILE_PIECES,
-              "Peer want to retrieve a piece that doesn't exist."));
+  xbt_assert((peer->current_piece >= 0 && peer->current_piece < FILE_PIECES),
+             "Peer want to retrieve a piece that doesn't exist.");
 }
 
 /**
@@ -527,7 +568,6 @@ void update_choked_peers(peer_t peer)
 {
   //update the current round
   peer->round = (peer->round + 1) % 3;
-  int i;
   char *key;
   connection_t peer_choosed = NULL;
   //remove a peer from the list
@@ -616,7 +656,8 @@ void update_interested_after_receive(peer_t peer)
   char *key;
   xbt_dict_cursor_t cursor;
   connection_t connection;
-  int interested, cpt, piece;
+  unsigned cpt;
+  int interested, piece;
   xbt_dict_foreach(peer->peers, cursor, key, connection) {
     interested = 0;
     if (connection->am_interested) {
@@ -684,7 +725,8 @@ int get_first_block(peer_t peer, int piece)
  */
 void send_requests_to_peer(peer_t peer, connection_t remote_peer)
 {
-  int i, piece, block_index, block_length;
+  unsigned i;
+  int piece, block_index, block_length;
   xbt_dynar_foreach(peer->current_pieces, i, piece) {
     if (remote_peer->bitfield && remote_peer->bitfield[piece] == '1') {
       block_index = get_first_block(peer, piece);
@@ -716,7 +758,7 @@ void send_interested_to_peers(peer_t peer)
       connection->am_interested = 1;
       msg_task_t task =
           task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
-                           peer->id);
+                           peer->id, task_message_size(MESSAGE_INTERESTED));
       MSG_task_dsend(task, connection->mailbox, task_message_free);
       XBT_DEBUG("Send INTERESTED to %s", connection->mailbox);
     }
@@ -734,7 +776,7 @@ void send_interested(peer_t peer, const char *mailbox)
 {
   msg_task_t task =
       task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
-                       peer->id);
+                       peer->id, task_message_size(MESSAGE_INTERESTED));
   MSG_task_dsend(task, mailbox, task_message_free);
   XBT_DEBUG("Sending INTERESTED to %s", mailbox);
 
@@ -749,7 +791,7 @@ void send_notinterested(peer_t peer, const char *mailbox)
 {
   msg_task_t task =
       task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox,
-                       peer->id);
+                       peer->id, task_message_size(MESSAGE_NOTINTERESTED));
   MSG_task_dsend(task, mailbox, task_message_free);
   XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
 
@@ -767,7 +809,7 @@ void send_handshake_all(peer_t peer)
   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
     msg_task_t task =
         task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
-                         peer->id);
+                         peer->id, task_message_size(MESSAGE_HANDSHAKE));
     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
     XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
   }
@@ -782,7 +824,7 @@ void send_handshake(peer_t peer, const char *mailbox)
 {
   msg_task_t task =
       task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
-                       peer->id);
+                       peer->id, task_message_size(MESSAGE_HANDSHAKE));
   MSG_task_dsend(task, mailbox, task_message_free);
   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
 }
@@ -794,7 +836,8 @@ void send_choked(peer_t peer, const char *mailbox)
 {
   XBT_DEBUG("Sending a CHOKE to %s", mailbox);
   msg_task_t task =
-      task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, peer->id);
+      task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, 
+                       peer->id, task_message_size(MESSAGE_CHOKE));
   MSG_task_dsend(task, mailbox, task_message_free);
 }
 
@@ -806,7 +849,7 @@ void send_unchoked(peer_t peer, const char *mailbox)
   XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
   msg_task_t task =
       task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox,
-                       peer->id);
+                       peer->id, task_message_size(MESSAGE_UNCHOKE));
   MSG_task_dsend(task, mailbox, task_message_free);
 }
 
@@ -822,7 +865,7 @@ void send_have(peer_t peer, int piece)
   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
     msg_task_t task =
         task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,
-                               peer->id, piece);
+                               peer->id, piece, task_message_size(MESSAGE_HAVE));
     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
   }
 }
@@ -836,8 +879,10 @@ void send_bitfield(peer_t peer, const char *mailbox)
   XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
   msg_task_t task =
       task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
-                                peer->bitfield);
-  MSG_task_dsend(task, mailbox, task_message_free);
+                                peer->bitfield, FILE_PIECES);
+  //Async send and append to pending sends
+  msg_comm_t comm = MSG_task_isend(task, mailbox);
+  xbt_dynar_push(peer->pending_sends, &comm);
 }
 
 /**
@@ -867,13 +912,14 @@ void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
              "Tried to send a piece that we doesn't have.");
   msg_task_t task =
       task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
-                             stalled, block_index, block_length);
+                             stalled, block_index, block_length, BLOCK_SIZE);
   MSG_task_dsend(task, mailbox, task_message_free);
 }
 
 int in_current_pieces(peer_t peer, int piece)
 {
-  int is_in = 0, i, peer_piece;
+  unsigned i;
+  int is_in = 0, peer_piece;
   xbt_dynar_foreach(peer->current_pieces, i, peer_piece) {
     if (peer_piece == piece) {
       is_in = 1;