Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright notices
[simgrid.git] / examples / msg / bittorrent / peer.c
index 88b287f..4382710 100644 (file)
@@ -1,13 +1,14 @@
-/* Copyright (c) 2012. The SimGrid Team.
+/* Copyright (c) 2012-2015. The SimGrid Team.
  * All rights reserved.                                                     */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
+
 #include "peer.h"
 #include "tracker.h"
 #include "connection.h"
 #include "messages.h"
-#include <msg/msg.h>
+#include <simgrid/msg.h>
 #include <xbt/RngStream.h>
 #include <limits.h>
 
@@ -23,6 +24,7 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
 #define FILE_PIECES  10
 #define PIECES_BLOCKS 5
 #define BLOCK_SIZE  16384
+#define ENABLE_END_GAME_MODE 1
 
 /**
  *  Number of blocks asked by each request
@@ -30,13 +32,10 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers");
 #define BLOCKS_REQUESTED 2
 
 
-static const int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
+static const unsigned long int FILE_SIZE = FILE_PIECES * PIECES_BLOCKS * BLOCK_SIZE;
 
-void request_new_piece_to_peer(peer_t peer, connection_t remote_peer);
-void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece);
-void remove_current_piece(peer_t peer, connection_t remote_peer,
-                          int current_piece);
 
+#define SLEEP_DURATION 1
 
   /**
  * Peer main function
@@ -68,7 +67,6 @@ int peer(int argc, char *argv[])
       seed_loop(&peer, deadline);
     } else {
       leech_loop(&peer, deadline);
-//      XBT_INFO("%d becomes a seeder", peer.id);
       seed_loop(&peer, deadline);
     }
   } else {
@@ -98,18 +96,14 @@ void leech_loop(peer_t peer, double deadline)
    * (since it couldn't have gotten more than 50 peers)
    */
   send_handshake_all(peer);
-  //Wait for at least one "bitfield" message.
-//  wait_for_pieces(peer, deadline);
   XBT_DEBUG("Starting main leech loop");
 
   while (MSG_get_clock() < deadline && peer->pieces < FILE_PIECES) {
     if (peer->comm_received == NULL) {
-//      XBT_INFO("irecv");
       peer->task_received = NULL;
       peer->comm_received = MSG_task_irecv(&peer->task_received, peer->mailbox);
     }
     if (MSG_comm_test(peer->comm_received)) {
-//      XBT_INFO("comm_test OK");
       msg_error_t status = MSG_comm_get_status(peer->comm_received);
       MSG_comm_destroy(peer->comm_received);
       peer->comm_received = NULL;
@@ -117,16 +111,18 @@ void leech_loop(peer_t peer, double deadline)
         handle_message(peer, peer->task_received);
       }
     } else {
-      handle_pending_sends(peer);
       //We don't execute the choke algorithm if we don't already have a piece
       if (MSG_get_clock() >= next_choked_update && peer->pieces > 0) {
         update_choked_peers(peer);
         next_choked_update += UPDATE_CHOKED_INTERVAL;
       } else {
-        MSG_process_sleep(1);
+        MSG_process_sleep(SLEEP_DURATION);
       }
     }
   }
