From 7cfc39a067963b183621e9e9d08d604de258de34 Mon Sep 17 00:00:00 2001 From: Samuel Lepetit Date: Mon, 4 Jun 2012 10:36:54 +0200 Subject: [PATCH] Add bittorrent example --- CMakeLists.txt | 15 +- examples/bittorrent/Bittorrent.java | 24 ++ examples/bittorrent/Common.java | 53 +++ examples/bittorrent/Connection.java | 54 +++ examples/bittorrent/MessageTask.java | 86 ++++ examples/bittorrent/Peer.java | 598 +++++++++++++++++++++++++++ examples/bittorrent/Tracker.java | 90 ++++ examples/bittorrent/TrackerTask.java | 43 ++ examples/bittorrent/bittorrent.xml | 39 ++ examples/bittorrent/generate.py | 41 ++ 10 files changed, 1041 insertions(+), 2 deletions(-) create mode 100644 examples/bittorrent/Bittorrent.java create mode 100644 examples/bittorrent/Common.java create mode 100644 examples/bittorrent/Connection.java create mode 100644 examples/bittorrent/MessageTask.java create mode 100644 examples/bittorrent/Peer.java create mode 100644 examples/bittorrent/Tracker.java create mode 100644 examples/bittorrent/TrackerTask.java create mode 100644 examples/bittorrent/bittorrent.xml create mode 100755 examples/bittorrent/generate.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 6e9c6db089..f2d0163c55 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -113,6 +113,13 @@ set(JMSG_JAVA_SRC ) set(JAVA_EXAMPLES + examples/bittorrent/Bittorrent.java + examples/bittorrent/Common.java + examples/bittorrent/Connection.java + examples/bittorrent/MessageTask.java + examples/bittorrent/Peer.java + examples/bittorrent/Tracker.java + examples/bittorrent/TrackerTask.java examples/chord/Chord.java examples/chord/Common.java examples/chord/Node.java @@ -176,6 +183,7 @@ set(XML_FILES examples/master_slave_bypass/platform.xml examples/master_slave_kill/platform.xml examples/async/asyncDeployment.xml + examples/bittorrent/bittorrent.xml ) set(source_to_pack @@ -256,6 +264,8 @@ add_custom_command( COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/mutualExclusion/centralized/*.java COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/pingPong/*.java COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/startKillTime/*.java + COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/bittorrent/*.java + ) add_custom_target(simgrid_java_examples ALL @@ -278,8 +288,9 @@ ${CMAKE_HOME_DIRECTORY}/simgrid.jar INCLUDE(CTest) ENABLE_TESTING() -ADD_TEST(basic ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/basic/basic.tesh) ADD_TEST(async ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/async/async.tesh) +ADD_TEST(basic ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/basic/basic.tesh) +ADD_TEST(bittorrent ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/bittorrent/bittorrent.tesh) ADD_TEST(chord ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/chord/chord.tesh) ADD_TEST(pingPong ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/pingPong/pingpong.tesh) ADD_TEST(CommTime ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/commTime/commtime.tesh) @@ -288,7 +299,7 @@ ADD_TEST(bypass ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE ADD_TEST(kill ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/master_slave_kill/kill.tesh) ADD_TEST(startKillTime ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/startKillTime/startKillTime.tesh) #Don't forget to put new test in this list!!! -set(test_list basic chord async pingPong CommTime mutualExclusion bypass kill startKillTime) +set(test_list basic bittorrent chord async pingPong CommTime mutualExclusion bypass kill startKillTime) ########################################## # Set the DYLD_LIBRARY_PATH for mac # diff --git a/examples/bittorrent/Bittorrent.java b/examples/bittorrent/Bittorrent.java new file mode 100644 index 0000000000..5288ca9f31 --- /dev/null +++ b/examples/bittorrent/Bittorrent.java @@ -0,0 +1,24 @@ +package bittorrent; + +import org.simgrid.msg.Msg; +import org.simgrid.msg.MsgException; + +public class Bittorrent { + public static void main(String[] args) throws MsgException { + /* initialize the MSG simulation. Must be done before anything else (even logging). */ + Msg.init(args); + if(args.length < 2) { + Msg.info("Usage : Bittorrent platform_file deployment_file"); + Msg.info("example : Bittorrent platform.xml deployment.xml"); + System.exit(1); + } + + /* construct the platform and deploy the application */ + Msg.createEnvironment(args[0]); + Msg.deployApplication(args[1]); + + /* execute the simulation. */ + Msg.run(); + } + +} diff --git a/examples/bittorrent/Common.java b/examples/bittorrent/Common.java new file mode 100644 index 0000000000..9f155399dd --- /dev/null +++ b/examples/bittorrent/Common.java @@ -0,0 +1,53 @@ +package bittorrent; +/** + * Common constants for use in the simulation + */ +public class Common { + public static String TRACKER_MAILBOX = "tracker_mailbox"; + + public static int FILE_SIZE = 5120; + public static int FILE_PIECE_SIZE = 512; + public static int FILE_PIECES = 10; + + public static int PIECE_COMM_SIZE = 1; + /** + * Information message size + */ + public static int MESSAGE_SIZE = 1; + /** + * Max number of pairs sent by the tracker to clients + */ + public static int MAXIMUM_PAIRS = 50; + /** + * Interval of time where the peer should send a request to the tracker + */ + public static int TRACKER_QUERY_INTERVAL = 1000; + /** + * Communication size for a task to the tracker + */ + public static double TRACKER_COMM_SIZE = 0.01; + /** + * Timeout for the get peers data + */ + public static int GET_PEERS_TIMEOUT = 10000; + /** + * Timeout for "standard" messages. + */ + public static int TIMEOUT_MESSAGE = 10; + /** + * Timeout for tracker receive. + */ + public static int TRACKER_RECEIVE_TIMEOUT = 10; + /** + * Number of peers that can be unchocked at a given time + */ + public static int MAX_UNCHOKED_PEERS = 4; + /** + * Interval between each update of the choked peers + */ + public static int UPDATE_CHOKED_INTERVAL = 50; + /** + * Number of pieces the peer asks for simultaneously + */ + public static int MAX_PIECES = 1; +} diff --git a/examples/bittorrent/Connection.java b/examples/bittorrent/Connection.java new file mode 100644 index 0000000000..5cb1b62bba --- /dev/null +++ b/examples/bittorrent/Connection.java @@ -0,0 +1,54 @@ +package bittorrent; + +import java.util.Arrays; + +public class Connection { + /** + * Remote peer id + */ + public int id; + /** + * Remote peer bitfield. + */ + public char bitfield[]; + /** + * Remote peer mailbox + */ + public String mailbox; + /** + * Indicates if we are interested in something this peer has + */ + public boolean amInterested = false; + /** + * Indicates if the peer is interested in one of our pieces + */ + public boolean interested = false; + /** + * Indicates if the peer is choked for the current peer + */ + public boolean chokedUpload = true; + /** + * Indicates if the peer has choked the current peer + */ + public boolean chokedDownload = true; + + /** + * Constructor + */ + public Connection(int id) { + this.id = id; + this.mailbox = Integer.toString(id); + } + + @Override + public String toString() { + return "Connection [id=" + id + ", bitfield=" + + Arrays.toString(bitfield) + ", mailbox=" + mailbox + + ", amInterested=" + amInterested + ", interested=" + + interested + ", chokedUpload=" + chokedUpload + + ", chokedDownload=" + chokedDownload + "]"; + } + + +} + \ No newline at end of file diff --git a/examples/bittorrent/MessageTask.java b/examples/bittorrent/MessageTask.java new file mode 100644 index 0000000000..558515ffdb --- /dev/null +++ b/examples/bittorrent/MessageTask.java @@ -0,0 +1,86 @@ +package bittorrent; + +import org.simgrid.msg.Task; +/** + * Tasks sent between peers + */ +public class MessageTask extends Task { + public enum Type { + HANDSHAKE, + CHOKE, + UNCHOKE, + INTERESTED, + NOTINTERESTED, + HAVE, + BITFIELD, + REQUEST, + PIECE + }; + public Type type; + public String issuerHostname; + public String mailbox; + public int peerId; + public char bitfield[]; + public int index; + public boolean stalled; + /** + * Constructor, builds a value-less message + * @param type + * @param issuerHostname + * @param mailbox + * @param peerId + */ + public MessageTask(Type type, String issuerHostname, String mailbox, int peerId) { + this.type = type; + this.issuerHostname = issuerHostname; + this.mailbox = mailbox; + this.peerId = peerId; + } + /** + * Constructor, builds a new "have/request/piece" message + * @param type + * @param issuerHostname + * @param mailbox + * @param peerId + * @param index + */ + public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index) { + this.type = type; + this.issuerHostname = issuerHostname; + this.mailbox = mailbox; + this.peerId = peerId; + this.index = index; + } + /** + * Constructor, builds a new bitfield message + * @param type + * @param issuerHostname + * @param mailbox + * @param peerId + * @param bitfield + */ + public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, char bitfield[]) { + this.type = type; + this.issuerHostname = issuerHostname; + this.mailbox = mailbox; + this.peerId = peerId; + this.bitfield = bitfield; + } + /** + * Constructor, build a new "piece" message + * @param type + * @param issuerHostname + * @param mailbox + * @param peerId + * @param index + * @param stalled + */ + public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index, boolean stalled) { + this.type = type; + this.issuerHostname = issuerHostname; + this.mailbox = mailbox; + this.peerId = peerId; + this.index = index; + this.stalled = stalled; + } +} diff --git a/examples/bittorrent/Peer.java b/examples/bittorrent/Peer.java new file mode 100644 index 0000000000..7e3fb8ce3e --- /dev/null +++ b/examples/bittorrent/Peer.java @@ -0,0 +1,598 @@ +package bittorrent; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map.Entry; + +import org.simgrid.msg.Comm; +import org.simgrid.msg.Host; +import org.simgrid.msg.Msg; +import org.simgrid.msg.MsgException; +import org.simgrid.msg.Process; +import org.simgrid.msg.Task; + +/** + * Main class for peers execution + */ +public class Peer extends Process { + protected int round = 0; + + protected double deadline; + + protected int id; + protected String mailbox; + protected String mailboxTracker; + protected String hostname; + protected int pieces = 0; + protected char[] bitfield = new char[Common.FILE_PIECES]; + protected short[] piecesCount = new short[Common.FILE_PIECES]; + + protected int piecesRequested = 0; + + protected ArrayList currentPieces = new ArrayList(); + protected int currentPiece = -1; + + protected HashMap activePeers = new HashMap(); + protected HashMap peers = new HashMap(); + + protected Comm commReceived = null; + + public Peer(Host host, String name, String[]args) { + super(host,name,args); + } + + @Override + public void main(String[] args) throws MsgException { + //Check arguments + if (args.length != 3 && args.length != 2) { + Msg.info("Wrong number of arguments"); + } + if (args.length == 3) { + init(Integer.valueOf(args[0]),true); + } + else { + init(Integer.valueOf(args[0]),false); + } + //Retrieve the deadline + deadline = Double.valueOf(args[1]); + if (deadline < 0) { + Msg.info("Wrong deadline supplied"); + return; + } + Msg.info("Hi, I'm joining the network with id " + id); + //Getting peer data from the tracker + if (getPeersData()) { + Msg.debug("Got " + peers.size() + " peers from the tracker"); + Msg.debug("Here is my current status: " + getStatus()); + if (hasFinished()) { + pieces = Common.FILE_PIECES; + sendHandshakeAll(); + seedLoop(); + } + else { + leechLoop(); + seedLoop(); + } + } + else { + Msg.info("Couldn't contact the tracker."); + } + Msg.info("Here is my current status: " + getStatus()); + } + /** + * Peer main loop when it is leeching. + */ + private void leechLoop() { + double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL; + Msg.debug("Start downloading."); + /** + * Send a "handshake" message to all the peers it got + * (it couldn't have gotten more than 50 peers anyway) + */ + sendHandshakeAll(); + //Wait for at least one "bitfield" message. + waitForPieces(); + Msg.debug("Starting main leech loop"); + while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) { + if (commReceived == null) { + commReceived = Task.irecv(mailbox); + } + try { + if (commReceived.test()) { + handleMessage(commReceived.getTask()); + commReceived = null; + } + else { + //If the user has a pending interesting + if (currentPiece != -1) { + sendInterestedToPeers(); + } + else { + if (currentPieces.size() < Common.MAX_PIECES) { + updateCurrentPiece(); + } + } + //We don't execute the choke algorithm if we don't already have a piece + if (Msg.getClock() >= nextChokedUpdate && pieces > 0) { + updateChokedPeers(); + nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL; + } + else { + waitFor(1); + } + } + } + catch (MsgException e) { + commReceived = null; + } + } + } + + /** + * Peer main loop when it is seeding + */ + private void seedLoop() { + double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL; + Msg.debug("Start seeding."); + //start the main seed loop + while (Msg.getClock() < deadline) { + if (commReceived == null) { + commReceived = Task.irecv(mailbox); + } + try { + if (commReceived.test()) { + handleMessage(commReceived.getTask()); + commReceived = null; + } + else { + if (Msg.getClock() >= nextChokedUpdate) { + updateChokedPeers(); + //TODO: Change the choked peer algorithm when seeding + nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL; + } + else { + waitFor(1); + } + } + } + catch (MsgException e) { + commReceived = null; + } + + } + } + + /** + * Initialize the various peer data + * @param id id of the peer to take in the network + * @param seed indicates if the peer is a seed + */ + private void init(int id, boolean seed) { + this.id = id; + this.mailbox = Integer.toString(id); + this.mailboxTracker = "tracker_" + Integer.toString(id); + if (seed) { + for (int i = 0; i < bitfield.length; i++) { + bitfield[i] = '1'; + } + } + else { + for (int i = 0; i < bitfield.length; i++) { + bitfield[i] = '0'; + } + } + this.hostname = host.getName(); + } + /** + * Retrieves the peer list from the tracker + */ + private boolean getPeersData() { + + boolean success = false, sendSuccess = false; + double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT; + //Build the task to send to the tracker + TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id); + + while (!sendSuccess && Msg.getClock() < timeout) { + try { + Msg.debug("Sending a peer request to the tracker."); + taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT); + sendSuccess = true; + } + catch (MsgException e) { + + } + } + while (!success && Msg.getClock() < timeout) { + commReceived = Task.irecv(this.mailboxTracker); + try { + commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT); + if (commReceived.getTask() instanceof TrackerTask) { + TrackerTask task = (TrackerTask)commReceived.getTask(); + for (Integer peerId: task.peers) { + if (peerId != this.id) { + peers.put(peerId, new Connection(peerId)); + } + } + success = true; + } + } + catch (MsgException e) { + + } + commReceived = null; + } + commReceived = null; + return success; + } + /** + * Handle a received message sent by another peer + * @param task task received. + */ + void handleMessage(Task task) { + MessageTask message = (MessageTask)task; + Connection remotePeer = peers.get(message.peerId); + switch (message.type) { + case HANDSHAKE: + Msg.debug("Received a HANDSHAKE message from " + message.mailbox); + //Check if the peer is in our connection list + if (remotePeer == null) { + peers.put(message.peerId, new Connection(message.peerId)); + sendHandshake(message.mailbox); + } + //Send our bitfield to the pair + sendBitfield(message.mailbox); + break; + case BITFIELD: + Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")"); + //update the pieces list + updatePiecesCountFromBitfield(message.bitfield); + //Update the current piece + if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) { + updateCurrentPiece(); + } + remotePeer.bitfield = message.bitfield.clone(); + break; + case INTERESTED: + Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")"); + assert remotePeer != null; + remotePeer.interested = true; + break; + case NOTINTERESTED: + Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")"); + assert remotePeer != null; + remotePeer.interested = false; + break; + case UNCHOKE: + Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")"); + assert remotePeer != null; + remotePeer.chokedDownload = false; + activePeers.put(remotePeer.id,remotePeer); + sendRequestsToPeer(remotePeer); + break; + case CHOKE: + Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")"); + assert remotePeer != null; + remotePeer.chokedDownload = true; + activePeers.remove(remotePeer.id); + break; + case HAVE: + if (remotePeer.bitfield == null) { + return; + } + Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")"); + assert message.index >= 0 && message.index < Common.FILE_PIECES; + assert remotePeer.bitfield != null; + remotePeer.bitfield[message.index] = '1'; + piecesCount[message.index]++; + //Send interested message to the peer if he has what we want + if (!remotePeer.amInterested && currentPieces.contains(message.index) ) { + remotePeer.amInterested = true; + sendInterested(remotePeer.mailbox); + } + + if (currentPieces.contains(message.index)) { + sendRequest(message.mailbox,message.index); + } + break; + case REQUEST: + assert message.index >= 0 && message.index < Common.FILE_PIECES; + if (!remotePeer.chokedUpload) { + Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId); + if (bitfield[message.index] == '1') { + sendPiece(message.mailbox,message.index,false); + } + else { + Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" ); + } + } + break; + case PIECE: + if (message.stalled) { + Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")"); + } + else { + Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")"); + if (bitfield[message.index] == '0') { + piecesRequested--; + //Removing the piece from our piece list. + //TODO: It can not work, I should test it + if (!currentPieces.remove((Object)Integer.valueOf(message.index))) { + } + //Setting the fact that we have the piece + bitfield[message.index] = '1'; + pieces++; + Msg.debug("My status is now " + getStatus()); + //Sending the information to all the peers we are connected to + sendHave(message.index); + //sending UNINTERSTED to peers that doesn't have what we want. + updateInterestedAfterReceive(); + } + else { + Msg.debug("However, we already have it."); + } + } + break; + } + } + /** + * Wait for the node to receive interesting bitfield messages (ie: non empty) + * to be received + */ + void waitForPieces() { + boolean finished = false; + while (Msg.getClock() < deadline && !finished) { + if (commReceived == null) { + commReceived = Task.irecv(mailbox); + } + try { + commReceived.waitCompletion(Common.TIMEOUT_MESSAGE); + handleMessage(commReceived.getTask()); + if (currentPiece != -1) { + finished = true; + } + commReceived = null; + } + catch (MsgException e) { + commReceived = null; + } + } + } + + private boolean hasFinished() { + for (int i = 0; i < bitfield.length; i++) { + if (bitfield[i] == '1') { + return true; + } + } + return false; + } + /** + * Updates the list of who has a piece from a bitfield + * @param bitfield bitfield + */ + private void updatePiecesCountFromBitfield(char bitfield[]) { + for (int i = 0; i < Common.FILE_PIECES; i++) { + if (bitfield[i] == '1') { + piecesCount[i]++; + } + } + } + /** + * Update the piece the peer is currently interested in. + * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole : + * If the peer has less than 3 pieces, he chooses a piece at random. + * If the peer has more than pieces, he downloads the pieces that are the less + * replicated + */ + void updateCurrentPiece() { + if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) { + return; + } + if (true || pieces < 3) { + int i = 0, peerPiece; + do { + currentPiece = ((int)Msg.getClock() + id + i) % Common.FILE_PIECES; + i++; + } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece))); + } + else { + //trivial min algorithm. + //TODO + } + currentPieces.add(currentPiece); + Msg.debug("New interested piece: " + currentPiece); + assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES; + } + /** + * Update the list of current choked and unchoked peers, using the + * choke algorithm + */ + private void updateChokedPeers() { + round = (round + 1) % 3; + if (peers.size() == 0) { + return; + } + //remove a peer from the list + Iterator> it = activePeers.entrySet().iterator(); + if (it.hasNext()) { + Entry e = it.next(); + Connection peerChoked = e.getValue(); + sendChoked(peerChoked.mailbox); + peerChoked.chokedUpload = true; + activePeers.remove(e.getKey()); + } + //Random optimistic unchoking + if (round == 0 || true) { + int j = 0, i; + Connection peerChoosed = null; + do { + i = 0; + int idChosen = ((int)Msg.getClock() + j) % peers.size(); + for (Connection connection : peers.values()) { + if (i == idChosen) { + peerChoosed = connection; + break; + } + i++; + } //TODO: Not really the best way ever + if (!peerChoosed.interested) { + peerChoosed = null; + } + j++; + } while (peerChoosed == null && j < Common.MAXIMUM_PAIRS); + if (peerChoosed != null) { + activePeers.put(peerChoosed.id,peerChoosed); + peerChoosed.chokedUpload = false; + sendUnchoked(peerChoosed.mailbox); + } + } + } + /** + * Updates our "interested" state about peers: send "not interested" to peers + * that don't have any more pieces we want. + */ + private void updateInterestedAfterReceive() { + boolean interested; + for (Connection connection : peers.values()) { + interested = false; + if (connection.amInterested) { + for (Integer piece : currentPieces) { + if (connection.bitfield[piece] == '1') { + interested = true; + break; + } + } + if (!interested) { + connection.amInterested = false; + sendNotInterested(connection.mailbox); + } + } + } + } + /** + * Send request messages to a peer that have unchoked us + * @param remotePeer peer data to the peer we want to send the request + */ + private void sendRequestsToPeer(Connection remotePeer) { + for (Integer piece : currentPieces) { + if (remotePeer.bitfield != null && remotePeer.bitfield[piece] == '1') { + sendRequest(remotePeer.mailbox, piece); + } + } + } + /** + * Find the peers that have the current interested piece and send them + * the "interested" message + */ + private void sendInterestedToPeers() { + if (currentPiece == -1) { + return; + } + for (Connection connection : peers.values()) { + if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) { + connection.amInterested = true; + MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id); + task.dsend(connection.mailbox); + } + } + currentPiece = -1; + piecesRequested++; + } + /** + * Send a "interested" message to a peer. + */ + private void sendInterested(String mailbox) { + MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id); + task.dsend(mailbox); + } + /** + * Send a "not interested" message to a peer + * @param mailbox mailbox destination mailbox + */ + private void sendNotInterested(String mailbox) { + MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id); + task.dsend(mailbox); + } + /** + * Send a handshake message to all the peers the peer has. + * @param peer peer data + */ + private void sendHandshakeAll() { + for (Connection remotePeer : peers.values()) { + MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox, + id); + task.dsend(remotePeer.mailbox); + } + } + /** + * Send a "handshake" message to an user + * @param mailbox mailbox where to we send the message + */ + private void sendHandshake(String mailbox) { + Msg.debug("Sending a HANDSHAKE to " + mailbox); + MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id); + task.dsend(mailbox); + } + /** + * Send a "choked" message to a peer + */ + private void sendChoked(String mailbox) { + Msg.debug("Sending a CHOKE to " + mailbox); + MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id); + task.dsend(mailbox); + } + /** + * Send a "unchoked" message to a peer + */ + private void sendUnchoked(String mailbox) { + Msg.debug("Sending a UNCHOKE to " + mailbox); + MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id); + task.dsend(mailbox); + } + /** + * Send a "HAVE" message to all peers we are connected to + */ + private void sendHave(int piece) { + Msg.debug("Sending HAVE message to all my peers"); + for (Connection remotePeer : peers.values()) { + MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece); + task.dsend(remotePeer.mailbox); + } + } + /** + * Send a bitfield message to all the peers the peer has. + * @param peer peer data + */ + private void sendBitfield(String mailbox) { + Msg.debug("Sending a BITFIELD to " + mailbox); + MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield); + task.dsend(mailbox); + } + /** + * Send a "request" message to a pair, containing a request for a piece + */ + private void sendRequest(String mailbox, int piece) { + Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece); + MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece); + task.dsend(mailbox); + } + /** + * Send a "piece" message to a pair, containing a piece of the file + */ + private void sendPiece(String mailbox, int piece, boolean stalled) { + Msg.debug("Sending the PIECE " + piece + " to " + mailbox); + MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled); + task.dsend(mailbox); + } + + private String getStatus() { + String s = ""; + for (int i = 0; i < Common.FILE_PIECES; i++) { + s = s + bitfield[i]; + } + return s; + } +} + diff --git a/examples/bittorrent/Tracker.java b/examples/bittorrent/Tracker.java new file mode 100644 index 0000000000..537a3a5ed1 --- /dev/null +++ b/examples/bittorrent/Tracker.java @@ -0,0 +1,90 @@ +package bittorrent; +import java.util.ArrayList; +import java.util.Iterator; + +import org.simgrid.msg.Comm; +import org.simgrid.msg.Host; +import org.simgrid.msg.Process; +import org.simgrid.msg.Msg; +import org.simgrid.msg.MsgException; +import org.simgrid.msg.Task; + +import org.simgrid.msg.RngStream; +/** + * Tracker, handle requests from peers. + */ +public class Tracker extends Process { + protected RngStream stream; + /** + * Peers list + */ + protected ArrayList peersList; + /** + * End time for the simulation + */ + protected double deadline; + /** + * Current comm received + */ + protected Comm commReceived = null; + + public Tracker(Host host, String name, String[]args) { + super(host,name,args); + } + + @Override + public void main(String[] args) throws MsgException { + if (args.length != 1) { + Msg.info("Wrong number of arguments for the tracker."); + return; + } + //Build the RngStream object for randomness + stream = new RngStream("tracker"); + //Retrieve the end time + deadline = Double.valueOf(args[0]); + //Building peers array + peersList = new ArrayList(); + + Msg.info("Tracker launched."); + while (Msg.getClock() < deadline) { + if (commReceived == null) { + commReceived = Task.irecv(Common.TRACKER_MAILBOX); + } + try { + if (commReceived.test()) { + Task task = commReceived.getTask(); + if (task instanceof TrackerTask) { + TrackerTask tTask = (TrackerTask)task; + //Sending peers to the peer + //TODO: Send RANDOM pairs using RngStreams. + int nbPeers = 0; + while (nbPeers < Common.MAXIMUM_PAIRS && nbPeers < peersList.size()) { + int nextPeer; + do { + nextPeer = stream.randInt(0, peersList.size() - 1); + } while (tTask.peers.contains(nextPeer)); + tTask.peers.add(peersList.get(nextPeer)); + nbPeers++; + } + //Adding the peer to our list + peersList.add(tTask.peerId); + tTask.type = TrackerTask.Type.ANSWER; + //Setting the interval + tTask.interval = Common.TRACKER_QUERY_INTERVAL; + //Sending the task back to the peer + tTask.dsend(tTask.mailbox); + } + commReceived = null; + } + else { + waitFor(1); + } + } + catch (MsgException e) { + commReceived = null; + } + } + Msg.info("Tracker is leaving"); + } + +} diff --git a/examples/bittorrent/TrackerTask.java b/examples/bittorrent/TrackerTask.java new file mode 100644 index 0000000000..41cac26616 --- /dev/null +++ b/examples/bittorrent/TrackerTask.java @@ -0,0 +1,43 @@ +package bittorrent; +import java.util.ArrayList; + +import org.simgrid.msg.Task; + +/** + * Task exchanged between the tracker + * and the peers. + */ +public class TrackerTask extends Task { + /** + * Type of the tasks + */ + public enum Type { + REQUEST, + ANSWER + }; + public Type type; + public String hostname; + public String mailbox; + public int peerId; + public int uploaded; + public int downloaded; + public int left; + public double interval; + public ArrayList peers; + + public TrackerTask(String hostname, String mailbox, int peerId) { + this(hostname, mailbox, peerId, 0, 0, Common.FILE_SIZE); + } + public TrackerTask(String hostname, String mailbox, int peerId, int uploaded, int downloaded, int left) { + super("", 0, Common.TRACKER_COMM_SIZE); + this.type = Type.REQUEST; + this.hostname = hostname; + this.mailbox = mailbox; + this.peerId = peerId; + this.uploaded = uploaded; + this.downloaded = downloaded; + this.left = left; + this.peers = new ArrayList(); + } + +} diff --git a/examples/bittorrent/bittorrent.xml b/examples/bittorrent/bittorrent.xml new file mode 100644 index 0000000000..0699cac2b5 --- /dev/null +++ b/examples/bittorrent/bittorrent.xml @@ -0,0 +1,39 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/bittorrent/generate.py b/examples/bittorrent/generate.py new file mode 100755 index 0000000000..9ba8611257 --- /dev/null +++ b/examples/bittorrent/generate.py @@ -0,0 +1,41 @@ +#!/usr/bin/python + +# This script generates a specific deployment file for the Bittorrent example. +# It assumes that the platform will be a cluster. +# Usage: python generate.py nb_nodes nb_bits end_date percentage +# Example: python generate.py 10000 5000 + +import sys, random + +if len(sys.argv) != 4: + print("Usage: python generate.py nb_nodes end_date seed_percentage > deployment_file.xml") + sys.exit(1) + +nb_nodes = int(sys.argv[1]) +end_date = int(sys.argv[2]) +seed_percentage = int(sys.argv[3]); + +nb_bits = 24 +max_id = 2 ** nb_bits - 1 +all_ids = [42] + +sys.stdout.write("\n" +"\n" +"\n" +" \n" % end_date) + +for i in range(1, nb_nodes): + + ok = False + while not ok: + my_id = random.randint(0, max_id) + ok = not my_id in all_ids + start_date = i * 10 + line = " " % (i, my_id, end_date) + if random.randint(0,100) < seed_percentage: + line += "" + line += "\n"; + sys.stdout.write(line) + all_ids.append(my_id) +sys.stdout.write("") + -- 2.20.1