Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' into hypervisor
[simgrid.git] / examples / msg / bittorrent / peer.c
index acafbc7..632b19e 100644 (file)
@@ -15,11 +15,14 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
 //TODO: Let users change this
 /*
  * File transfered data
+ *
+ * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes
  */
-static int FILE_SIZE = 5120;
+static int FILE_SIZE = 10 * 5 * 16384;
 static int FILE_PIECES = 10;
 
 static int PIECES_BLOCKS = 5;
+static int BLOCK_SIZE = 16384;
 static int BLOCKS_REQUESTED = 2;
 
 /**
@@ -45,6 +48,7 @@ int peer(int argc, char *argv[])
     XBT_DEBUG("Got %d peers from the tracker", xbt_dict_length(peer.peers));
     XBT_DEBUG("Here is my current status: %s", peer.bitfield);
     peer.begin_receive_time = MSG_get_clock();
+    MSG_mailbox_set_async(peer.mailbox);
     if (has_finished(peer.bitfield)) {
       peer.pieces = FILE_PIECES;
       send_handshake_all(&peer);
@@ -96,6 +100,7 @@ void leech_loop(peer_t peer, double deadline)
         handle_message(peer, peer->task_received);
       }
     } else {
+      handle_pending_sends(peer);
       if (peer->current_piece != -1) {
         send_interested_to_peers(peer);
       } else {
@@ -235,10 +240,12 @@ void peer_init(peer_t peer, int id, int seed)
   peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
   peer->current_piece = -1;
 
-  peer->stream = RngStream_CreateStream("");
+  peer->stream = MSG_host_get_data(MSG_host_self());
   peer->comm_received = NULL;
 
   peer->round = 0;
+
+  peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
 }
 
 /**
@@ -255,11 +262,10 @@ void peer_free(peer_t peer)
   xbt_dict_free(&peer->peers);
   xbt_dict_free(&peer->active_peers);
   xbt_dynar_free(&peer->current_pieces);
+  xbt_dynar_free(&peer->pending_sends);
   xbt_free(peer->pieces_count);
   xbt_free(peer->bitfield);
   xbt_free(peer->bitfield_blocks);
-
-  RngStream_DeleteStream(&peer->stream);
 }
 
 /**
@@ -271,6 +277,29 @@ int has_finished(char *bitfield)
   return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
 }
 
+/**
+ * Handle pending sends and remove those which are done
+ * @param peer Peer data
+ */
+void handle_pending_sends(peer_t peer)
+{
+  int index;
+
+  while ((index = MSG_comm_testany(peer->pending_sends)) != -1) {
+    msg_comm_t comm_send = xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t);
+    int status = MSG_comm_get_status(comm_send);
+    xbt_dynar_remove_at(peer->pending_sends, index, &comm_send);
+    XBT_DEBUG("Communication %p is finished with status %d, dynar size is now %lu", comm_send, status, xbt_dynar_length(peer->pending_sends));
+
+    msg_task_t task = MSG_comm_get_task(comm_send);
+    MSG_comm_destroy(comm_send);
+
+    if (status != MSG_OK) {
+      task_message_free(task);
+    }
+  }
+}
+
 /**
  * Handle a received message sent by another peer
  * @param peer Peer data
@@ -425,6 +454,8 @@ void handle_message(peer_t peer, msg_task_t task)
       }
     }
     break;
+  case MESSAGE_CANCEL:
+    break;
   }
   //Update the peer speed.
   if (remote_peer) {
@@ -541,7 +572,7 @@ void update_choked_peers(peer_t peer)
   //remove a peer from the list
   xbt_dict_cursor_t cursor = NULL;
   xbt_dict_cursor_first(peer->active_peers, &cursor);
-  if (xbt_dict_length(peer->active_peers) > 0) {
+  if (!xbt_dict_is_empty(peer->active_peers)) {
     key = xbt_dict_cursor_get_key(cursor);
     connection_t peer_choked = xbt_dict_cursor_get_data(cursor);
     if (peer_choked) {
@@ -726,7 +757,7 @@ void send_interested_to_peers(peer_t peer)
       connection->am_interested = 1;
       msg_task_t task =
           task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
-                           peer->id);
+                           peer->id, task_message_size(MESSAGE_INTERESTED));
       MSG_task_dsend(task, connection->mailbox, task_message_free);
       XBT_DEBUG("Send INTERESTED to %s", connection->mailbox);
     }
@@ -744,7 +775,7 @@ void send_interested(peer_t peer, const char *mailbox)
 {
   msg_task_t task =
       task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox,
-                       peer->id);
+                       peer->id, task_message_size(MESSAGE_INTERESTED));
   MSG_task_dsend(task, mailbox, task_message_free);
   XBT_DEBUG("Sending INTERESTED to %s", mailbox);
 
@@ -759,7 +790,7 @@ void send_notinterested(peer_t peer, const char *mailbox)
 {
   msg_task_t task =
       task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox,
-                       peer->id);
+                       peer->id, task_message_size(MESSAGE_NOTINTERESTED));
   MSG_task_dsend(task, mailbox, task_message_free);
   XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox);
 
@@ -777,7 +808,7 @@ void send_handshake_all(peer_t peer)
   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
     msg_task_t task =
         task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
-                         peer->id);
+                         peer->id, task_message_size(MESSAGE_HANDSHAKE));
     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
     XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox);
   }
@@ -792,7 +823,7 @@ void send_handshake(peer_t peer, const char *mailbox)
 {
   msg_task_t task =
       task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox,
-                       peer->id);
+                       peer->id, task_message_size(MESSAGE_HANDSHAKE));
   MSG_task_dsend(task, mailbox, task_message_free);
   XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox);
 }
@@ -804,7 +835,8 @@ void send_choked(peer_t peer, const char *mailbox)
 {
   XBT_DEBUG("Sending a CHOKE to %s", mailbox);
   msg_task_t task =
-      task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, peer->id);
+      task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, 
+                       peer->id, task_message_size(MESSAGE_CHOKE));
   MSG_task_dsend(task, mailbox, task_message_free);
 }
 
@@ -816,7 +848,7 @@ void send_unchoked(peer_t peer, const char *mailbox)
   XBT_DEBUG("Sending a UNCHOKE to %s", mailbox);
   msg_task_t task =
       task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox,
-                       peer->id);
+                       peer->id, task_message_size(MESSAGE_UNCHOKE));
   MSG_task_dsend(task, mailbox, task_message_free);
 }
 
@@ -832,7 +864,7 @@ void send_have(peer_t peer, int piece)
   xbt_dict_foreach(peer->peers, cursor, key, remote_peer) {
     msg_task_t task =
         task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox,
-                               peer->id, piece);
+                               peer->id, piece, task_message_size(MESSAGE_HAVE));
     MSG_task_dsend(task, remote_peer->mailbox, task_message_free);
   }
 }
@@ -846,8 +878,10 @@ void send_bitfield(peer_t peer, const char *mailbox)
   XBT_DEBUG("Sending a BITFIELD to %s", mailbox);
   msg_task_t task =
       task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
-                                peer->bitfield);
-  MSG_task_dsend(task, mailbox, task_message_free);
+                                peer->bitfield, FILE_PIECES);
+  //Async send and append to pending sends
+  msg_comm_t comm = MSG_task_isend(task, mailbox);
+  xbt_dynar_push(peer->pending_sends, &comm);
 }
 
 /**
@@ -877,7 +911,7 @@ void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
              "Tried to send a piece that we doesn't have.");
   msg_task_t task =
       task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
-                             stalled, block_index, block_length);
+                             stalled, block_index, block_length, BLOCK_SIZE);
   MSG_task_dsend(task, mailbox, task_message_free);
 }