+  if (peer->pieces == FILE_PIECES)
+    XBT_DEBUG("%d becomes a seeder", peer->id);
+
 }
 
 /**
@@ -157,7 +153,7 @@ void seed_loop(peer_t peer, double deadline)
         //TODO: Change the choked peer algorithm when seeding.
         next_choked_update += UPDATE_CHOKED_INTERVAL;
       } else {
-        MSG_process_sleep(1);
+        MSG_process_sleep(SLEEP_DURATION);
       }
     }
   }
@@ -246,12 +242,12 @@ void peer_init(peer_t peer, int id, int seed)
 
   peer->current_pieces = xbt_dynar_new(sizeof(int), NULL);
 
-  peer->stream = RngStream_CreateStream("");
+  peer->stream =
+    (RngStream)MSG_host_get_property_value(MSG_host_self(), "stream");
   peer->comm_received = NULL;
 
   peer->round = 0;
 
-  peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
 }
 
 /**
@@ -268,12 +264,9 @@ void peer_free(peer_t peer)
   xbt_dict_free(&peer->peers);
   xbt_dict_free(&peer->active_peers);
   xbt_dynar_free(&peer->current_pieces);
-  xbt_dynar_free(&peer->pending_sends);
   xbt_free(peer->pieces_count);
   xbt_free(peer->bitfield);
   xbt_free(peer->bitfield_blocks);
-
-  RngStream_DeleteStream(&peer->stream);
 }
 
 /**
@@ -298,31 +291,6 @@ int nb_interested_peers(peer_t peer)
   return nb;
 }
 
-/**
- * Handle pending sends and remove those which are done
- * @param peer Peer data
- */
-void handle_pending_sends(peer_t peer)
-{
-  int index;
-
-  while ((index = MSG_comm_testany(peer->pending_sends)) != -1) {
-    msg_comm_t comm_send =
-        xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t);
-    int status = MSG_comm_get_status(comm_send);
-    xbt_dynar_remove_at(peer->pending_sends, index, &comm_send);
-    XBT_DEBUG
-        ("Communication %p is finished with status %d, dynar size is now %lu",
-         comm_send, status, xbt_dynar_length(peer->pending_sends));
-
-    msg_task_t task = MSG_comm_get_task(comm_send);
-    MSG_comm_destroy(comm_send);
-
-    if (status != MSG_OK) {
-      task_message_free(task);
-    }
-  }
-}
 
 void update_active_peers_set(peer_t peer, connection_t remote_peer)
 {
@@ -407,6 +375,7 @@ void handle_message(peer_t peer, msg_task_t task)
                "A non-in-our-list peer has sent us a message. WTH ?");
     XBT_DEBUG("Received a UNCHOKE message from %s (%s)", message->mailbox,
               message->issuer_host_name);
+    xbt_assert(remote_peer->choked_download, "WTF !!!");
     remote_peer->choked_download = 0;
     //Send requests to the peer, since it has unchoked us
     if (remote_peer->am_interested)
@@ -417,6 +386,7 @@ void handle_message(peer_t peer, msg_task_t task)
                "A non-in-our-list peer has sent us a message. WTH ?");
     XBT_DEBUG("Received a CHOKE message from %s (%s)", message->mailbox,
               message->issuer_host_name);
+    xbt_assert(!remote_peer->choked_download, "WTF !!!");
     remote_peer->choked_download = 1;
     remove_current_piece(peer, remote_peer, remote_peer->current_piece);
     break;
@@ -438,15 +408,17 @@ void handle_message(peer_t peer, msg_task_t task)
     }
     break;
   case MESSAGE_REQUEST:
+    xbt_assert(remote_peer->interested, "WTF !!!");
+
     xbt_assert((message->index >= 0
                 && message->index < FILE_PIECES), "Wrong request received");
     if (!remote_peer->choked_upload) {
       XBT_DEBUG("Received a REQUEST from %s (%s) for %d (%d,%d)",
-                message->mailbox, message->issuer_host_name, message->peer_id,
+                message->mailbox, message->issuer_host_name, message->index,
                 message->block_index,
                 message->block_index + message->block_length);
       if (peer->bitfield[message->index] == '1') {
-        send_piece(peer, message->mailbox, message->index, 0,
+        send_piece(peer, message->mailbox, message->index,
                    message->block_index, message->block_length);
       }
     } else {
@@ -455,17 +427,16 @@ void handle_message(peer_t peer, msg_task_t task)
     }
     break;
   case MESSAGE_PIECE:
+    XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
+              message->block_index,
+              message->block_index + message->block_length,
+              message->mailbox, message->issuer_host_name);
+    xbt_assert(!remote_peer->choked_download, "WTF !!!");
+    xbt_assert(remote_peer->am_interested || ENABLE_END_GAME_MODE, "Can't received a piece if I'm not interested wihtout end-game mode! piece (%d) bitfield(%s) remote bitfield(%s)", message->index, peer->bitfield, remote_peer->bitfield);
+    xbt_assert(remote_peer->choked_download != 1, "Can't received a piece if I'm choked !");
     xbt_assert((message->index >= 0
                 && message->index < FILE_PIECES), "Wrong piece received");
     //TODO: Execute à computation.
-    if (message->stalled) {
-      XBT_DEBUG("The received piece %d from %s (%s) is STALLED",
-                message->index, message->mailbox, message->issuer_host_name);
-    } else {
-      XBT_DEBUG("Received piece %d (%d,%d) from %s (%s)", message->index,
-                message->block_index,
-                message->block_index + message->block_length,
-                message->mailbox, message->issuer_host_name);
       if (peer->bitfield[message->index] == '0') {
         update_bitfield_blocks(peer, message->index, message->block_index,
                                message->block_length);
@@ -485,9 +456,9 @@ void handle_message(peer_t peer, msg_task_t task)
         }
       } else {
         XBT_DEBUG("However, we already have it");
+        xbt_assert(ENABLE_END_GAME_MODE, "Should not happen because we don't use end game mode !");
         request_new_piece_to_peer(peer, remote_peer);
       }
