Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
BitTorrent: changed the way the PIECE message is handled so that it uses MSG_task_ise...
authorMaximiliano Geier <maximiliano.geier@loria.fr>
Wed, 14 Nov 2012 10:35:40 +0000 (11:35 +0100)
committerMaximiliano Geier <maximiliano.geier@loria.fr>
Wed, 14 Nov 2012 10:35:40 +0000 (11:35 +0100)
examples/msg/bittorrent/peer.c
examples/msg/bittorrent/peer.h

index f770a85..026be29 100644 (file)
@@ -99,6 +99,7 @@ void leech_loop(peer_t peer, double deadline)
         handle_message(peer, peer->task_received);
       }
     } else {
+      handle_pending_sends(peer);
       if (peer->current_piece != -1) {
         send_interested_to_peers(peer);
       } else {
@@ -242,6 +243,8 @@ void peer_init(peer_t peer, int id, int seed)
   peer->comm_received = NULL;
 
   peer->round = 0;
+
+  peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
 }
 
 /**
@@ -258,6 +261,7 @@ void peer_free(peer_t peer)
   xbt_dict_free(&peer->peers);
   xbt_dict_free(&peer->active_peers);
   xbt_dynar_free(&peer->current_pieces);
+  xbt_dynar_free(&peer->pending_sends);
   xbt_free(peer->pieces_count);
   xbt_free(peer->bitfield);
   xbt_free(peer->bitfield_blocks);
@@ -274,6 +278,29 @@ int has_finished(char *bitfield)
   return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0);
 }
 
+/**
+ * Handle pending sends and remove those which are done
+ * @param peer Peer data
+ */
+void handle_pending_sends(peer_t peer)
+{
+  int index;
+
+  while ((index = MSG_comm_testany(peer->pending_sends)) != -1) {
+    msg_comm_t comm_send = xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t);
+    int status = MSG_comm_get_status(comm_send);
+    xbt_dynar_remove_at(peer->pending_sends, index, &comm_send);
+    XBT_DEBUG("Communication %p is finished with status %d, dynar size is now %lu", comm_send, status, xbt_dynar_length(peer->pending_sends));
+
+    msg_task_t task = MSG_comm_get_task(comm_send);
+    MSG_comm_destroy(comm_send);
+
+    if (status != MSG_OK) {
+      task_message_free(task);
+    }
+  }
+}
+
 /**
  * Handle a received message sent by another peer
  * @param peer Peer data
@@ -853,7 +880,9 @@ void send_bitfield(peer_t peer, const char *mailbox)
   msg_task_t task =
       task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
                                 peer->bitfield, FILE_PIECES);
-  MSG_task_dsend(task, mailbox, task_message_free);
+  //Async send and append to pending sends
+  msg_comm_t comm = MSG_task_isend(task, mailbox);
+  xbt_dynar_push(peer->pending_sends, &comm);
 }
 
 /**
index 8fd1997..b468774 100644 (file)
@@ -42,6 +42,8 @@ typedef struct s_peer {
   RngStream stream;             //RngStream for
 
   double begin_receive_time;    //time when the receiving communication has begun, useful for calculating host speed.
+
+  xbt_dynar_t pending_sends;    // list of sends being delivered
 } s_peer_t, *peer_t;
 
 /**
@@ -58,6 +60,7 @@ void peer_free(peer_t peer);
 
 int has_finished(char *bitfield);
 
+void handle_pending_sends(peer_t peer);
 void handle_message(peer_t peer, msg_task_t task);
 
 void wait_for_pieces(peer_t peer, double deadline);