src/jmsg_synchro.h
src/jtrace.c
src/jtrace.h
+ src/jmsg_rngstream.c
+ src/jmsg_rngstream.h
)
set(JMSG_JAVA_SRC
org/simgrid/msg/Mutex.java
org/simgrid/msg/Comm.java
org/simgrid/trace/Trace.java
+ org/simgrid/msg/RngStream.java
)
set(JAVA_EXAMPLES
+ examples/bittorrent/Bittorrent.java
+ examples/bittorrent/Common.java
+ examples/bittorrent/Connection.java
+ examples/bittorrent/MessageTask.java
+ examples/bittorrent/Peer.java
+ examples/bittorrent/Tracker.java
+ examples/bittorrent/TrackerTask.java
examples/chord/Chord.java
examples/chord/Common.java
examples/chord/Node.java
examples/master_slave_kill/platform.xml
examples/async/asyncDeployment.xml
examples/tracing/tracingPingPongDeployment.xml
+ examples/bittorrent/bittorrent.xml
)
set(source_to_pack
set(INCLUDE_PATH "-I${CMAKE_HOME_DIRECTORY}/src -I${SIMGRID_INCLUDES} ")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${INCLUDE_PATH}")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error=int-to-pointer-cast -Wno-error=pointer-to-int-cast")
if(COMPILER_C_VERSION_MAJOR_MINOR MATCHES "4.6")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error=unused-but-set-variable")
endif(COMPILER_C_VERSION_MAJOR_MINOR MATCHES "4.6")
COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/pingPong/*.java
COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/startKillTime/*.java
COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/tracing/*.java
+ COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/bittorrent/*.java
)
add_custom_target(simgrid_java_examples ALL
INCLUDE(CTest)
ENABLE_TESTING()
-ADD_TEST(basic ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/basic/basic.tesh)
ADD_TEST(async ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/async/async.tesh)
+ADD_TEST(basic ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/basic/basic.tesh)
+ADD_TEST(bittorrent ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/bittorrent/bittorrent.tesh)
+ADD_TEST(chord ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/chord/chord.tesh)
ADD_TEST(pingPong ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/pingPong/pingpong.tesh)
ADD_TEST(CommTime ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/commTime/commtime.tesh)
ADD_TEST(mutualExclusion ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/mutualExclusion/mutualexclusion.tesh)
ADD_TEST(startKillTime ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/startKillTime/startKillTime.tesh)
ADD_TEST(tracing ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/tracing/tracingPingPong.tesh)
#Don't forget to put new test in this list!!!
-set(test_list basic async pingPong CommTime mutualExclusion bypass kill startKillTime tracing)
+set(test_list basic bittorrent chord async pingPong CommTime mutualExclusion bypass kill startKillTime tracing)
##########################################
# Set the DYLD_LIBRARY_PATH for mac #
--- /dev/null
+package bittorrent;
+
+import org.simgrid.msg.Msg;
+import org.simgrid.msg.MsgException;
+
+public class Bittorrent {
+ public static void main(String[] args) throws MsgException {
+ /* initialize the MSG simulation. Must be done before anything else (even logging). */
+ Msg.init(args);
+ if(args.length < 2) {
+ Msg.info("Usage : Bittorrent platform_file deployment_file");
+ Msg.info("example : Bittorrent platform.xml deployment.xml");
+ System.exit(1);
+ }
+
+ /* construct the platform and deploy the application */
+ Msg.createEnvironment(args[0]);
+ Msg.deployApplication(args[1]);
+
+ /* execute the simulation. */
+ Msg.run();
+ }
+
+}
--- /dev/null
+package bittorrent;
+/**
+ * Common constants for use in the simulation
+ */
+public class Common {
+ public static String TRACKER_MAILBOX = "tracker_mailbox";
+
+ public static int FILE_SIZE = 5120;
+ public static int FILE_PIECE_SIZE = 512;
+ public static int FILE_PIECES = 10;
+
+ public static int PIECE_COMM_SIZE = 1;
+ /**
+ * Information message size
+ */
+ public static int MESSAGE_SIZE = 1;
+ /**
+ * Max number of pairs sent by the tracker to clients
+ */
+ public static int MAXIMUM_PAIRS = 50;
+ /**
+ * Interval of time where the peer should send a request to the tracker
+ */
+ public static int TRACKER_QUERY_INTERVAL = 1000;
+ /**
+ * Communication size for a task to the tracker
+ */
+ public static double TRACKER_COMM_SIZE = 0.01;
+ /**
+ * Timeout for the get peers data
+ */
+ public static int GET_PEERS_TIMEOUT = 10000;
+ /**
+ * Timeout for "standard" messages.
+ */
+ public static int TIMEOUT_MESSAGE = 10;
+ /**
+ * Timeout for tracker receive.
+ */
+ public static int TRACKER_RECEIVE_TIMEOUT = 10;
+ /**
+ * Number of peers that can be unchocked at a given time
+ */
+ public static int MAX_UNCHOKED_PEERS = 4;
+ /**
+ * Interval between each update of the choked peers
+ */
+ public static int UPDATE_CHOKED_INTERVAL = 50;
+ /**
+ * Number of pieces the peer asks for simultaneously
+ */
+ public static int MAX_PIECES = 1;
+}
--- /dev/null
+package bittorrent;
+
+import java.util.Arrays;
+
+public class Connection {
+ /**
+ * Remote peer id
+ */
+ public int id;
+ /**
+ * Remote peer bitfield.
+ */
+ public char bitfield[];
+ /**
+ * Remote peer mailbox
+ */
+ public String mailbox;
+ /**
+ * Indicates if we are interested in something this peer has
+ */
+ public boolean amInterested = false;
+ /**
+ * Indicates if the peer is interested in one of our pieces
+ */
+ public boolean interested = false;
+ /**
+ * Indicates if the peer is choked for the current peer
+ */
+ public boolean chokedUpload = true;
+ /**
+ * Indicates if the peer has choked the current peer
+ */
+ public boolean chokedDownload = true;
+
+ /**
+ * Constructor
+ */
+ public Connection(int id) {
+ this.id = id;
+ this.mailbox = Integer.toString(id);
+ }
+
+ @Override
+ public String toString() {
+ return "Connection [id=" + id + ", bitfield="
+ + Arrays.toString(bitfield) + ", mailbox=" + mailbox
+ + ", amInterested=" + amInterested + ", interested="
+ + interested + ", chokedUpload=" + chokedUpload
+ + ", chokedDownload=" + chokedDownload + "]";
+ }
+
+
+}
+
\ No newline at end of file
--- /dev/null
+package bittorrent;
+
+import org.simgrid.msg.Task;
+/**
+ * Tasks sent between peers
+ */
+public class MessageTask extends Task {
+ public enum Type {
+ HANDSHAKE,
+ CHOKE,
+ UNCHOKE,
+ INTERESTED,
+ NOTINTERESTED,
+ HAVE,
+ BITFIELD,
+ REQUEST,
+ PIECE
+ };
+ public Type type;
+ public String issuerHostname;
+ public String mailbox;
+ public int peerId;
+ public char bitfield[];
+ public int index;
+ public boolean stalled;
+ /**
+ * Constructor, builds a value-less message
+ * @param type
+ * @param issuerHostname
+ * @param mailbox
+ * @param peerId
+ */
+ public MessageTask(Type type, String issuerHostname, String mailbox, int peerId) {
+ this.type = type;
+ this.issuerHostname = issuerHostname;
+ this.mailbox = mailbox;
+ this.peerId = peerId;
+ }
+ /**
+ * Constructor, builds a new "have/request/piece" message
+ * @param type
+ * @param issuerHostname
+ * @param mailbox
+ * @param peerId
+ * @param index
+ */
+ public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index) {
+ this.type = type;
+ this.issuerHostname = issuerHostname;
+ this.mailbox = mailbox;
+ this.peerId = peerId;
+ this.index = index;
+ }
+ /**
+ * Constructor, builds a new bitfield message
+ * @param type
+ * @param issuerHostname
+ * @param mailbox
+ * @param peerId
+ * @param bitfield
+ */
+ public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, char bitfield[]) {
+ this.type = type;
+ this.issuerHostname = issuerHostname;
+ this.mailbox = mailbox;
+ this.peerId = peerId;
+ this.bitfield = bitfield;
+ }
+ /**
+ * Constructor, build a new "piece" message
+ * @param type
+ * @param issuerHostname
+ * @param mailbox
+ * @param peerId
+ * @param index
+ * @param stalled
+ */
+ public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index, boolean stalled) {
+ this.type = type;
+ this.issuerHostname = issuerHostname;
+ this.mailbox = mailbox;
+ this.peerId = peerId;
+ this.index = index;
+ this.stalled = stalled;
+ }
+}
--- /dev/null
+package bittorrent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.simgrid.msg.Comm;
+import org.simgrid.msg.Host;
+import org.simgrid.msg.Msg;
+import org.simgrid.msg.MsgException;
+import org.simgrid.msg.RngStream;
+import org.simgrid.msg.Process;
+import org.simgrid.msg.Task;
+
+/**
+ * Main class for peers execution
+ */
+public class Peer extends Process {
+ protected int round = 0;
+
+ protected double deadline;
+
+ protected static RngStream stream = new RngStream();
+
+ protected int id;
+ protected String mailbox;
+ protected String mailboxTracker;
+ protected String hostname;
+ protected int pieces = 0;
+ protected char[] bitfield = new char[Common.FILE_PIECES];
+ protected short[] piecesCount = new short[Common.FILE_PIECES];
+
+ protected int piecesRequested = 0;
+
+ protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
+ protected int currentPiece = -1;
+
+ protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();
+ protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
+
+ protected Comm commReceived = null;
+
+ public Peer(Host host, String name, String[]args) {
+ super(host,name,args);
+ }
+
+ @Override
+ public void main(String[] args) throws MsgException {
+ //Check arguments
+ if (args.length != 3 && args.length != 2) {
+ Msg.info("Wrong number of arguments");
+ }
+ if (args.length == 3) {
+ init(Integer.valueOf(args[0]),true);
+ }
+ else {
+ init(Integer.valueOf(args[0]),false);
+ }
+ //Retrieve the deadline
+ deadline = Double.valueOf(args[1]);
+ if (deadline < 0) {
+ Msg.info("Wrong deadline supplied");
+ return;
+ }
+ Msg.info("Hi, I'm joining the network with id " + id);
+ //Getting peer data from the tracker
+ if (getPeersData()) {
+ Msg.debug("Got " + peers.size() + " peers from the tracker");
+ Msg.debug("Here is my current status: " + getStatus());
+ if (hasFinished()) {
+ pieces = Common.FILE_PIECES;
+ sendHandshakeAll();
+ seedLoop();
+ }
+ else {
+ leechLoop();
+ seedLoop();
+ }
+ }
+ else {
+ Msg.info("Couldn't contact the tracker.");
+ }
+ Msg.info("Here is my current status: " + getStatus());
+ }
+ /**
+ * Peer main loop when it is leeching.
+ */
+ private void leechLoop() {
+ double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
+ Msg.debug("Start downloading.");
+ /**
+ * Send a "handshake" message to all the peers it got
+ * (it couldn't have gotten more than 50 peers anyway)
+ */
+ sendHandshakeAll();
+ //Wait for at least one "bitfield" message.
+ waitForPieces();
+ Msg.debug("Starting main leech loop");
+ while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
+ if (commReceived == null) {
+ commReceived = Task.irecv(mailbox);
+ }
+ try {
+ if (commReceived.test()) {
+ handleMessage(commReceived.getTask());
+ commReceived = null;
+ }
+ else {
+ //If the user has a pending interesting
+ if (currentPiece != -1) {
+ sendInterestedToPeers();
+ }
+ else {
+ if (currentPieces.size() < Common.MAX_PIECES) {
+ updateCurrentPiece();
+ }
+ }
+ //We don't execute the choke algorithm if we don't already have a piece
+ if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
+ updateChokedPeers();
+ nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
+ }
+ else {
+ waitFor(1);
+ }
+ }
+ }
+ catch (MsgException e) {
+ commReceived = null;
+ }
+ }
+ }
+
+ /**
+ * Peer main loop when it is seeding
+ */
+ private void seedLoop() {
+ double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
+ Msg.debug("Start seeding.");
+ //start the main seed loop
+ while (Msg.getClock() < deadline) {
+ if (commReceived == null) {
+ commReceived = Task.irecv(mailbox);
+ }
+ try {
+ if (commReceived.test()) {
+ handleMessage(commReceived.getTask());
+ commReceived = null;
+ }
+ else {
+ if (Msg.getClock() >= nextChokedUpdate) {
+ updateChokedPeers();
+ //TODO: Change the choked peer algorithm when seeding
+ nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
+ }
+ else {
+ waitFor(1);
+ }
+ }
+ }
+ catch (MsgException e) {
+ commReceived = null;
+ }
+
+ }
+ }
+
+ /**
+ * Initialize the various peer data
+ * @param id id of the peer to take in the network
+ * @param seed indicates if the peer is a seed
+ */
+ private void init(int id, boolean seed) {
+ this.id = id;
+ this.mailbox = Integer.toString(id);
+ this.mailboxTracker = "tracker_" + Integer.toString(id);
+ if (seed) {
+ for (int i = 0; i < bitfield.length; i++) {
+ bitfield[i] = '1';
+ }
+ }
+ else {
+ for (int i = 0; i < bitfield.length; i++) {
+ bitfield[i] = '0';
+ }
+ }
+ this.hostname = host.getName();
+ }
+ /**
+ * Retrieves the peer list from the tracker
+ */
+ private boolean getPeersData() {
+
+ boolean success = false, sendSuccess = false;
+ double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
+ //Build the task to send to the tracker
+ TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
+
+ while (!sendSuccess && Msg.getClock() < timeout) {
+ try {
+ Msg.debug("Sending a peer request to the tracker.");
+ taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
+ sendSuccess = true;
+ }
+ catch (MsgException e) {
+
+ }
+ }
+ while (!success && Msg.getClock() < timeout) {
+ commReceived = Task.irecv(this.mailboxTracker);
+ try {
+ commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
+ if (commReceived.getTask() instanceof TrackerTask) {
+ TrackerTask task = (TrackerTask)commReceived.getTask();
+ for (Integer peerId: task.peers) {
+ if (peerId != this.id) {
+ peers.put(peerId, new Connection(peerId));
+ }
+ }
+ success = true;
+ }
+ }
+ catch (MsgException e) {
+
+ }
+ commReceived = null;
+ }
+ commReceived = null;
+ return success;
+ }
+ /**
+ * Handle a received message sent by another peer
+ * @param task task received.
+ */
+ void handleMessage(Task task) {
+ MessageTask message = (MessageTask)task;
+ Connection remotePeer = peers.get(message.peerId);
+ switch (message.type) {
+ case HANDSHAKE:
+ Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
+ //Check if the peer is in our connection list
+ if (remotePeer == null) {
+ peers.put(message.peerId, new Connection(message.peerId));
+ sendHandshake(message.mailbox);
+ }
+ //Send our bitfield to the pair
+ sendBitfield(message.mailbox);
+ break;
+ case BITFIELD:
+ Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
+ //update the pieces list
+ updatePiecesCountFromBitfield(message.bitfield);
+ //Update the current piece
+ if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
+ updateCurrentPiece();
+ }
+ remotePeer.bitfield = message.bitfield.clone();
+ break;
+ case INTERESTED:
+ Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
+ assert remotePeer != null;
+ remotePeer.interested = true;
+ break;
+ case NOTINTERESTED:
+ Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
+ assert remotePeer != null;
+ remotePeer.interested = false;
+ break;
+ case UNCHOKE:
+ Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
+ assert remotePeer != null;
+ remotePeer.chokedDownload = false;
+ activePeers.put(remotePeer.id,remotePeer);
+ sendRequestsToPeer(remotePeer);
+ break;
+ case CHOKE:
+ Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
+ assert remotePeer != null;
+ remotePeer.chokedDownload = true;
+ activePeers.remove(remotePeer.id);
+ break;
+ case HAVE:
+ if (remotePeer.bitfield == null) {
+ return;
+ }
+ Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
+ assert message.index >= 0 && message.index < Common.FILE_PIECES;
+ assert remotePeer.bitfield != null;
+ remotePeer.bitfield[message.index] = '1';
+ piecesCount[message.index]++;
+ //Send interested message to the peer if he has what we want
+ if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
+ remotePeer.amInterested = true;
+ sendInterested(remotePeer.mailbox);
+ }
+
+ if (currentPieces.contains(message.index)) {
+ sendRequest(message.mailbox,message.index);
+ }
+ break;
+ case REQUEST:
+ assert message.index >= 0 && message.index < Common.FILE_PIECES;
+ if (!remotePeer.chokedUpload) {
+ Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
+ if (bitfield[message.index] == '1') {
+ sendPiece(message.mailbox,message.index,false);
+ }
+ else {
+ Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
+ }
+ }
+ break;
+ case PIECE:
+ if (message.stalled) {
+ Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
+ }
+ else {
+ Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
+ if (bitfield[message.index] == '0') {
+ piecesRequested--;
+ //Removing the piece from our piece list.
+ if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
+ }
+ //Setting the fact that we have the piece
+ bitfield[message.index] = '1';
+ pieces++;
+ Msg.debug("My status is now " + getStatus());
+ //Sending the information to all the peers we are connected to
+ sendHave(message.index);
+ //sending UNINTERSTED to peers that doesn't have what we want.
+ updateInterestedAfterReceive();
+ }
+ else {
+ Msg.debug("However, we already have it.");
+ }
+ }
+ break;
+ }
+ }
+ /**
+ * Wait for the node to receive interesting bitfield messages (ie: non empty)
+ * to be received
+ */
+ void waitForPieces() {
+ boolean finished = false;
+ while (Msg.getClock() < deadline && !finished) {
+ if (commReceived == null) {
+ commReceived = Task.irecv(mailbox);
+ }
+ try {
+ commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
+ handleMessage(commReceived.getTask());
+ if (currentPiece != -1) {
+ finished = true;
+ }
+ commReceived = null;
+ }
+ catch (MsgException e) {
+ commReceived = null;
+ }
+ }
+ }
+
+ private boolean hasFinished() {
+ for (int i = 0; i < bitfield.length; i++) {
+ if (bitfield[i] == '1') {
+ return true;
+ }
+ }
+ return false;
+ }
+ /**
+ * Updates the list of who has a piece from a bitfield
+ * @param bitfield bitfield
+ */
+ private void updatePiecesCountFromBitfield(char bitfield[]) {
+ for (int i = 0; i < Common.FILE_PIECES; i++) {
+ if (bitfield[i] == '1') {
+ piecesCount[i]++;
+ }
+ }
+ }
+ /**
+ * Update the piece the peer is currently interested in.
+ * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
+ * If the peer has less than 3 pieces, he chooses a piece at random.
+ * If the peer has more than pieces, he downloads the pieces that are the less
+ * replicated
+ */
+ void updateCurrentPiece() {
+ if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
+ return;
+ }
+ if (true || pieces < 3) {
+ int i = 0, peerPiece;
+ do {
+ currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
+ i++;
+ } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
+ }
+ else {
+ //trivial min algorithm.
+ //TODO
+ }
+ currentPieces.add(currentPiece);
+ Msg.debug("New interested piece: " + currentPiece);
+ assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
+ }
+ /**
+ * Update the list of current choked and unchoked peers, using the
+ * choke algorithm
+ */
+ private void updateChokedPeers() {
+ round = (round + 1) % 3;
+ if (peers.size() == 0) {
+ return;
+ }
+ //remove a peer from the list
+ Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
+ if (it.hasNext()) {
+ Entry<Integer,Connection> e = it.next();
+ Connection peerChoked = e.getValue();
+ sendChoked(peerChoked.mailbox);
+ peerChoked.chokedUpload = true;
+ activePeers.remove(e.getKey());
+ }
+ //Random optimistic unchoking
+ if (round == 0 || true) {
+ int j = 0, i;
+ Connection peerChoosed = null;
+ do {
+ i = 0;
+ int idChosen = ((int)Msg.getClock() + j) % peers.size();
+ for (Connection connection : peers.values()) {
+ if (i == idChosen) {
+ peerChoosed = connection;
+ break;
+ }
+ i++;
+ } //TODO: Not really the best way ever
+ if (!peerChoosed.interested) {
+ peerChoosed = null;
+ }
+ j++;
+ } while (peerChoosed == null && j < Common.MAXIMUM_PAIRS);
+ if (peerChoosed != null) {
+ activePeers.put(peerChoosed.id,peerChoosed);
+ peerChoosed.chokedUpload = false;
+ sendUnchoked(peerChoosed.mailbox);
+ }
+ }
+ //TODO: Use the leecher choke algorithm.
+ }
+ /**
+ * Updates our "interested" state about peers: send "not interested" to peers
+ * that don't have any more pieces we want.
+ */
+ private void updateInterestedAfterReceive() {
+ boolean interested;
+ for (Connection connection : peers.values()) {
+ interested = false;
+ if (connection.amInterested) {
+ for (Integer piece : currentPieces) {
+ if (connection.bitfield[piece] == '1') {
+ interested = true;
+ break;
+ }
+ }
+ if (!interested) {
+ connection.amInterested = false;
+ sendNotInterested(connection.mailbox);
+ }
+ }
+ }
+ }
+ /**
+ * Send request messages to a peer that have unchoked us
+ * @param remotePeer peer data to the peer we want to send the request
+ */
+ private void sendRequestsToPeer(Connection remotePeer) {
+ for (Integer piece : currentPieces) {
+ if (remotePeer.bitfield != null && remotePeer.bitfield[piece] == '1') {
+ sendRequest(remotePeer.mailbox, piece);
+ }
+ }
+ }
+ /**
+ * Find the peers that have the current interested piece and send them
+ * the "interested" message
+ */
+ private void sendInterestedToPeers() {
+ if (currentPiece == -1) {
+ return;
+ }
+ for (Connection connection : peers.values()) {
+ if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
+ connection.amInterested = true;
+ MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
+ task.dsend(connection.mailbox);
+ }
+ }
+ currentPiece = -1;
+ piecesRequested++;
+ }
+ /**
+ * Send a "interested" message to a peer.
+ */
+ private void sendInterested(String mailbox) {
+ MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
+ task.dsend(mailbox);
+ }
+ /**
+ * Send a "not interested" message to a peer
+ * @param mailbox mailbox destination mailbox
+ */
+ private void sendNotInterested(String mailbox) {
+ MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
+ task.dsend(mailbox);
+ }
+ /**
+ * Send a handshake message to all the peers the peer has.
+ * @param peer peer data
+ */
+ private void sendHandshakeAll() {
+ for (Connection remotePeer : peers.values()) {
+ MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
+ id);
+ task.dsend(remotePeer.mailbox);
+ }
+ }
+ /**
+ * Send a "handshake" message to an user
+ * @param mailbox mailbox where to we send the message
+ */
+ private void sendHandshake(String mailbox) {
+ Msg.debug("Sending a HANDSHAKE to " + mailbox);
+ MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
+ task.dsend(mailbox);
+ }
+ /**
+ * Send a "choked" message to a peer
+ */
+ private void sendChoked(String mailbox) {
+ Msg.debug("Sending a CHOKE to " + mailbox);
+ MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
+ task.dsend(mailbox);
+ }
+ /**
+ * Send a "unchoked" message to a peer
+ */
+ private void sendUnchoked(String mailbox) {
+ Msg.debug("Sending a UNCHOKE to " + mailbox);
+ MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
+ task.dsend(mailbox);
+ }
+ /**
+ * Send a "HAVE" message to all peers we are connected to
+ */
+ private void sendHave(int piece) {
+ Msg.debug("Sending HAVE message to all my peers");
+ for (Connection remotePeer : peers.values()) {
+ MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
+ task.dsend(remotePeer.mailbox);
+ }
+ }
+ /**
+ * Send a bitfield message to all the peers the peer has.
+ * @param peer peer data
+ */
+ private void sendBitfield(String mailbox) {
+ Msg.debug("Sending a BITFIELD to " + mailbox);
+ MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
+ task.dsend(mailbox);
+ }
+ /**
+ * Send a "request" message to a pair, containing a request for a piece
+ */
+ private void sendRequest(String mailbox, int piece) {
+ Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece);
+ MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece);
+ task.dsend(mailbox);
+ }
+ /**
+ * Send a "piece" message to a pair, containing a piece of the file
+ */
+ private void sendPiece(String mailbox, int piece, boolean stalled) {
+ Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
+ MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled);
+ task.dsend(mailbox);
+ }
+
+ private String getStatus() {
+ String s = "";
+ for (int i = 0; i < Common.FILE_PIECES; i++) {
+ s = s + bitfield[i];
+ }
+ return s;
+ }
+}
+
--- /dev/null
+package bittorrent;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.simgrid.msg.Comm;
+import org.simgrid.msg.Host;
+import org.simgrid.msg.Process;
+import org.simgrid.msg.Msg;
+import org.simgrid.msg.MsgException;
+import org.simgrid.msg.Task;
+
+import org.simgrid.msg.RngStream;
+/**
+ * Tracker, handle requests from peers.
+ */
+public class Tracker extends Process {
+ protected RngStream stream;
+ /**
+ * Peers list
+ */
+ protected ArrayList<Integer> peersList;
+ /**
+ * End time for the simulation
+ */
+ protected double deadline;
+ /**
+ * Current comm received
+ */
+ protected Comm commReceived = null;
+
+ public Tracker(Host host, String name, String[]args) {
+ super(host,name,args);
+ }
+
+ @Override
+ public void main(String[] args) throws MsgException {
+ if (args.length != 1) {
+ Msg.info("Wrong number of arguments for the tracker.");
+ return;
+ }
+ //Build the RngStream object for randomness
+ stream = new RngStream("tracker");
+ //Retrieve the end time
+ deadline = Double.valueOf(args[0]);
+ //Building peers array
+ peersList = new ArrayList<Integer>();
+
+ Msg.info("Tracker launched.");
+ while (Msg.getClock() < deadline) {
+ if (commReceived == null) {
+ commReceived = Task.irecv(Common.TRACKER_MAILBOX);
+ }
+ try {
+ if (commReceived.test()) {
+ Task task = commReceived.getTask();
+ if (task instanceof TrackerTask) {
+ TrackerTask tTask = (TrackerTask)task;
+ //Sending peers to the peer
+ int nbPeers = 0;
+ while (nbPeers < Common.MAXIMUM_PAIRS && nbPeers < peersList.size()) {
+ int nextPeer;
+ do {
+ nextPeer = stream.randInt(0, peersList.size() - 1);
+ } while (tTask.peers.contains(nextPeer));
+ tTask.peers.add(peersList.get(nextPeer));
+ nbPeers++;
+ }
+ //Adding the peer to our list
+ peersList.add(tTask.peerId);
+ tTask.type = TrackerTask.Type.ANSWER;
+ //Setting the interval
+ tTask.interval = Common.TRACKER_QUERY_INTERVAL;
+ //Sending the task back to the peer
+ tTask.dsend(tTask.mailbox);
+ }
+ commReceived = null;
+ }
+ else {
+ waitFor(1);
+ }
+ }
+ catch (MsgException e) {
+ commReceived = null;
+ }
+ }
+ Msg.info("Tracker is leaving");
+ }
+
+}
--- /dev/null
+package bittorrent;
+import java.util.ArrayList;
+
+import org.simgrid.msg.Task;
+
+/**
+ * Task exchanged between the tracker
+ * and the peers.
+ */
+public class TrackerTask extends Task {
+ /**
+ * Type of the tasks
+ */
+ public enum Type {
+ REQUEST,
+ ANSWER
+ };
+ public Type type;
+ public String hostname;
+ public String mailbox;
+ public int peerId;
+ public int uploaded;
+ public int downloaded;
+ public int left;
+ public double interval;
+ public ArrayList<Integer> peers;
+
+ public TrackerTask(String hostname, String mailbox, int peerId) {
+ this(hostname, mailbox, peerId, 0, 0, Common.FILE_SIZE);
+ }
+ public TrackerTask(String hostname, String mailbox, int peerId, int uploaded, int downloaded, int left) {
+ super("", 0, Common.TRACKER_COMM_SIZE);
+ this.type = Type.REQUEST;
+ this.hostname = hostname;
+ this.mailbox = mailbox;
+ this.peerId = peerId;
+ this.uploaded = uploaded;
+ this.downloaded = downloaded;
+ this.left = left;
+ this.peers = new ArrayList<Integer>();
+ }
+
+}
--- /dev/null
+#! ./tesh
+
+! output sort
+
+$ java -cp .:${srcdir:=.}/examples:${srcdir:=.}/simgrid.jar bittorrent/Bittorrent ${srcdir:=.}/examples/platform.xml ${srcdir:=.}/examples/bittorrent/bittorrent.xml
+> [0.000000] [jmsg/INFO] Ready to run MSG_MAIN
+> [5000.832370] [jmsg/INFO] Done running MSG_MAIN
+> [5000.832370] [jmsg/INFO] MSG_main finished
+> [5000.832370] [jmsg/INFO] Clean java world
+> [5000.832370] [jmsg/INFO] Clean native world
+> [Boivin:bittorrent.Peer:(2) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 2
+> [Boivin:bittorrent.Peer:(2) 5000.048882] [jmsg/INFO] Here is my current status: 1111111111
+> [Disney:bittorrent.Peer:(6) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 6
+> [Disney:bittorrent.Peer:(6) 5000.131405] [jmsg/INFO] Here is my current status: 1111111111
+> [Geoff:bittorrent.Peer:(5) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 5
+> [Geoff:bittorrent.Peer:(5) 5000.764583] [jmsg/INFO] Here is my current status: 1111111111
+> [Jacquelin:bittorrent.Tracker:(1) 0.000000] [jmsg/INFO] Tracker launched.
+> [Jacquelin:bittorrent.Tracker:(1) 3000.000000] [jmsg/INFO] Tracker is leaving
+> [Jean_Yves:bittorrent.Peer:(3) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 3
+> [Jean_Yves:bittorrent.Peer:(3) 5000.832370] [jmsg/INFO] Here is my current status: 1111111111
+> [McGee:bittorrent.Peer:(8) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 8
+> [McGee:bittorrent.Peer:(8) 5000.783574] [jmsg/INFO] Here is my current status: 1111111111
+> [TeX:bittorrent.Peer:(4) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 4
+> [TeX:bittorrent.Peer:(4) 5000.304959] [jmsg/INFO] Here is my current status: 1111111111
+> [iRMX:bittorrent.Peer:(7) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 7
+> [iRMX:bittorrent.Peer:(7) 5000.729205] [jmsg/INFO] Here is my current status: 1111111111
+
--- /dev/null
+<?xml version='1.0'?>
+<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid.dtd">
+<platform version="3">
+
+ <process host="Jacquelin" function="bittorrent.Tracker">
+ <argument value="3000" />
+ </process>
+
+ <process host="Boivin" function="bittorrent.Peer">
+ <argument value="00000002"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
+ <argument value="1" /> <!-- indicates if the bittorrent.Peer is a seed at the begining of the simulation -->
+ </process>
+ <process host="Jean_Yves" function="bittorrent.Peer">
+ <argument value="00000003"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
+ </process>
+ <process host="TeX" function="bittorrent.Peer">
+ <argument value="00000004"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
+ </process>
+ <process host="Geoff" function="bittorrent.Peer">
+ <argument value="00000005"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
+ </process>
+ <process host="Disney" function="bittorrent.Peer">
+ <argument value="00000006"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
+ </process>
+ <process host="iRMX" function="bittorrent.Peer">
+ <argument value="00000007"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
+ </process>
+ <process host="McGee" function="bittorrent.Peer">
+ <argument value="00000008"/> <!-- my id -->
+ <argument value="5000" /> <!-- end time -->
+ </process>
+
+</platform>
--- /dev/null
+#!/usr/bin/python
+
+# This script generates a specific deployment file for the Bittorrent example.
+# It assumes that the platform will be a cluster.
+# Usage: python generate.py nb_nodes nb_bits end_date percentage
+# Example: python generate.py 10000 5000
+
+import sys, random
+
+if len(sys.argv) != 4:
+ print("Usage: python generate.py nb_nodes end_date seed_percentage > deployment_file.xml")
+ sys.exit(1)
+
+nb_nodes = int(sys.argv[1])
+end_date = int(sys.argv[2])
+seed_percentage = int(sys.argv[3]);
+
+nb_bits = 24
+max_id = 2 ** nb_bits - 1
+all_ids = [42]
+
+sys.stdout.write("<?xml version='1.0'?>\n"
+"<!DOCTYPE platform SYSTEM \"http://simgrid.gforge.inria.fr/simgrid.dtd\">\n"
+"<platform version=\"3\">\n"
+" <process host=\"c-0.me\" function=\"bittorrent.Tracker\"><argument value=\"%d\"/></process>\n" % end_date)
+
+for i in range(1, nb_nodes):
+
+ ok = False
+ while not ok:
+ my_id = random.randint(0, max_id)
+ ok = not my_id in all_ids
+ start_date = i * 10
+ line = " <process host=\"c-%d.me\" function=\"bittorrent.Peer\"><argument value=\"%d\" /><argument value=\"%d\" />" % (i, my_id, end_date)
+ if random.randint(0,100) < seed_percentage:
+ line += "<argument value=\"1\" />"
+ line += "</process>\n";
+ sys.stdout.write(line)
+ all_ids.append(my_id)
+sys.stdout.write("</platform>")
+
--- /dev/null
+#! ./tesh
+
+! output sort
+
+$ java -cp .:${srcdir:=.}/examples:${srcdir:=.}/simgrid.jar chord/Chord ${srcdir:=.}/examples/platform.xml ${srcdir:=.}/examples/chord/chord.xml
+> [0.000000] [jmsg/INFO] Ready to run MSG_MAIN
+> [652.548539] [jmsg/INFO] Done running MSG_MAIN
+> [652.548539] [jmsg/INFO] MSG_main finished
+> [652.548539] [jmsg/INFO] Clean java world
+> [652.548539] [jmsg/INFO] Clean native world
+> [Boivin:chord.Node:(7) 0.000000] [jmsg/INFO] Joining the ring with id 8 knowing node 1
+> [Gatien:chord.Node:(1) 0.000000] [jmsg/INFO] Joining the ring with id 48 knowing node 1
+> [Geoff:chord.Node:(4) 0.000000] [jmsg/INFO] Joining the ring with id 32 knowing node 1
+> [Jean_Yves:chord.Node:(6) 0.000000] [jmsg/INFO] Joining the ring with id 14 knowing node 1
+> [McGee:chord.Node:(2) 0.000000] [jmsg/INFO] Joining the ring with id 42 knowing node 1
+> [TeX:chord.Node:(5) 0.000000] [jmsg/INFO] Joining the ring with id 21 knowing node 1
+> [iRMX:chord.Node:(3) 0.000000] [jmsg/INFO] Joining the ring with id 38 knowing node 1
\ No newline at end of file
/**
* Indicates if the communication is a receiving communication
*/
- boolean receiving;
+ protected boolean receiving;
+ /**
+ * Indicates if the communication is finished
+ */
+ protected boolean finished = false;
/**
* Represents the bind between the java comm and the
* native C comm. You must never access it, since it is
/**
* This method kill a process.
- * @param process the process to be killed.
*
*/
public native void kill();
--- /dev/null
+/*
+ * JNI interface to C RngStream code
+ *
+ * Copyright 2006,2007,2010,2012 The SimGrid Team.
+ * All right reserved.
+ *
+ * This program is free software; you can redistribute
+ * it and/or modify it under the terms of the license
+ * (GNU LGPL) which comes with this package.
+ */
+package org.simgrid.msg;
+/**
+ * Export of RngStreams for Java
+ */
+public class RngStream {
+ /**
+ * Represents the bind between the RngStream java object and the C object.
+ */
+ public long bind;
+ /**
+ * Creates and returns a new stream without identifier.
+ * This procedure reserves space to keep the information relative to
+ * the RngStream, initializes its seed Ig , sets Bg and Cg equal to Ig , sets its antithetic and
+ * precision switches to 0. The seed Ig is equal to the initial seed of the package given by
+ * setPackageSeed if this is the first stream created, otherwise it is Z steps ahead
+ * of that of the most recently created stream.
+ */
+ public RngStream() {
+ create("");
+ }
+ /**
+ * Creates and returns a new stream with identifier "name".
+ * This procedure reserves space to keep the information relative to
+ * the RngStream, initializes its seed Ig , sets Bg and Cg equal to Ig , sets its antithetic and
+ * precision switches to 0. The seed Ig is equal to the initial seed of the package given by
+ * setPackageSeed if this is the first stream created, otherwise it is Z steps ahead
+ * of that of the most recently created stream.
+ */
+ public RngStream(String name) {
+ create(name);
+ }
+ /**
+ * The natively implemented method to create a C RngStream object.
+ */
+ private native void create(String name);
+ /**
+ * Destructor
+ */
+ protected void finalize() {
+ destroy();
+ }
+ /**
+ * Release the C RngStream object
+ */
+ private native void destroy();
+
+ /**
+ * Sets the initial seed of the package RngStreams to the six integers in the vector seed. This will
+ * be the seed (initial state) of the first stream. If this procedure is not called, the default initial
+ * seed is (12345, 12345, 12345, 12345, 12345, 12345). If it is called, the first 3 values of the seed
+ * must all be less than m1 = 4294967087, and not all 0; and the last 3 values must all be less
+ * than m2 = 4294944443, and not all 0. Returns false for invalid seeds, and true otherwise.
+ */
+ public static native boolean setPackageSeed(int seed[]);
+ /**
+ * Reinitializes the stream g to its initial state: Cg and Bg are set to Ig .
+ */
+ public native void resetStart();
+ /**
+ * Reinitializes the stream g to the beginning of its current substream: Cg is set to Bg .
+ */
+ public native void restartStartSubstream();
+ /**
+ * Reinitializes the stream g to the beginning of its next substream: Ng is computed, and Cg and
+ * Bg are set to Ng .
+ */
+ public native void resetNextSubstream();
+ /**
+ * If a = true the stream g will start generating antithetic variates, i.e., 1 − U instead of U , until
+ * this method is called again with a = false.
+ */
+ public native void setAntithetic(boolean a);
+ /**
+ * Sets the initial seed Ig of stream g to the vector seed. This vector must satisfy the same
+ * conditions as in setPackageSeed. The stream is then reset to this initial seed. The
+ * states and seeds of the other streams are not modified. As a result, after calling this procedure,
+ * the initial seeds of the streams are no longer spaced Z values apart. We discourage the use of
+ * this procedure. Returns false for invalid seeds, and true otherwise.
+ */
+ public native boolean setSeed(int seed[]);
+ /**
+ * Advances the state of the stream by k values, without modifying the states of other streams (as
+ * in RngStream_SetSeed), nor the values of Bg and Ig associated with this stream. If e > 0, then
+ * k = 2e + c; if e < 0, then k = −2−e + c; and if e = 0, then k = c. Note: c is allowed to take
+ *negative values. We discourage the use of this procedure.
+ */
+ public native void advanceState(int e, int g);
+
+ /**
+ * Returns a (pseudo)random number from the uniform distribution over the interval (0, 1), after advancing the state by one step. The returned number has 32 bits of precision
+ * in the sense that it is always a multiple of 1/(232 − 208), unless RngStream_IncreasedPrecis
+ * has been called for this stream.
+ */
+ public native double randU01();
+ /**
+ * Returns a (pseudo)random number from the discrete uniform distribution over the integers
+ * {i, i + 1, . . . , j}
+ */
+ public native int randInt(int i, int j);
+
+ /**
+ * Class initializer, to initialize various JNI stuff
+ */
+ public static native void nativeInit();
+ static {
+ nativeInit();
+ }
+}
\ No newline at end of file
#include <msg/msg.h>
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(jmsg);
-static jfieldID jtask_field_Comm_task;
static jfieldID jcomm_field_Comm_bind;
-static jfieldID jcomm_field_Comm_taskBind;
+static jfieldID jcomm_field_Comm_finished;
static jfieldID jcomm_field_Comm_receiving;
+static jfieldID jtask_field_Comm_task;
+static jfieldID jcomm_field_Comm_taskBind;
void jcomm_bind_task(JNIEnv *env, jobject jcomm) {
msg_comm_t comm = (msg_comm_t) (long) (*env)->GetLongField(env, jcomm, jcomm_field_Comm_bind);
jcomm_field_Comm_taskBind = jxbt_get_jfield(env, jfield_class_Comm, "taskBind", "J");
jcomm_field_Comm_receiving = jxbt_get_jfield(env, jfield_class_Comm, "receiving", "Z");
jtask_field_Comm_task = jxbt_get_jfield(env, jfield_class_Comm, "task", "Lorg/simgrid/msg/Task;");
- if (!jcomm_field_Comm_bind || !jcomm_field_Comm_taskBind || !jcomm_field_Comm_receiving || !jtask_field_Comm_task) {
+ jcomm_field_Comm_finished = jxbt_get_jfield(env, jfield_class_Comm, "finished", "Z");
+ if (!jcomm_field_Comm_bind || !jcomm_field_Comm_taskBind || !jcomm_field_Comm_receiving || !jtask_field_Comm_task || !jcomm_field_Comm_finished) {
jxbt_throw_native(env,bprintf("Can't find some fields in Java class."));
}
}
msg_comm_t comm;
comm = (msg_comm_t) (long) (*env)->GetLongField(env, jcomm, jcomm_field_Comm_bind);
+ jboolean finished = (*env)->GetBooleanField(env, jcomm, jcomm_field_Comm_finished);
+ if (finished == JNI_TRUE) {
+ return JNI_TRUE;
+ }
+
if (!comm) {
jxbt_throw_native(env,bprintf("comm is null"));
return JNI_FALSE;
jxbt_throw_native(env,bprintf("comm is null"));
return;
}
+
+ jboolean finished = (*env)->GetBooleanField(env, jcomm, jcomm_field_Comm_finished);
+ if (finished == JNI_TRUE) {
+ return;
+ }
+
MSG_error_t status;
TRY {
status = MSG_comm_wait(comm,(double)timeout);
CATCH_ANONYMOUS {
return;
}
+ (*env)->SetBooleanField(env, jcomm, jcomm_field_Comm_finished, JNI_TRUE);
if (status == MSG_OK) {
jcomm_bind_task(env,jcomm);
return;
void jprocess_bind(jobject jprocess, m_process_t process, JNIEnv * env)
{
- (*env)->SetLongField(env, jprocess, jprocess_field_Process_bind, (jlong) (long) (process));
+ (*env)->SetLongField(env, jprocess, jprocess_field_Process_bind, (jlong)(process));
}
jlong jprocess_get_id(jobject jprocess, JNIEnv * env)
(xbt_main_func_t) jprocess,
/*data*/ jprocess,
host,
- (double)jkill, /* kill time */
/*argc, argv, properties*/
0,NULL,NULL);
-
+ MSG_process_set_kill_time(process, (double)jkill);
MSG_process_set_data(process,&process);
/* bind the java process instance to the native process */
jprocess_bind(jprocess, process, env);
Java_org_simgrid_msg_Process_waitFor(JNIEnv * env, jobject jprocess,
jdouble jseconds)
{
- MSG_error_t rv;
+ MSG_error_t rv;
TRY {
rv = MSG_process_sleep((double)jseconds);
}
--- /dev/null
+/* Functions related to the RngStream Java port */
+
+/* Copyright (c) 2007, 2009, 2010, 2012. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+
+#include "jmsg_rngstream.h"
+#include "jxbt_utilities.h"
+
+jfieldID jrngstream_bind;
+
+RngStream jrngstream_to_native(JNIEnv *env, jobject jrngstream) {
+ RngStream rngstream = (RngStream) (*env)->GetLongField(env, jrngstream, jrngstream_bind);
+ if (!rngstream) {
+ jxbt_throw_notbound(env, "rngstream", jrngstream);
+ return NULL;
+ }
+ return rngstream;
+}
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_nativeInit(JNIEnv *env, jclass cls) {
+ jclass class_RngStream = (*env)->FindClass(env, "org/simgrid/msg/RngStream");
+
+ jrngstream_bind = jxbt_get_jfield(env, class_RngStream, "bind", "J");
+}
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_create(JNIEnv *env, jobject jrngstream, jstring jname) {
+ const char *name = (*env)->GetStringUTFChars(env, jname, 0);
+ RngStream rngstream = RngStream_CreateStream(name);
+ //Bind the RngStream object
+ (*env)->SetLongField(env, jrngstream, jrngstream_bind, (jlong)rngstream);
+
+ (*env)->ReleaseStringUTFChars(env, jname, name);
+}
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_destroy(JNIEnv *env, jobject jrngstream) {
+ RngStream rngstream = jrngstream_to_native(env, jrngstream);
+ RngStream_DeleteStream(&rngstream);
+ (*env)->SetLongField(env, jrngstream, jrngstream_bind, (jlong)NULL);
+}
+JNIEXPORT jboolean JNICALL
+Java_org_simgrid_msg_RngStream_setPackageSeed(JNIEnv *env, jobject jrngstream, jintArray jseed) {
+ jint buffer[6];
+
+ (*env)->GetIntArrayRegion(env, jseed, 0, 6, buffer);
+
+ RngStream rngstream = jrngstream_to_native(env, jrngstream);
+ if (!rngstream)
+ return JNI_FALSE;
+
+ int result = RngStream_SetPackageSeed((unsigned long*)buffer);
+
+ return result == -1 ? JNI_FALSE : JNI_TRUE;
+}
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_resetStart(JNIEnv *env, jobject jrngstream) {
+ RngStream rngstream = jrngstream_to_native(env, jrngstream);
+ if (!rngstream)
+ return;
+
+ RngStream_ResetStartStream(rngstream);
+}
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_resetStartSubstream(JNIEnv *env, jobject jrngstream) {
+ RngStream rngstream = jrngstream_to_native(env, jrngstream);
+ if (!rngstream)
+ return;
+
+ RngStream_ResetStartSubstream(rngstream);
+}
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_resetNextSubstream(JNIEnv *env, jobject jrngstream) {
+ RngStream rngstream = jrngstream_to_native(env, jrngstream);
+ if (!rngstream)
+ return;
+
+ RngStream_ResetNextSubstream(rngstream);
+}
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_setAntithetic(JNIEnv *env, jobject jrngstream, jboolean ja) {
+ RngStream rngstream = jrngstream_to_native(env, jrngstream);
+ if (!rngstream)
+ return;
+
+ if (ja == JNI_TRUE) {
+ RngStream_SetAntithetic(rngstream,-1);
+ }
+ else {
+ RngStream_SetAntithetic(rngstream,0);
+ }
+}
+JNIEXPORT jboolean JNICALL
+Java_org_simgrid_msg_RngStream_setSeed(JNIEnv *env, jobject jrngstream, jintArray jseed) {
+ jint buffer[6];
+
+ (*env)->GetIntArrayRegion(env, jseed, 0, 6, buffer);
+
+ RngStream rngstream = jrngstream_to_native(env, jrngstream);
+ if (!rngstream)
+ return JNI_FALSE;
+
+
+ int result = RngStream_SetSeed(rngstream, (unsigned long*)buffer);
+
+ return result == -1 ? JNI_FALSE : JNI_TRUE;
+}
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_advanceState(JNIEnv *env, jobject jrngstream, jint e, jint g) {
+ RngStream rngstream = jrngstream_to_native(env, jrngstream);
+ if (!rngstream)
+ return;
+
+ RngStream_AdvanceState(rngstream, (long)e, (long)g);
+}
+JNIEXPORT jdouble JNICALL
+Java_org_simgrid_msg_RngStream_randU01(JNIEnv *env, jobject jrngstream) {
+ RngStream rngstream = jrngstream_to_native(env, jrngstream);
+ if (!rngstream)
+ return 0;
+
+ return (jdouble)RngStream_RandU01(rngstream);
+}
+JNIEXPORT jint JNICALL
+Java_org_simgrid_msg_RngStream_randInt(JNIEnv *env, jobject jrngstream, jint i, jint j) {
+ RngStream rngstream = jrngstream_to_native(env, jrngstream);
+ if (!rngstream)
+ return 0;
+
+ return (jint)RngStream_RandInt(rngstream, (int)i, (int)j);
+}
--- /dev/null
+/* Functions related to the RngStream Java port */
+
+/* Copyright (c) 2007, 2009, 2010, 2012. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+#ifndef MSG_RNGSTREAM_H
+#define MSG_RNGSTREAM_H
+#include <jni.h>
+#include <xbt/RngStream.h>
+
+RngStream jrngstream_to_native(JNIEnv *env, jobject jrngstream);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_nativeInit(JNIEnv *env, jclass cls);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_create(JNIEnv *env, jobject jrngstream, jstring name);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_destroy(JNIEnv *env, jobject jrngstream);
+
+JNIEXPORT jboolean JNICALL
+Java_org_simgrid_msg_RngStream_setPackageSeed(JNIEnv *env, jobject jrngstream, jintArray seed);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_resetStart(JNIEnv *env, jobject jrngstream);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_resetStartSubstream(JNIEnv *env, jobject jrngstream);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_resetNextSubstream(JNIEnv *env, jobject jrngstream);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_setAntithetic(JNIEnv *env, jobject jrngstream, jboolean ja);
+
+JNIEXPORT jboolean JNICALL
+Java_org_simgrid_msg_RngStream_setSeed(JNIEnv *env, jobject jrngstream, jintArray jseed);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_advanceState(JNIEnv *env, jobject jrngstream, jint e, jint g);
+
+JNIEXPORT jdouble JNICALL
+Java_org_simgrid_msg_RngStream_randU01(JNIEnv *env, jobject jrngstream);
+
+JNIEXPORT jint JNICALL
+Java_org_simgrid_msg_RngStream_randInt(JNIEnv *env, jobject jrngstream, jint i, jint j);
+
+#endif /* MSG_RNGSTREAM_H */