-    }
     break;
   case MESSAGE_CANCEL:
     XBT_DEBUG("The received CANCEL from %s (%s)",
@@ -505,6 +476,9 @@ void handle_message(peer_t peer, msg_task_t task)
   task_message_free(task);
 }
 
+/**
+ * Selects the appropriate piece to download and requests it to the remote_peer
+ */
 void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
 {
   int piece = select_piece_to_download(peer, remote_peer);
@@ -514,10 +488,14 @@ void request_new_piece_to_peer(peer_t peer, connection_t remote_peer)
   }
 }
 
+/**
+ * remove current_piece from the list of currently downloaded pieces.
+ */
 void remove_current_piece(peer_t peer, connection_t remote_peer,
                           int current_piece)
 {
-  int piece_index = -1, piece, i;
+  int piece_index = -1, piece;
+  unsigned int i;
   xbt_dynar_foreach(peer->current_pieces, i, piece) {
     if (piece == current_piece) {
       piece_index = i;
@@ -554,7 +532,6 @@ void update_pieces_count_from_bitfield(peer_t peer, char *bitfield)
  * If the peer has more than pieces, he downloads the pieces that are the less
  * replicated (rarest policy).
  * If all pieces have been downloaded or requested, we select a random requested piece (endgame mode).
-
  * @param peer: local peer
  * @param remote_peer: information about the connection
  * @return the piece to download if possible. -1 otherwise
@@ -571,6 +548,8 @@ int select_piece_to_download(peer_t peer, connection_t remote_peer)
   // end game mode
   if (xbt_dynar_length(peer->current_pieces) >= (FILE_PIECES - peer->pieces)
       && is_interested(peer, remote_peer)) {
+    if(!ENABLE_END_GAME_MODE)
+      return -1;
     int i;
     int nb_interesting_pieces = 0;
     int random_piece_index, current_index = 0;
@@ -679,7 +658,7 @@ void update_choked_peers(peer_t peer)
             xbt_dict_size(peer->active_peers));
   //update the current round
   peer->round = (peer->round + 1) % 3;
-  char *key, *key_choked;
+  char *key, *key_choked=NULL;
   connection_t peer_choosed = NULL;
   connection_t peer_choked = NULL;
   //remove a peer from the list
@@ -794,8 +773,7 @@ void update_interested_after_receive(peer_t peer)
   char *key;
   xbt_dict_cursor_t cursor;
   connection_t connection;
-  unsigned cpt;
-  int interested, piece;
+  int interested;
   xbt_dict_foreach(peer->peers, cursor, key, connection) {
     interested = 0;
     if (connection->am_interested) {
@@ -917,7 +895,6 @@ int partially_downloaded_piece(peer_t peer, connection_t remote_peer)
 void send_request_to_peer(peer_t peer, connection_t remote_peer, int piece)
 {
   remote_peer->current_piece = piece;
-  unsigned i;
   int block_index, block_length;
   xbt_assert(remote_peer->bitfield, "bitfield not received");
   xbt_assert(remote_peer->bitfield[piece] == '1', "WTF !!!");
@@ -1070,9 +1047,7 @@ void send_bitfield(peer_t peer, const char *mailbox)
   msg_task_t task =
       task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id,
                                 peer->bitfield, FILE_PIECES);
-  //Async send and append to pending sends
-  msg_comm_t comm = MSG_task_isend(task, mailbox);
-  xbt_dynar_push(peer->pending_sends, &comm);
+  MSG_task_dsend(task, mailbox, task_message_free);
 }
 
 /**
@@ -1092,7 +1067,7 @@ void send_request(peer_t peer, const char *mailbox, int piece,
 /**
  * Send a "piece" message to a pair, containing a piece of the file
  */
-void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
+void send_piece(peer_t peer, const char *mailbox, int piece,
                 int block_index, int block_length)
 {
   XBT_DEBUG("Sending the PIECE %d (%d,%d) to %s", piece, block_index,
@@ -1102,6 +1077,6 @@ void send_piece(peer_t peer, const char *mailbox, int piece, int stalled,
              "Tried to send a piece that we doesn't have.");
   msg_task_t task =
       task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece,
-                             stalled, block_index, block_length, BLOCK_SIZE);
+                             block_index, block_length, BLOCK_SIZE);
   MSG_task_dsend(task, mailbox, task_message_free);
 }