Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add bittorrent example
author Samuel Lepetit <samuel.lepetit@inria.fr>
Mon, 4 Jun 2012 08:36:54 +0000 (10:36 +0200)
committer Samuel Lepetit <samuel.lepetit@inria.fr>
Mon, 4 Jun 2012 08:36:54 +0000 (10:36 +0200)
CMakeLists.txt
examples/bittorrent/Bittorrent.java [new file with mode: 0644]
examples/bittorrent/Common.java [new file with mode: 0644]
examples/bittorrent/Connection.java [new file with mode: 0644]
examples/bittorrent/MessageTask.java [new file with mode: 0644]
examples/bittorrent/Peer.java [new file with mode: 0644]
examples/bittorrent/Tracker.java [new file with mode: 0644]
examples/bittorrent/TrackerTask.java [new file with mode: 0644]
examples/bittorrent/bittorrent.xml [new file with mode: 0644]
examples/bittorrent/generate.py [new file with mode: 0755]

index 6e9c6db..f2d0163 100644 (file)
@@ -113,6 +113,13 @@ set(JMSG_JAVA_SRC
 )
 
 set(JAVA_EXAMPLES
 )
 
 set(JAVA_EXAMPLES
+  examples/bittorrent/Bittorrent.java
+  examples/bittorrent/Common.java
+  examples/bittorrent/Connection.java
+  examples/bittorrent/MessageTask.java
+  examples/bittorrent/Peer.java
+  examples/bittorrent/Tracker.java
+  examples/bittorrent/TrackerTask.java
   examples/chord/Chord.java
   examples/chord/Common.java
   examples/chord/Node.java
   examples/chord/Chord.java
   examples/chord/Common.java
   examples/chord/Node.java
@@ -176,6 +183,7 @@ set(XML_FILES
     examples/master_slave_bypass/platform.xml
     examples/master_slave_kill/platform.xml
     examples/async/asyncDeployment.xml
     examples/master_slave_bypass/platform.xml
     examples/master_slave_kill/platform.xml
     examples/async/asyncDeployment.xml
+    examples/bittorrent/bittorrent.xml
 )
 
 set(source_to_pack
 )
 
 set(source_to_pack
@@ -256,6 +264,8 @@ add_custom_command(
        COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/mutualExclusion/centralized/*.java                                 
        COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/pingPong/*.java
        COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/startKillTime/*.java
        COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/mutualExclusion/centralized/*.java                                 
        COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/pingPong/*.java
        COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/startKillTime/*.java
+       COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/bittorrent/*.java
+
 )
 
 add_custom_target(simgrid_java_examples ALL
 )
 
 add_custom_target(simgrid_java_examples ALL
@@ -278,8 +288,9 @@ ${CMAKE_HOME_DIRECTORY}/simgrid.jar
 INCLUDE(CTest)
 ENABLE_TESTING()
 
 INCLUDE(CTest)
 ENABLE_TESTING()
 
-ADD_TEST(basic           ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/basic/basic.tesh)
 ADD_TEST(async           ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/async/async.tesh)
 ADD_TEST(async           ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/async/async.tesh)
+ADD_TEST(basic           ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/basic/basic.tesh)
+ADD_TEST(bittorrent           ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/bittorrent/bittorrent.tesh)
 ADD_TEST(chord          ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/chord/chord.tesh)
 ADD_TEST(pingPong        ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/pingPong/pingpong.tesh)
 ADD_TEST(CommTime        ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/commTime/commtime.tesh)
 ADD_TEST(chord          ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/chord/chord.tesh)
 ADD_TEST(pingPong        ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/pingPong/pingpong.tesh)
 ADD_TEST(CommTime        ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/commTime/commtime.tesh)
@@ -288,7 +299,7 @@ ADD_TEST(bypass          ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE
 ADD_TEST(kill            ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/master_slave_kill/kill.tesh)
 ADD_TEST(startKillTime            ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/startKillTime/startKillTime.tesh)
 #Don't forget to put new test in this list!!!
 ADD_TEST(kill            ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/master_slave_kill/kill.tesh)
 ADD_TEST(startKillTime            ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/startKillTime/startKillTime.tesh)
 #Don't forget to put new test in this list!!!
-set(test_list basic chord async pingPong CommTime mutualExclusion bypass kill startKillTime)
+set(test_list basic bittorrent chord async pingPong CommTime mutualExclusion bypass kill startKillTime)
 
 ##########################################
 # Set the  DYLD_LIBRARY_PATH for mac     #
 
 ##########################################
 # Set the  DYLD_LIBRARY_PATH for mac     #
diff --git a/examples/bittorrent/Bittorrent.java b/examples/bittorrent/Bittorrent.java
new file mode 100644 (file)
index 0000000..5288ca9
--- /dev/null
@@ -0,0 +1,24 @@
+package bittorrent;
+
+import org.simgrid.msg.Msg;
+import org.simgrid.msg.MsgException;
+
+public class Bittorrent {
+       /** Runs the example; expects a platform file and a deployment file as arguments. */
+       public static void main(String[] args) throws MsgException {
+               // MSG has to be set up before anything else, logging included.
+               Msg.init(args);
+               if (args.length < 2) {
+                       Msg.info("Usage   : Bittorrent platform_file deployment_file");
+                       Msg.info("example : Bittorrent platform.xml deployment.xml");
+                       System.exit(1);
+               }
+               final String platformFile = args[0];
+               final String deploymentFile = args[1];
+               // Build the platform, then deploy the application on it.
+               Msg.createEnvironment(platformFile);
+               Msg.deployApplication(deploymentFile);
+               // Run the simulation.
+               Msg.run();
+       }
+}
diff --git a/examples/bittorrent/Common.java b/examples/bittorrent/Common.java
new file mode 100644 (file)
index 0000000..9f15539
--- /dev/null
@@ -0,0 +1,53 @@
+package bittorrent;
+/**
+ * Values shared by the classes of the BitTorrent example (NOTE(review): none of them is final)
+ */
+public class Common {  
+       public static String TRACKER_MAILBOX = "tracker_mailbox"; // mailbox on which peers send their requests to the tracker
+       
+       public static int FILE_SIZE = 5120; // equals FILE_PIECE_SIZE * FILE_PIECES with the current values
+       public static int FILE_PIECE_SIZE = 512;
+       public static int FILE_PIECES = 10; // number of pieces of the file; sizes the bitfield of each peer
+       
+       public static int PIECE_COMM_SIZE = 1; // presumably the communication size of a piece message - TODO confirm
+       /**
+        * Size of an information message
+        */
+       public static int MESSAGE_SIZE = 1;
+       /**
+        * Maximum number of peers sent by the tracker to a client
+        */
+       public static int MAXIMUM_PAIRS = 50;
+       /**
+        * Interval between two requests sent by a peer to the tracker
+        */
+       public static int TRACKER_QUERY_INTERVAL = 1000;
+       /**
+        * Communication size for a task to the tracker
+        */
+       public static double TRACKER_COMM_SIZE = 0.01;
+       /**
+        * Timeout used while asking the tracker for peers (per attempt and for the whole exchange)
+        */
+       public static int GET_PEERS_TIMEOUT = 10000;
+       /**
+        * Timeout for "standard" messages.
+        */
+       public static int TIMEOUT_MESSAGE = 10;
+       /**
+        * Timeout for tracker receive.
+        */
+       public static int TRACKER_RECEIVE_TIMEOUT = 10;
+       /**
+        * Number of peers that can be unchoked at a given time
+        */
+       public static int MAX_UNCHOKED_PEERS = 4;
+       /**
+        * Interval between each update of the choked peers
+        */
+       public static int UPDATE_CHOKED_INTERVAL = 50;
+       /**
+        * Number of pieces the peer asks for simultaneously
+        */
+       public static int MAX_PIECES = 1;
+}
diff --git a/examples/bittorrent/Connection.java b/examples/bittorrent/Connection.java
new file mode 100644 (file)
index 0000000..5cb1b62
--- /dev/null
@@ -0,0 +1,54 @@
+package bittorrent;
+
+import java.util.Arrays;
+
+public class Connection {
+       /** Identifier of the remote peer. */
+       public int id;
+
+       /** Bitfield of the remote peer; stays null until one is stored. */
+       public char[] bitfield;
+
+       /** Name of the mailbox of the remote peer. */
+       public String mailbox;
+
+       /** Whether the remote peer owns something we want. */
+       public boolean amInterested = false;
+
+       /** Whether the remote peer wants one of our pieces. */
+       public boolean interested = false;
+
+       /** Whether the remote peer is choked by the current peer. */
+       public boolean chokedUpload = true;
+
+       /** Whether the remote peer has choked the current peer. */
+       public boolean chokedDownload = true;
+
+       /**
+        * Creates the connection to a peer; the mailbox name is the id written as a string.
+        * @param id identifier of the remote peer
+        */
+       public Connection(int id) {
+               this.mailbox = Integer.toString(id);
+               this.id = id;
+       }
+
+       /**
+        * Lists the value of every field, in declaration order.
+        */
+       @Override
+       public String toString() {
+               StringBuilder text = new StringBuilder("Connection [id=");
+               text.append(id);
+               text.append(", bitfield=").append(Arrays.toString(bitfield));
+               text.append(", mailbox=").append(mailbox);
+               text.append(", amInterested=").append(amInterested);
+               text.append(", interested=").append(interested);
+               text.append(", chokedUpload=").append(chokedUpload);
+               text.append(", chokedDownload=").append(chokedDownload);
+               text.append("]");
+               return text.toString();
+       }
+
+}
\ No newline at end of file
diff --git a/examples/bittorrent/MessageTask.java b/examples/bittorrent/MessageTask.java
new file mode 100644 (file)
index 0000000..558515f
--- /dev/null
@@ -0,0 +1,86 @@
+package bittorrent;
+
+import org.simgrid.msg.Task;
+/**
+ * Message exchanged between the peers of the BitTorrent example.
+ */
+public class MessageTask extends Task {
+       /** Kinds of message a peer can send. */
+       public enum Type {
+               HANDSHAKE,
+               CHOKE,
+               UNCHOKE,
+               INTERESTED,
+               NOTINTERESTED,
+               HAVE,
+               BITFIELD,
+               REQUEST,
+               PIECE
+       }
+
+       /** Kind of the message. */
+       public Type type;
+       /** Host name of the sender. */
+       public String issuerHostname;
+       /** Mailbox of the sender. */
+       public String mailbox;
+       /** Identifier of the sender. */
+       public int peerId;
+       /** Bitfield carried by the message; only set by the bitfield constructor. */
+       public char[] bitfield;
+       /** Piece index carried by the message; only set by the constructors taking an index. */
+       public int index;
+       /** Stalled flag; only set by the constructor taking it. */
+       public boolean stalled;
+
+       /**
+        * Builds a message that carries no value.
+        * @param type kind of the message
+        * @param issuerHostname host name of the sender
+        * @param mailbox mailbox of the sender
+        * @param peerId identifier of the sender
+        */
+       public MessageTask(Type type, String issuerHostname, String mailbox, int peerId) {
+               this.type = type;
+               this.issuerHostname = issuerHostname;
+               this.mailbox = mailbox;
+               this.peerId = peerId;
+       }
+       /**
+        * Builds a "have/request/piece" message.
+        * @param type kind of the message
+        * @param issuerHostname host name of the sender
+        * @param mailbox mailbox of the sender
+        * @param peerId identifier of the sender
+        * @param index index of the piece the message is about
+        */
+       public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index) {
+               this(type, issuerHostname, mailbox, peerId);
+               this.index = index;
+       }
+       /**
+        * Builds a bitfield message.
+        * @param type kind of the message
+        * @param issuerHostname host name of the sender
+        * @param mailbox mailbox of the sender
+        * @param peerId identifier of the sender
+        * @param bitfield bitfield to carry; the array is stored as is, not copied
+        */
+       public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, char[] bitfield) {
+               this(type, issuerHostname, mailbox, peerId);
+               this.bitfield = bitfield;
+       }
+       /**
+        * Builds a "piece" message.
+        * @param type kind of the message
+        * @param issuerHostname host name of the sender
+        * @param mailbox mailbox of the sender
+        * @param peerId identifier of the sender
+        * @param index index of the piece the message is about
+        * @param stalled value of the stalled flag
+        */
+       public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index, boolean stalled) {
+               this(type, issuerHostname, mailbox, peerId, index);
+               this.stalled = stalled;
+       }
+}
diff --git a/examples/bittorrent/Peer.java b/examples/bittorrent/Peer.java
new file mode 100644 (file)
index 0000000..7e3fb8c
--- /dev/null
@@ -0,0 +1,598 @@
+package bittorrent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.simgrid.msg.Comm;
+import org.simgrid.msg.Host;
+import org.simgrid.msg.Msg;
+import org.simgrid.msg.MsgException;
+import org.simgrid.msg.Process;
+import org.simgrid.msg.Task;
+
+/**
+ * Main class for peers execution
+ */
+public class Peer extends Process {
+       protected int round = 0; // NOTE(review): no use visible in the reviewed part of the class
+       
+       protected double deadline; // date after which the main loops stop (second process argument)
+       
+       protected int id; // identifier of this peer (first process argument)
+       protected String mailbox; // name of our own mailbox: the id written as a string
+       protected String mailboxTracker; // mailbox on which the answer of the tracker is awaited: "tracker_" + id
+       protected String hostname; // name of the host running this process
+       protected int pieces = 0; // number of pieces currently owned
+       protected char[] bitfield = new char[Common.FILE_PIECES]; // '1' when the piece at that index is owned, '0' otherwise
+       protected short[] piecesCount = new short[Common.FILE_PIECES]; // per piece, number of times it was announced by other peers
+       
+       protected int piecesRequested = 0; // decremented when a missing piece arrives; presumably counts pending requests
+       
+       protected ArrayList<Integer> currentPieces = new ArrayList<Integer>(); // indices of the pieces being looked for
+       protected int currentPiece = -1; // last piece selected for download, -1 when none
+
+       protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>(); // peers that have unchoked us, by id
+       protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>(); // every known peer, by id
+       
+       protected Comm commReceived = null; // pending asynchronous receive, null when none is posted
+       /** Forwards the host, the name and the arguments to the Process constructor. */
+       public Peer(Host host, String name, String[]args) {
+               super(host,name,args);
+       }       
+       
+       /**
+        * Entry point of the peer process.
+        * @param args peer id, deadline and, optionally, a third argument that makes the peer a seed
+        */
+       @Override
+       public void main(String[] args) throws MsgException {
+               //Check arguments: args[0] and args[1] are read below, so give up when the count is wrong
+               if (args.length != 3 && args.length != 2) {
+                       Msg.info("Wrong number of arguments");
+                       return;
+               }
+               //A third argument, whatever its value, makes the peer start as a seed
+               init(Integer.parseInt(args[0]), args.length == 3);
+               //Retrieve the deadline
+               deadline = Double.parseDouble(args[1]);
+               if (deadline < 0) {
+                       Msg.info("Wrong deadline supplied");
+                       return;
+               }
+               Msg.info("Hi, I'm joining the network with id " + id);
+               //Getting peer data from the tracker
+               if (getPeersData()) {
+                       Msg.debug("Got " + peers.size() + " peers from the tracker");
+                       Msg.debug("Here is my current status: " + getStatus());
+                       if (hasFinished()) {
+                               pieces = Common.FILE_PIECES;
+                               sendHandshakeAll();
+                       }
+                       else {
+                               leechLoop();
+                       }
+                       seedLoop();
+               }
+               else {
+                       Msg.info("Couldn't contact the tracker.");
+               }
+               Msg.info("Here is my current status: " + getStatus());
+       }
+       /**
+        * Main loop of a peer that is still downloading; runs until the deadline or until every piece is owned.
+        */
+       private void leechLoop() {
+               double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
+               Msg.debug("Start downloading.");
+               /**
+                * Send a "handshake" message to all the peers it got
+                * (the tracker is documented to send at most Common.MAXIMUM_PAIRS of them)
+                */
+               sendHandshakeAll();
+               //Handle messages until a piece to download has been selected (or the deadline passes)
+               waitForPieces();
+               Msg.debug("Starting main leech loop");
+               while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
+                       if (commReceived == null) {
+                               commReceived = Task.irecv(mailbox);
+                       }
+                       try {
+                               if (commReceived.test()) {
+                                       handleMessage(commReceived.getTask());
+                                       commReceived = null;
+                               }
+                               else {
+                                       //No message yet. When a piece is selected, presumably tell the peers owning it that we are interested
+                                       if (currentPiece != -1) {
+                                               sendInterestedToPeers();
+                                       }
+                                       else {
+                                               if (currentPieces.size() < Common.MAX_PIECES) {
+                                                       updateCurrentPiece();
+                                               }
+                                       }
+                                       //We don't execute the choke algorithm if we don't already have a piece
+                                       if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
+                                               updateChokedPeers();
+                                               nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
+                                       }
+                                       else {
+                                               waitFor(1);
+                                       }
+                               }
+                       }
+                       catch (MsgException e) {
+                               commReceived = null; //drop the failed receive; a new one is posted on the next iteration
+                       }
+               }
+       }
+       
+       /**
+        * Main loop of a peer that is seeding; runs until the deadline.
+        */
+       private void seedLoop() {
+               double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
+               Msg.debug("Start seeding.");
+               //Handle incoming messages; when none is available, update the choked peers once per interval
+               while (Msg.getClock() < deadline) {
+                       if (commReceived == null) {
+                               commReceived = Task.irecv(mailbox);
+                       }
+                       try {
+                               if (commReceived.test()) {
+                                       handleMessage(commReceived.getTask());
+                                       commReceived = null;
+                               }
+                               else {
+                                       if (Msg.getClock() >= nextChokedUpdate) {
+                                               updateChokedPeers();
+                                               //TODO: Change the choked peer algorithm when seeding
+                                               nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
+                                       }
+                                       else {
+                                               waitFor(1);
+                                       }
+                               }
+                       }
+                       catch (MsgException e) {
+                               commReceived = null; //drop the failed receive; a new one is posted on the next iteration
+                       }
+
+               }
+       }
+       
+       /**
+        * Sets the identity of the peer, its mailboxes and its initial bitfield.
+        * @param id id of the peer to take in the network
+        * @param seed indicates if the peer is a seed, i.e. starts with every piece
+        */
+       private void init(int id, boolean seed) {
+               //Both mailbox names are built from the textual form of the id
+               final String idText = Integer.toString(id);
+               this.id = id;
+               this.mailbox = idText;
+               this.mailboxTracker = "tracker_" + idText;
+
+               //Every piece gets the same initial state: owned for a seed, missing otherwise
+               final char initialState = seed ? '1' : '0';
+               int piece = 0;
+               while (piece < bitfield.length) {
+                       bitfield[piece] = initialState;
+                       piece++;
+               }
+               this.hostname = host.getName();
+       }
+       /**
+        * Asks the tracker for peers and stores the ones it returns (ourselves excluded) in the peer list.
+        * @return true when an answer of the tracker was received before the timeout
+        */
+       private boolean getPeersData() {
+               boolean success = false, sendSuccess = false;
+               double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
+               //Build the task to send to the tracker
+               TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
+
+               while (!sendSuccess && Msg.getClock() < timeout) {
+                       try {
+                               Msg.debug("Sending a peer request to the tracker.");
+                               taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
+                               sendSuccess = true;
+                       }
+                       catch (MsgException e) {
+                               Msg.debug("Sending the peer request to the tracker failed, retrying: " + e.getMessage());
+                       }
+               }
+               while (!success && Msg.getClock() < timeout) {
+                       commReceived = Task.irecv(this.mailboxTracker);
+                       try {
+                               commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
+                               if (commReceived.getTask() instanceof TrackerTask) {
+                                       TrackerTask task = (TrackerTask)commReceived.getTask();
+                                       for (Integer peerId: task.peers) {
+                                               if (peerId != this.id) {
+                                                       peers.put(peerId, new Connection(peerId));
+                                               }
+                                       }
+                                       success = true;
+                               }
+                       }
+                       catch (MsgException e) {
+                               Msg.debug("Receiving the answer of the tracker failed, retrying: " + e.getMessage());
+                       }
+                       commReceived = null;
+               }
+               commReceived = null;
+               return success;
+       }
+       /**
+        * Handle a received message sent by another peer
+        * @param task task received, expected to be a MessageTask
+        */
+       void handleMessage(Task task) {
+               MessageTask message = (MessageTask)task;
+               Connection remotePeer = peers.get(message.peerId);
+               //Only a HANDSHAKE may come from a peer we don't know yet; anything else is dropped
+               //instead of failing with a NullPointerException (assertions are disabled by default)
+               if (remotePeer == null && message.type != MessageTask.Type.HANDSHAKE) {
+                       Msg.debug("Ignoring a " + message.type + " message from unknown peer " + message.peerId);
+                       return;
+               }
+               switch (message.type) {
+                       case HANDSHAKE:
+                               Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
+                               //Check if the peer is in our connection list
+                               if (remotePeer == null) {
+                                       peers.put(message.peerId, new Connection(message.peerId));
+                                       sendHandshake(message.mailbox);
+                               }
+                               //Send our bitfield to the peer
+                               sendBitfield(message.mailbox);
+                       break;
+                       case BITFIELD:
+                               Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
+                               //update the pieces list
+                               updatePiecesCountFromBitfield(message.bitfield);
+                               //Update the current piece
+                               if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
+                                       updateCurrentPiece();
+                               }
+                               remotePeer.bitfield  = message.bitfield.clone();
+                       break;
+                       case INTERESTED:
+                               Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
+                               remotePeer.interested = true;
+                       break;
+                       case NOTINTERESTED:
+                               Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
+                               remotePeer.interested = false;
+                       break;
+                       case UNCHOKE:
+                               Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
+                               remotePeer.chokedDownload = false;
+                               activePeers.put(remotePeer.id,remotePeer);
+                               sendRequestsToPeer(remotePeer);
+                       break;
+                       case CHOKE:
+                               Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
+                               remotePeer.chokedDownload = true;
+                               activePeers.remove(remotePeer.id);
+                       break;
+                       case HAVE:
+                               //A HAVE can only be recorded once the bitfield of the peer is known
+                               if (remotePeer.bitfield == null) {
+                                       return;
+                               }
+                               Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
+                               assert message.index >= 0 && message.index < Common.FILE_PIECES;
+                               remotePeer.bitfield[message.index] = '1';
+                               piecesCount[message.index]++;
+                               //Send interested message to the peer if he has what we want
+                               if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
+                                       remotePeer.amInterested = true;
+                                       sendInterested(remotePeer.mailbox);
+                               }
+
+                               if (currentPieces.contains(message.index)) {
+                                       sendRequest(message.mailbox,message.index);
+                               }
+                       break;
+                       case REQUEST:
+                               assert message.index >= 0 && message.index < Common.FILE_PIECES;
+                               if (remotePeer.chokedUpload) {
+                                       Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
+                               }
+                               else {
+                                       Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.index);
+                                       if (bitfield[message.index] == '1') {
+                                               sendPiece(message.mailbox,message.index,false);
+                                       }
+                               }
+                       break;
+                       case PIECE:
+                               if (message.stalled) {
+                                       Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ") is stalled");
+                               }
+                               else {
+                                       Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
+                                       if (bitfield[message.index] == '0') {
+                                               piecesRequested--;
+                                               //The piece is no longer looked for (removal by value, not by position)
+                                               currentPieces.remove((Object)Integer.valueOf(message.index));
+                                               //Setting the fact that we have the piece
+                                               bitfield[message.index] = '1';
+                                               pieces++;
+                                               Msg.debug("My status is now " + getStatus());
+                                               //Sending the information to all the peers we are connected to
+                                               sendHave(message.index);
+                                               //sending NOTINTERESTED to the peers that don't have what we want.
+                                               updateInterestedAfterReceive();
+                                       }
+                                       else {
+                                               Msg.debug("However, we already have it.");
+                                       }
+                               }
+                       break;
+               }
+       }
+       /**
+        * Handles incoming messages until one of them leads to the selection of
+        * a piece to download, or until the deadline is reached.
+        */
+       void waitForPieces() {
+               boolean pieceSelected = false;
+               while (!pieceSelected && Msg.getClock() < deadline) {
+                       //Post a receive unless one is still pending
+                       if (commReceived == null) {
+                               commReceived = Task.irecv(mailbox);
+                       }
+                       try {
+                               commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
+                               handleMessage(commReceived.getTask());
+                               pieceSelected = (currentPiece != -1);
+                               commReceived = null;
+                       }
+                       catch (MsgException e) {
+                               //The wait did not complete normally: start again with a fresh receive
+                               commReceived = null;
+                       }
+               }
+       }
+       
+       /**
+        * Tells whether the peer owns every piece of the file.
+        * @return true when every entry of the bitfield is '1', false as soon as
+        * one piece is still missing
+        */
+       private boolean hasFinished() {
+               for (int i = 0; i < bitfield.length; i++) {
+                       // A single missing piece means the download is not complete.
+                       if (bitfield[i] != '1') {
+                               return false;
+                       }
+               }
+               return true;
+       }
+       /**
+        * Increments, for every piece marked '1' in the given bitfield, the
+        * matching entry of piecesCount.
+        * @param bitfield bitfield to account for; its first Common.FILE_PIECES entries are read
+        */
+       private void updatePiecesCountFromBitfield(char bitfield[]) {
+               int piece = 0;
+               while (piece < Common.FILE_PIECES) {
+                       if (bitfield[piece] == '1') {
+                               piecesCount[piece] += 1;
+                       }
+                       piece++;
+               }
+       }
+       /**
+        * Selects a new piece the peer is interested in and records it in
+        * currentPieces.
+        * Nothing is done when the number of pieces already being fetched reaches
+        * the number of pieces still missing.
+        * Otherwise the candidate is derived from the simulation clock and the
+        * peer id, and the following indices are probed (modulo
+        * Common.FILE_PIECES) until a piece is found that is neither owned nor
+        * already in currentPieces.
+        * TODO: as described in "Bittorrent Architecture Protocol" (Ryan Toole),
+        * a peer owning 3 pieces or more should rather pick the least replicated
+        * piece; this is not implemented yet and the selection above is always used.
+        */
+       void updateCurrentPiece() {
+               if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
+                       return;
+               }
+               int offset = 0;
+               do {
+                       currentPiece = ((int)Msg.getClock() + id + offset) % Common.FILE_PIECES;
+                       offset++;
+                       // Keep probing while the piece is owned or already being fetched.
+               } while (bitfield[currentPiece] != '0' || currentPieces.contains(currentPiece));
+               currentPieces.add(currentPiece);
+               Msg.debug("New interested piece: " + currentPiece);
+               assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
+       }
+       /**
+        * Runs one step of the choke algorithm: the first entry of activePeers
+        * (if any) is choked and removed, then one interested peer is looked for
+        * and unchoked.
+        * The round counter is advanced on every call, but the unchoking step is
+        * currently performed whatever its value.
+        */
+       private void updateChokedPeers() {
+               round = (round + 1) % 3;
+               if (peers.isEmpty()) {
+                       return;
+               }
+               //Choke the first active peer, if there is one
+               Iterator<Entry<Integer, Connection>> activeIt = activePeers.entrySet().iterator();
+               if (activeIt.hasNext()) {
+                       Entry<Integer,Connection> firstActive = activeIt.next();
+                       Connection victim = firstActive.getValue();
+                       sendChoked(victim.mailbox);
+                       victim.chokedUpload = true;
+                       activePeers.remove(firstActive.getKey());
+               }
+               //Optimistic unchoking: probe positions derived from the clock until an
+               //interested peer is found or Common.MAXIMUM_PAIRS attempts were made
+               Connection candidate;
+               int attempt = 0;
+               do {
+                       candidate = null;
+                       int target = ((int)Msg.getClock() + attempt) % peers.size();
+                       int position = 0;
+                       //TODO: Not really the best way ever
+                       for (Connection connection : peers.values()) {
+                               if (position == target) {
+                                       candidate = connection;
+                                       break;
+                               }
+                               position++;
+                       }
+                       candidate = candidate.interested ? candidate : null;
+                       attempt++;
+               } while (candidate == null && attempt < Common.MAXIMUM_PAIRS);
+               if (candidate == null) {
+                       return;
+               }
+               activePeers.put(candidate.id,candidate);
+               candidate.chokedUpload = false;
+               sendUnchoked(candidate.mailbox);
+       }
+       /**
+        * Refreshes our "interested" state towards the peers: every peer we are
+        * interested in that owns none of the pieces listed in currentPieces is
+        * sent a "not interested" message and flagged accordingly.
+        */
+       private void updateInterestedAfterReceive() {
+               for (Connection connection : peers.values()) {
+                       if (!connection.amInterested) {
+                               continue;
+                       }
+                       boolean stillUseful = false;
+                       for (Integer wanted : currentPieces) {
+                               if (connection.bitfield[wanted] == '1') {
+                                       stillUseful = true;
+                                       break;
+                               }
+                       }
+                       if (stillUseful) {
+                               continue;
+                       }
+                       connection.amInterested = false;
+                       sendNotInterested(connection.mailbox);
+               }
+       }
+       /**
+        * Sends a request message to a peer for every piece of currentPieces that
+        * its bitfield marks as owned. Nothing is sent when the bitfield of the
+        * peer is not known.
+        * @param remotePeer connection data of the peer the requests are sent to
+        */
+       private void sendRequestsToPeer(Connection remotePeer) {
+               if (remotePeer.bitfield == null) {
+                       return;
+               }
+               for (Integer wanted : currentPieces) {
+                       if (remotePeer.bitfield[wanted] == '1') {
+                               sendRequest(remotePeer.mailbox, wanted);
+                       }
+               }
+       }
+       /**
+        * Sends the "interested" message to every peer whose known bitfield holds
+        * the current piece and that we are not interested in yet.
+        * Does nothing when no current piece is set (currentPiece is -1).
+        * Otherwise currentPiece is reset to -1 and piecesRequested is incremented
+        * afterwards, whether or not a message was sent.
+        */
+       private void sendInterestedToPeers() {
+               if (currentPiece == -1) {
+                       return;
+               }
+               for (Connection connection : peers.values()) {
+                       if (connection.bitfield == null) {
+                               continue;
+                       }
+                       if (connection.bitfield[currentPiece] != '1') {
+                               continue;
+                       }
+                       if (connection.amInterested) {
+                               continue;
+                       }
+                       connection.amInterested = true;
+                       sendInterested(connection.mailbox);
+               }
+               currentPiece = -1;
+               piecesRequested++;
+       }
+       /**
+        * Send an "interested" message to a peer
+        * @param mailbox destination mailbox
+        */
+       private void sendInterested(String mailbox) {
+               new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id).dsend(mailbox);
+       }
+       /**
+        * Send a "not interested" message to a peer
+        * @param mailbox destination mailbox
+        */
+       private void sendNotInterested(String mailbox) {
+               new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id).dsend(mailbox);
+       }
+       /**
+        * Send a "handshake" message to every peer of the peers list.
+        */
+       private void sendHandshakeAll() {
+               for (Connection connection : peers.values()) {
+                       new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox, id).dsend(connection.mailbox);
+               }
+       }
+       /**
+        * Send a "handshake" message to a peer
+        * @param mailbox mailbox the message is sent to
+        */
+       private void sendHandshake(String mailbox) {
+               Msg.debug("Sending a HANDSHAKE to " + mailbox);
+               new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id).dsend(mailbox);
+       }
+       /**
+        * Send a "choke" message to a peer
+        * @param mailbox destination mailbox
+        */
+       private void sendChoked(String mailbox) {
+               Msg.debug("Sending a CHOKE to " + mailbox);
+               new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id).dsend(mailbox);
+       }
+       /**
+        * Send an "unchoke" message to a peer
+        * @param mailbox destination mailbox
+        */
+       private void sendUnchoked(String mailbox) {
+               Msg.debug("Sending a UNCHOKE to " + mailbox);
+               new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id).dsend(mailbox);
+       }
+       /**
+        * Send a "HAVE" message about a piece to every peer of the peers list
+        * @param piece index of the piece announced
+        */
+       private void sendHave(int piece) {
+               Msg.debug("Sending HAVE message to all my peers");
+               for (Connection connection : peers.values()) {
+                       new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece).dsend(connection.mailbox);
+               }
+       }
+       /**
+        * Send a "bitfield" message carrying our own bitfield to a peer
+        * @param mailbox destination mailbox
+        */
+       private void sendBitfield(String mailbox) {
+               Msg.debug("Sending a BITFIELD to " + mailbox);
+               new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield).dsend(mailbox);
+       }
+       /**
+        * Send a "request" message to a peer, asking for a piece
+        * @param mailbox destination mailbox
+        * @param piece index of the piece requested
+        */
+       private void sendRequest(String mailbox, int piece) {
+               Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece);
+               new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece).dsend(mailbox);
+       }
+       /**
+        * Send a "piece" message to a peer, containing a piece of the file
+        * @param mailbox destination mailbox
+        * @param piece index of the piece sent
+        * @param stalled flag forwarded unchanged to the PIECE message
+        */
+       private void sendPiece(String mailbox, int piece, boolean stalled) {
+               Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
+               new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled).dsend(mailbox);
+       }
+       
+       /**
+        * Builds a printable view of our bitfield.
+        * @return the first Common.FILE_PIECES entries of the bitfield,
+        * concatenated in order
+        */
+       private String getStatus() {
+               // A StringBuilder avoids creating a new String for every piece.
+               StringBuilder status = new StringBuilder();
+               for (int i = 0; i < Common.FILE_PIECES; i++) {
+                       status.append(bitfield[i]);
+               }
+               return status.toString();
+       }
+}
+       
diff --git a/examples/bittorrent/Tracker.java b/examples/bittorrent/Tracker.java
new file mode 100644 (file)
index 0000000..537a3a5
--- /dev/null
@@ -0,0 +1,90 @@
+package bittorrent;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.simgrid.msg.Comm;
+import org.simgrid.msg.Host;
+import org.simgrid.msg.Process;
+import org.simgrid.msg.Msg;
+import org.simgrid.msg.MsgException;
+import org.simgrid.msg.Task;
+
+import org.simgrid.msg.RngStream;
+/**
+ * Tracker, handle requests from peers.
+ */
+public class Tracker extends Process {
+       protected RngStream stream;
+       /**
+        * Peers list
+        */
+       protected ArrayList<Integer> peersList;
+       /**
+        * End time for the simulation
+        */
+       protected double deadline;
+       /**
+        * Current comm received
+        */
+       protected Comm commReceived = null;
+       
+       public Tracker(Host host, String name, String[]args) {
+               super(host,name,args);
+       }
+       
+       @Override
+       public void main(String[] args) throws MsgException {
+               if (args.length != 1) {
+                       Msg.info("Wrong number of arguments for the tracker.");
+                       return;
+               }
+               //Build the RngStream object for randomness
+               stream = new RngStream("tracker");
+               //Retrieve the end time
+               deadline = Double.valueOf(args[0]);
+               //Building peers array
+               peersList = new ArrayList<Integer>();
+               
+               Msg.info("Tracker launched.");          
+               while (Msg.getClock() < deadline) {
+                       if (commReceived == null) {
+                               commReceived = Task.irecv(Common.TRACKER_MAILBOX);
+                       }
+                       try {
+                               if (commReceived.test()) {
+                                       Task task = commReceived.getTask();
+                                       if (task instanceof TrackerTask) {
+                                               TrackerTask tTask = (TrackerTask)task;
+                                               //Sending peers to the peer
+                                               //TODO: Send RANDOM pairs using RngStreams.
+                                               int nbPeers = 0;
+                                               while (nbPeers < Common.MAXIMUM_PAIRS && nbPeers < peersList.size()) {
+                                                       int nextPeer;
+                                                       do {
+                                                               nextPeer = stream.randInt(0, peersList.size() - 1);
+                                                       } while (tTask.peers.contains(nextPeer));
+                                                       tTask.peers.add(peersList.get(nextPeer));
+                                                       nbPeers++;
+                                               }
+                                               //Adding the peer to our list
+                                               peersList.add(tTask.peerId);
+                                               tTask.type = TrackerTask.Type.ANSWER;
+                                               //Setting the interval
+                                               tTask.interval = Common.TRACKER_QUERY_INTERVAL;
+                                               //Sending the task back to the peer
+                                               tTask.dsend(tTask.mailbox);
+                                       }
+                                       commReceived = null;
+                               }
+                               else {
+                                       waitFor(1);
+                               }
+                       }
+                       catch (MsgException e) {
+                               commReceived = null;                            
+                       }
+               }
+               Msg.info("Tracker is leaving");
+       }
+
+}
diff --git a/examples/bittorrent/TrackerTask.java b/examples/bittorrent/TrackerTask.java
new file mode 100644 (file)
index 0000000..41cac26
--- /dev/null
@@ -0,0 +1,43 @@
+package bittorrent;
+import java.util.ArrayList;
+
+import org.simgrid.msg.Task;
+
+/**
+ * Task exchanged between the tracker
+ * and the peers. 
+ */
+public class TrackerTask extends Task {
+       /**
+        * Type of the tasks
+        */
+       public enum Type {
+               REQUEST,
+               ANSWER
+       };
+       public Type type;
+       public String hostname;
+       public String mailbox;
+       public int peerId;
+       public int uploaded;
+       public int downloaded;
+       public int left;
+       public double interval;
+       public ArrayList<Integer> peers;
+       
+       public TrackerTask(String hostname, String mailbox, int peerId) {
+               this(hostname, mailbox, peerId, 0, 0, Common.FILE_SIZE);
+       }       
+       public TrackerTask(String hostname, String mailbox, int peerId, int uploaded, int downloaded, int left) {
+               super("", 0, Common.TRACKER_COMM_SIZE);
+               this.type = Type.REQUEST;
+               this.hostname = hostname;
+               this.mailbox = mailbox;
+               this.peerId = peerId;
+               this.uploaded = uploaded;
+               this.downloaded = downloaded;
+               this.left = left;
+               this.peers = new ArrayList<Integer>();
+       }
+       
+}
diff --git a/examples/bittorrent/bittorrent.xml b/examples/bittorrent/bittorrent.xml
new file mode 100644 (file)
index 0000000..0699cac
--- /dev/null
@@ -0,0 +1,39 @@
+<?xml version='1.0'?>
+<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid.dtd">
+<platform version="3">
+  <!-- Deployment of the bittorrent example: one tracker and seven peers. -->
+
+  <!-- Tracker: its single argument is the deadline after which it stops answering requests. -->
+  <process host="Jacquelin" function="bittorrent.Tracker">
+    <argument value="3000" />                  
+  </process>
+
+  <!-- Peers: arguments are the peer id, the end time and, optionally, a third argument marking the peer as a seed. -->
+  <process host="Boivin" function="bittorrent.Peer">
+    <argument value="00000002"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+    <argument value="1" />                     <!-- indicates if the bittorrent.Peer is a seed at the beginning of the simulation -->   
+  </process>
+  <process host="Jean_Yves" function="bittorrent.Peer">
+    <argument value="00000003"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="TeX" function="bittorrent.Peer">
+    <argument value="00000004"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="Geoff" function="bittorrent.Peer">
+    <argument value="00000005"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="Disney" function="bittorrent.Peer">
+    <argument value="00000006"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="iRMX" function="bittorrent.Peer">
+    <argument value="00000007"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="McGee" function="bittorrent.Peer">
+    <argument value="00000008"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+
+</platform>
diff --git a/examples/bittorrent/generate.py b/examples/bittorrent/generate.py
new file mode 100755 (executable)
index 0000000..9ba8611
--- /dev/null
@@ -0,0 +1,41 @@
+#!/usr/bin/python
+
+# This script generates a specific deployment file for the Bittorrent example.
+# It assumes that the platform will be a cluster.
+# Usage: python generate.py nb_nodes nb_bits end_date percentage
+# Example: python generate.py 10000 5000
+
+import sys, random
+
+if len(sys.argv) != 4:
+       print("Usage: python generate.py nb_nodes end_date seed_percentage > deployment_file.xml")
+       sys.exit(1)
+
+nb_nodes = int(sys.argv[1])
+end_date = int(sys.argv[2])
+seed_percentage = int(sys.argv[3]);
+
+nb_bits = 24
+max_id = 2 ** nb_bits - 1
+all_ids = [42]
+
+sys.stdout.write("<?xml version='1.0'?>\n"
+"<!DOCTYPE platform SYSTEM \"http://simgrid.gforge.inria.fr/simgrid.dtd\">\n"
+"<platform version=\"3\">\n"
+"  <process host=\"c-0.me\" function=\"bittorrent.Tracker\"><argument value=\"%d\"/></process>\n" % end_date)
+
+for i in range(1, nb_nodes):
+
+  ok = False
+  while not ok:
+    my_id = random.randint(0, max_id)
+    ok = not my_id in all_ids
+  start_date = i * 10
+  line = "  <process host=\"c-%d.me\" function=\"bittorrent.Peer\"><argument value=\"%d\" /><argument value=\"%d\" />" % (i, my_id, end_date)
+  if random.randint(0,100) < seed_percentage:
+    line += "<argument value=\"1\" />"
+  line += "</process>\n";
+  sys.stdout.write(line)
+  all_ids.append(my_id)
+sys.stdout.write("</platform>")
+