Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge concerns - Adrien
authoralebre <alebre@dhcp-2-172.rech172-28.emn.fr>
Tue, 5 Jun 2012 12:23:49 +0000 (14:23 +0200)
committeralebre <alebre@dhcp-2-172.rech172-28.emn.fr>
Tue, 5 Jun 2012 12:23:49 +0000 (14:23 +0200)
19 files changed:
CMakeLists.txt
examples/bittorrent/Bittorrent.java [new file with mode: 0644]
examples/bittorrent/Common.java [new file with mode: 0644]
examples/bittorrent/Connection.java [new file with mode: 0644]
examples/bittorrent/MessageTask.java [new file with mode: 0644]
examples/bittorrent/Peer.java [new file with mode: 0644]
examples/bittorrent/Tracker.java [new file with mode: 0644]
examples/bittorrent/TrackerTask.java [new file with mode: 0644]
examples/bittorrent/bittorrent.tesh [new file with mode: 0644]
examples/bittorrent/bittorrent.xml [new file with mode: 0644]
examples/bittorrent/generate.py [new file with mode: 0755]
examples/chord/chord.tesh [new file with mode: 0644]
org/simgrid/msg/Comm.java
org/simgrid/msg/Process.java
org/simgrid/msg/RngStream.java [new file with mode: 0644]
src/jmsg_comm.c
src/jmsg_process.c
src/jmsg_rngstream.c [new file with mode: 0644]
src/jmsg_rngstream.h [new file with mode: 0644]

index c6e0b7c..7c442ce 100644 (file)
@@ -90,6 +90,8 @@ set(JMSG_C_SRC
        src/jmsg_synchro.h
        src/jtrace.c
        src/jtrace.h
+       src/jmsg_rngstream.c
+       src/jmsg_rngstream.h
 )
 
 set(JMSG_JAVA_SRC
@@ -110,9 +112,17 @@ set(JMSG_JAVA_SRC
        org/simgrid/msg/Mutex.java
        org/simgrid/msg/Comm.java
        org/simgrid/trace/Trace.java
+       org/simgrid/msg/RngStream.java
 )
 
 set(JAVA_EXAMPLES
+  examples/bittorrent/Bittorrent.java
+  examples/bittorrent/Common.java
+  examples/bittorrent/Connection.java
+  examples/bittorrent/MessageTask.java
+  examples/bittorrent/Peer.java
+  examples/bittorrent/Tracker.java
+  examples/bittorrent/TrackerTask.java
   examples/chord/Chord.java
   examples/chord/Common.java
   examples/chord/Node.java
@@ -181,6 +191,7 @@ set(XML_FILES
     examples/master_slave_kill/platform.xml
     examples/async/asyncDeployment.xml
     examples/tracing/tracingPingPongDeployment.xml
+    examples/bittorrent/bittorrent.xml
 )
 
 set(source_to_pack
@@ -204,6 +215,7 @@ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O0 -g -Wall -Wunused -Wmissing-prototypes -
 set(INCLUDE_PATH "-I${CMAKE_HOME_DIRECTORY}/src -I${SIMGRID_INCLUDES} ")
 SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${INCLUDE_PATH}")
 
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error=int-to-pointer-cast -Wno-error=pointer-to-int-cast")
 if(COMPILER_C_VERSION_MAJOR_MINOR MATCHES "4.6")
     set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error=unused-but-set-variable")
 endif(COMPILER_C_VERSION_MAJOR_MINOR MATCHES "4.6")
@@ -277,6 +289,7 @@ add_custom_command(
        COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/pingPong/*.java
        COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/startKillTime/*.java
        COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/tracing/*.java
+       COMMAND ${JAVA_COMPILE} -d ${CMAKE_HOME_DIRECTORY}/examples -cp ${CMAKE_HOME_DIRECTORY}/simgrid.jar ${CMAKE_HOME_DIRECTORY}/examples/bittorrent/*.java
 )
 
 add_custom_target(simgrid_java_examples ALL
@@ -299,8 +312,10 @@ ${CMAKE_HOME_DIRECTORY}/simgrid.jar
 INCLUDE(CTest)
 ENABLE_TESTING()
 
-ADD_TEST(basic           ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/basic/basic.tesh)
 ADD_TEST(async           ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/async/async.tesh)
+ADD_TEST(basic           ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/basic/basic.tesh)
+ADD_TEST(bittorrent           ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/bittorrent/bittorrent.tesh)
+ADD_TEST(chord          ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/chord/chord.tesh)
 ADD_TEST(pingPong        ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/pingPong/pingpong.tesh)
 ADD_TEST(CommTime        ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/commTime/commtime.tesh)
 ADD_TEST(mutualExclusion ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/mutualExclusion/mutualexclusion.tesh)
@@ -309,7 +324,7 @@ ADD_TEST(kill            ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE
 ADD_TEST(startKillTime   ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/startKillTime/startKillTime.tesh)
 ADD_TEST(tracing         ${TESH_BIN_PATH} ${TESH_OPTION} --setenv srcdir=${CMAKE_HOME_DIRECTORY} ${CMAKE_HOME_DIRECTORY}/examples/tracing/tracingPingPong.tesh)
 #Don't forget to put new test in this list!!!
-set(test_list basic async pingPong CommTime mutualExclusion bypass kill startKillTime tracing)
+set(test_list basic bittorrent chord async pingPong CommTime mutualExclusion bypass kill startKillTime tracing)
 
 ##########################################
 # Set the  DYLD_LIBRARY_PATH for mac     #
diff --git a/examples/bittorrent/Bittorrent.java b/examples/bittorrent/Bittorrent.java
new file mode 100644 (file)
index 0000000..5288ca9
--- /dev/null
@@ -0,0 +1,24 @@
+package bittorrent;
+
+import org.simgrid.msg.Msg;
+import org.simgrid.msg.MsgException;
+
+public class Bittorrent {
+       public static void main(String[] args) throws MsgException {
+               /* initialize the MSG simulation. Must be done before anything else (even logging). */
+               Msg.init(args);
+       if(args.length < 2) {
+               Msg.info("Usage   : Bittorrent platform_file deployment_file");
+               Msg.info("example : Bittorrent platform.xml deployment.xml");
+               System.exit(1);
+       }
+               
+               /* construct the platform and deploy the application */
+               Msg.createEnvironment(args[0]);
+               Msg.deployApplication(args[1]);
+                       
+               /*  execute the simulation. */
+        Msg.run();             
+       }
+
+}
diff --git a/examples/bittorrent/Common.java b/examples/bittorrent/Common.java
new file mode 100644 (file)
index 0000000..9f15539
--- /dev/null
@@ -0,0 +1,53 @@
+package bittorrent;
+/**
+ * Common constants for use in the simulation
+ */
+public class Common {  
+       public static String TRACKER_MAILBOX = "tracker_mailbox";
+       
+       public static int FILE_SIZE = 5120;
+       public static int FILE_PIECE_SIZE = 512;
+       public static int FILE_PIECES = 10;
+       
+       public static int PIECE_COMM_SIZE = 1;
+       /**
+        * Information message size
+        */
+       public static int MESSAGE_SIZE = 1;
+       /**
+        * Max number of pairs sent by the tracker to clients
+        */
+       public static int MAXIMUM_PAIRS = 50;
+       /**
+        * Interval of time where the peer should send a request to the tracker
+        */
+       public static int TRACKER_QUERY_INTERVAL = 1000;
+       /**
+        * Communication size for a task to the tracker
+        */
+       public static double TRACKER_COMM_SIZE = 0.01;
+       /**
+        * Timeout for the get peers data
+        */
+       public static int GET_PEERS_TIMEOUT = 10000;
+       /**
+        * Timeout for "standard" messages.
+        */
+       public static int TIMEOUT_MESSAGE = 10;
+       /**
+        * Timeout for tracker receive.
+        */
+       public static int TRACKER_RECEIVE_TIMEOUT = 10;
+       /**
+        * Number of peers that can be unchocked at a given time
+        */
+       public static int MAX_UNCHOKED_PEERS = 4;
+       /**
+        * Interval between each update of the choked peers
+        */
+       public static int UPDATE_CHOKED_INTERVAL = 50;
+       /**
+        * Number of pieces the peer asks for simultaneously
+        */
+       public static int MAX_PIECES = 1;
+}
diff --git a/examples/bittorrent/Connection.java b/examples/bittorrent/Connection.java
new file mode 100644 (file)
index 0000000..5cb1b62
--- /dev/null
@@ -0,0 +1,54 @@
+package bittorrent;
+
+import java.util.Arrays;
+
+public class Connection {
+       /**
+        * Remote peer id
+        */
+       public int id;
+       /**
+        * Remote peer bitfield.
+        */
+       public char bitfield[];
+       /**
+        * Remote peer mailbox
+        */
+       public String mailbox;
+       /**
+        * Indicates if we are interested in something this peer has
+        */
+       public boolean amInterested = false;
+       /**
+        * Indicates if the peer is interested in one of our pieces
+        */
+       public boolean interested = false;
+       /**
+        * Indicates if the peer is choked for the current peer
+        */
+       public boolean chokedUpload = true;
+       /**
+        * Indicates if the peer has choked the current peer
+        */
+       public boolean chokedDownload = true;
+       
+       /**
+        * Constructor
+        */
+       public Connection(int id) {
+               this.id = id;
+               this.mailbox = Integer.toString(id);
+       }
+
+       @Override
+       public String toString() {
+               return "Connection [id=" + id + ", bitfield="
+                               + Arrays.toString(bitfield) + ", mailbox=" + mailbox
+                               + ", amInterested=" + amInterested + ", interested="
+                               + interested + ", chokedUpload=" + chokedUpload
+                               + ", chokedDownload=" + chokedDownload + "]";
+       }
+       
+       
+}
\ No newline at end of file
diff --git a/examples/bittorrent/MessageTask.java b/examples/bittorrent/MessageTask.java
new file mode 100644 (file)
index 0000000..558515f
--- /dev/null
@@ -0,0 +1,86 @@
+package bittorrent;
+
+import org.simgrid.msg.Task;
+/**
+ * Tasks sent between peers
+ */
+public class MessageTask extends Task {
+       public enum Type {
+               HANDSHAKE,
+               CHOKE,
+               UNCHOKE,
+               INTERESTED,
+               NOTINTERESTED,
+               HAVE,
+               BITFIELD,
+               REQUEST,
+               PIECE
+       };
+       public Type type;
+       public String issuerHostname;
+       public String mailbox;
+       public int peerId;
+       public char bitfield[];
+       public int index;
+       public boolean stalled;
+       /**
+        * Constructor, builds a value-less message
+        * @param type
+        * @param issuerHostname
+        * @param mailbox
+        * @param peerId
+        */
+       public MessageTask(Type type, String issuerHostname, String mailbox, int peerId) {
+               this.type = type;
+               this.issuerHostname = issuerHostname;
+               this.mailbox = mailbox;
+               this.peerId = peerId;
+       }
+       /**
+        * Constructor, builds a new "have/request/piece" message
+        * @param type
+        * @param issuerHostname
+        * @param mailbox
+        * @param peerId
+        * @param index
+        */
+       public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index) {
+               this.type = type;
+               this.issuerHostname = issuerHostname;
+               this.mailbox = mailbox;
+               this.peerId = peerId;
+               this.index = index;
+       }
+       /**
+        * Constructor, builds a new bitfield message
+        * @param type
+        * @param issuerHostname
+        * @param mailbox
+        * @param peerId
+        * @param bitfield
+        */
+       public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, char bitfield[]) {
+               this.type = type;
+               this.issuerHostname = issuerHostname;
+               this.mailbox = mailbox;
+               this.peerId = peerId;
+               this.bitfield = bitfield;
+       }
+       /**
+        * Constructor, build a new "piece" message
+        * @param type
+        * @param issuerHostname
+        * @param mailbox
+        * @param peerId
+        * @param index
+        * @param stalled
+        */
+       public MessageTask(Type type, String issuerHostname, String mailbox, int peerId, int index, boolean stalled) {
+               this.type = type;
+               this.issuerHostname = issuerHostname;
+               this.mailbox = mailbox;
+               this.peerId = peerId;
+               this.index = index;
+               this.stalled = stalled;
+       }       
+}
diff --git a/examples/bittorrent/Peer.java b/examples/bittorrent/Peer.java
new file mode 100644 (file)
index 0000000..c5a6653
--- /dev/null
@@ -0,0 +1,601 @@
+package bittorrent;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.simgrid.msg.Comm;
+import org.simgrid.msg.Host;
+import org.simgrid.msg.Msg;
+import org.simgrid.msg.MsgException;
+import org.simgrid.msg.RngStream;
+import org.simgrid.msg.Process;
+import org.simgrid.msg.Task;
+
+/**
+ * Main class for peers execution
+ */
+public class Peer extends Process {
+       protected int round = 0;
+       
+       protected double deadline;
+       
+       protected static RngStream stream = new RngStream();
+       
+       protected int id;
+       protected String mailbox;
+       protected String mailboxTracker;
+       protected String hostname;
+       protected int pieces = 0;
+       protected char[] bitfield = new char[Common.FILE_PIECES];
+       protected short[] piecesCount = new short[Common.FILE_PIECES];
+       
+       protected int piecesRequested = 0;
+       
+       protected ArrayList<Integer> currentPieces = new ArrayList<Integer>();
+       protected int currentPiece = -1;
+
+       protected HashMap<Integer, Connection> activePeers = new HashMap<Integer, Connection>();        
+       protected HashMap<Integer, Connection> peers = new HashMap<Integer, Connection>();
+       
+       protected Comm commReceived = null;
+
+       public Peer(Host host, String name, String[]args) {
+               super(host,name,args);
+       }       
+       
+       @Override
+       public void main(String[] args) throws MsgException {
+               //Check arguments
+               if (args.length != 3 && args.length != 2) {
+                       Msg.info("Wrong number of arguments");
+               }
+               if (args.length == 3) {
+                       init(Integer.valueOf(args[0]),true);
+               }
+               else {
+                       init(Integer.valueOf(args[0]),false);
+               }
+               //Retrieve the deadline
+               deadline = Double.valueOf(args[1]);
+               if (deadline < 0) {
+                       Msg.info("Wrong deadline supplied");
+                       return;
+               }
+               Msg.info("Hi, I'm joining the network with id " + id);
+               //Getting peer data from the tracker
+               if (getPeersData()) {
+                       Msg.debug("Got " + peers.size() + " peers from the tracker");
+                       Msg.debug("Here is my current status: " + getStatus());
+                       if (hasFinished()) {
+                               pieces = Common.FILE_PIECES;
+                               sendHandshakeAll();
+                               seedLoop();
+                       }
+                       else {
+                               leechLoop();
+                               seedLoop();
+                       }
+               }
+               else {
+                       Msg.info("Couldn't contact the tracker.");
+               }
+               Msg.info("Here is my current status: " + getStatus());
+       }
+       /**
+        * Peer main loop when it is leeching.
+        */
+       private void leechLoop() {
+               double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
+               Msg.debug("Start downloading.");
+               /**
+                * Send a "handshake" message to all the peers it got
+                * (it couldn't have gotten more than 50 peers anyway)
+                */
+               sendHandshakeAll();
+               //Wait for at least one "bitfield" message.
+               waitForPieces();
+               Msg.debug("Starting main leech loop");
+               while (Msg.getClock() < deadline && pieces < Common.FILE_PIECES) {
+                       if (commReceived == null) {
+                               commReceived = Task.irecv(mailbox);
+                       }
+                       try {
+                               if (commReceived.test()) {
+                                       handleMessage(commReceived.getTask());
+                                       commReceived = null;
+                               }
+                               else {
+                                       //If the user has a pending interesting
+                                       if (currentPiece != -1) {
+                                               sendInterestedToPeers();
+                                       }
+                                       else {
+                                               if (currentPieces.size() < Common.MAX_PIECES) {
+                                                       updateCurrentPiece();
+                                               }
+                                       }
+                                       //We don't execute the choke algorithm if we don't already have a piece
+                                       if (Msg.getClock() >= nextChokedUpdate && pieces > 0) {
+                                               updateChokedPeers();
+                                               nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
+                                       }
+                                       else {
+                                               waitFor(1);
+                                       }
+                               }
+                       }
+                       catch (MsgException e) {
+                               commReceived = null;                            
+                       }
+               }
+       }
+       
+       /**
+        * Peer main loop when it is seeding
+        */
+       private void seedLoop() {
+               double nextChokedUpdate = Msg.getClock() + Common.UPDATE_CHOKED_INTERVAL;
+               Msg.debug("Start seeding.");
+               //start the main seed loop
+               while (Msg.getClock() < deadline) {
+                       if (commReceived == null) {
+                               commReceived = Task.irecv(mailbox);
+                       }
+                       try {
+                               if (commReceived.test()) {
+                                       handleMessage(commReceived.getTask());
+                                       commReceived = null;
+                               }
+                               else {
+                                       if (Msg.getClock() >= nextChokedUpdate) {
+                                               updateChokedPeers();
+                                               //TODO: Change the choked peer algorithm when seeding
+                                               nextChokedUpdate += Common.UPDATE_CHOKED_INTERVAL;
+                                       }
+                                       else {
+                                               waitFor(1);
+                                       }
+                               }
+                       }
+                       catch (MsgException e) {
+                               commReceived = null;                            
+                       }
+
+               }
+       }
+       
+       /**
+        * Initialize the various peer data
+        * @param id id of the peer to take in the network
+        * @param seed indicates if the peer is a seed
+        */
+       private void init(int id, boolean seed) {
+               this.id = id;
+               this.mailbox = Integer.toString(id);
+               this.mailboxTracker = "tracker_" + Integer.toString(id);
+               if (seed) {
+                       for (int i = 0; i < bitfield.length; i++) {
+                               bitfield[i] = '1';
+                       }
+               }
+               else {
+                       for (int i = 0; i < bitfield.length; i++) {
+                               bitfield[i] = '0';
+                       }                       
+               }
+               this.hostname = host.getName();
+       }
+       /**
+        * Retrieves the peer list from the tracker
+        */
+       private boolean getPeersData() {
+               
+               boolean success = false, sendSuccess = false;
+               double timeout = Msg.getClock() + Common.GET_PEERS_TIMEOUT;
+               //Build the task to send to the tracker
+               TrackerTask taskSend = new TrackerTask(hostname, mailboxTracker, id);
+                       
+               while (!sendSuccess && Msg.getClock() < timeout) {
+                       try {
+                               Msg.debug("Sending a peer request to the tracker.");
+                               taskSend.send(Common.TRACKER_MAILBOX,Common.GET_PEERS_TIMEOUT);
+                               sendSuccess = true;
+                       }
+                       catch (MsgException e) {
+                               
+                       }
+               }
+               while (!success && Msg.getClock() < timeout) {
+                       commReceived = Task.irecv(this.mailboxTracker);
+                       try {
+                               commReceived.waitCompletion(Common.GET_PEERS_TIMEOUT);
+                               if (commReceived.getTask() instanceof TrackerTask) {
+                                       TrackerTask task = (TrackerTask)commReceived.getTask();
+                                       for (Integer peerId: task.peers) {
+                                               if (peerId != this.id) {
+                                                       peers.put(peerId, new Connection(peerId));
+                                               }       
+                                       }
+                                       success = true;
+                               }
+                       }
+                       catch (MsgException e) {
+                               
+                       }
+                       commReceived = null;
+               }
+               commReceived = null;
+               return success;
+       }
+       /**
+        * Handle a received message sent by another peer
+        * @param task task received.
+        */
+       void handleMessage(Task task) {
+               MessageTask message = (MessageTask)task;
+               Connection remotePeer = peers.get(message.peerId);
+               switch (message.type) {
+                       case HANDSHAKE:
+                               Msg.debug("Received a HANDSHAKE message from " + message.mailbox);
+                               //Check if the peer is in our connection list
+                               if (remotePeer == null) {
+                                       peers.put(message.peerId, new Connection(message.peerId));
+                                       sendHandshake(message.mailbox);
+                               }
+                               //Send our bitfield to the pair
+                               sendBitfield(message.mailbox);
+                       break;
+                       case BITFIELD:
+                               Msg.debug("Received a BITFIELD message from " + message.peerId + " (" + message.issuerHostname + ")");
+                               //update the pieces list
+                               updatePiecesCountFromBitfield(message.bitfield);
+                               //Update the current piece
+                               if (currentPiece == -1 && pieces < Common.FILE_PIECES && currentPieces.size() < Common.MAX_PIECES) {
+                                       updateCurrentPiece();
+                               }                               
+                               remotePeer.bitfield  = message.bitfield.clone();
+                       break;
+                       case INTERESTED:
+                               Msg.debug("Received an INTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
+                               assert remotePeer != null;
+                               remotePeer.interested = true;
+                       break;
+                       case NOTINTERESTED:
+                               Msg.debug("Received a NOTINTERESTED message from " + message.peerId + " (" + message.issuerHostname + ")");
+                               assert remotePeer != null;
+                               remotePeer.interested = false;
+                       break;
+                       case UNCHOKE:
+                               Msg.debug("Received an UNCHOKE message from " + message.peerId + "(" + message.issuerHostname + ")");
+                               assert remotePeer != null;
+                               remotePeer.chokedDownload = false;
+                               activePeers.put(remotePeer.id,remotePeer);
+                               sendRequestsToPeer(remotePeer);
+                       break;
+                       case CHOKE:
+                               Msg.debug("Received a CHOKE message from " + message.peerId + " (" + message.issuerHostname + ")");
+                               assert remotePeer != null;
+                               remotePeer.chokedDownload = true;
+                               activePeers.remove(remotePeer.id);
+                       break;
+                       case HAVE:
+                               if (remotePeer.bitfield == null) {
+                                       return;
+                               }
+                               Msg.debug("Received a HAVE message from " + message.peerId + " (" + message.issuerHostname + ")");
+                               assert message.index >= 0 && message.index < Common.FILE_PIECES;
+                               assert remotePeer.bitfield != null;
+                               remotePeer.bitfield[message.index] = '1';
+                               piecesCount[message.index]++; 
+                               //Send interested message to the peer if he has what we want
+                               if (!remotePeer.amInterested && currentPieces.contains(message.index) ) {
+                                       remotePeer.amInterested = true;
+                                       sendInterested(remotePeer.mailbox);
+                               }
+                               
+                               if (currentPieces.contains(message.index)) {
+                                       sendRequest(message.mailbox,message.index);
+                               }
+                       break;
+                       case REQUEST:
+                               assert message.index >= 0 && message.index < Common.FILE_PIECES;
+                               if (!remotePeer.chokedUpload) {
+                                       Msg.debug("Received a REQUEST from " + message.peerId + "(" + message.issuerHostname + ") for " + message.peerId);
+                                       if (bitfield[message.index] == '1') {
+                                               sendPiece(message.mailbox,message.index,false); 
+                                       }
+                                       else {
+                                               Msg.debug("Received a REQUEST from " + message.peerId + " (" + message.issuerHostname + ") but he is choked" );
+                                       }
+                               }
+                       break;
+                       case PIECE:
+                               if (message.stalled) {
+                                       Msg.debug("The received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
+                               }
+                               else {
+                                       Msg.debug("Received piece " + message.index + " from " + message.peerId + " (" + message.issuerHostname + ")");
+                                       if (bitfield[message.index] == '0') {
+                                               piecesRequested--;
+                                               //Removing the piece from our piece list.
+                                               if (!currentPieces.remove((Object)Integer.valueOf(message.index))) {
+                                               }
+                                               //Setting the fact that we have the piece
+                                               bitfield[message.index] = '1';
+                                               pieces++;
+                                               Msg.debug("My status is now " + getStatus());
+                                               //Sending the information to all the peers we are connected to
+                                               sendHave(message.index);
+                                               //sending UNINTERSTED to peers that doesn't have what we want.
+                                               updateInterestedAfterReceive();
+                                       }
+                                       else {
+                                               Msg.debug("However, we already have it.");
+                                       }
+                               }
+                       break;
+               }
+       }
+       /**
+        * Wait for the node to receive interesting bitfield messages (ie: non empty)
+        * to be received
+        */
+       void waitForPieces() {
+               boolean finished = false;
+               while (Msg.getClock() < deadline && !finished) {
+                       if (commReceived == null) {
+                               commReceived = Task.irecv(mailbox);
+                       }
+                       try {
+                               commReceived.waitCompletion(Common.TIMEOUT_MESSAGE);
+                               handleMessage(commReceived.getTask());
+                               if (currentPiece != -1) {
+                                       finished = true;
+                               }
+                               commReceived = null;
+                       }
+                       catch (MsgException e) {
+                               commReceived = null;
+                       }
+               }
+       }
+       
+       private boolean hasFinished() {
+               for (int i = 0; i < bitfield.length; i++) {
+                       if (bitfield[i] == '1') {
+                               return true;
+                       }
+               }
+               return false;
+       }
+       /**
+        * Updates the list of who has a piece from a bitfield
+        * @param bitfield bitfield
+        */
+       private void updatePiecesCountFromBitfield(char bitfield[]) {
+               for (int i = 0; i < Common.FILE_PIECES; i++) {
+                       if (bitfield[i] == '1') {
+                               piecesCount[i]++;
+                       }
+               }
+       }
+       /**
+        * Update the piece the peer is currently interested in.
+        * There is two cases (as described in "Bittorrent Architecture Protocol", Ryan Toole :
+        * If the peer has less than 3 pieces, he chooses a piece at random.
+        * If the peer has more than pieces, he downloads the pieces that are the less
+        * replicated
+        */
+       void updateCurrentPiece() {
+               if (currentPieces.size() >= (Common.FILE_PIECES - pieces)) {
+                       return;
+               }
+               if (true || pieces < 3) {
+                       int i = 0, peerPiece;
+                       do {
+                               currentPiece = stream.randInt(0,Common.FILE_PIECES - 1);
+                               i++;
+                       } while (!(bitfield[currentPiece] == '0' && !currentPieces.contains(currentPiece)));
+               }
+               else {
+                       //trivial min algorithm.
+                       //TODO
+               }
+               currentPieces.add(currentPiece);
+               Msg.debug("New interested piece: " + currentPiece);
+               assert currentPiece >= 0 && currentPiece < Common.FILE_PIECES;
+       }
+       /**
+        * Update the list of current choked and unchoked peers, using the
+        * choke algorithm
+        */
+       private void updateChokedPeers() {
+               round = (round + 1) % 3;
+               if (peers.size() == 0) {
+                       return;
+               }
+               //remove a peer from the list
+               Iterator<Entry<Integer, Connection>> it = activePeers.entrySet().iterator();
+               if (it.hasNext()) {
+                       Entry<Integer,Connection> e = it.next();
+                       Connection peerChoked = e.getValue();
+                       sendChoked(peerChoked.mailbox);
+                       peerChoked.chokedUpload = true;
+                       activePeers.remove(e.getKey());
+               }
+               //Random optimistic unchoking
+               if (round == 0 || true) {
+                       int j = 0, i;
+                       Connection peerChoosed = null;
+                       do {
+                               i = 0;
+                               int idChosen = ((int)Msg.getClock() + j) % peers.size();
+                               for (Connection connection : peers.values()) {
+                                       if (i == idChosen) {
+                                               peerChoosed = connection;
+                                               break;
+                                       }
+                                       i++;
+                               } //TODO: Not really the best way ever
+                               if (!peerChoosed.interested) {
+                                       peerChoosed = null;
+                               }
+                               j++;
+                       } while (peerChoosed == null && j < Common.MAXIMUM_PAIRS);
+                       if (peerChoosed != null) {
+                               activePeers.put(peerChoosed.id,peerChoosed);
+                               peerChoosed.chokedUpload = false;
+                               sendUnchoked(peerChoosed.mailbox);
+                       }
+               }
+               //TODO: Use the leecher choke algorithm.
+       }
+       /**     
+        * Updates our "interested" state about peers: send "not interested" to peers
+        * that don't have any more pieces we want.
+        */
+       private void updateInterestedAfterReceive() {
+               boolean interested;
+               for (Connection connection : peers.values()) {
+                       interested = false;
+                       if (connection.amInterested) {
+                               for (Integer piece : currentPieces) {
+                                       if (connection.bitfield[piece] == '1') {
+                                               interested = true;
+                                               break;
+                                       }
+                               }       
+                               if (!interested) {
+                                       connection.amInterested = false;
+                                       sendNotInterested(connection.mailbox);
+                               }
+                       }
+               }
+       }
+       /**
+        * Send request messages to a peer that have unchoked us
+        * @param remotePeer peer data to the peer we want to send the request
+        */
+       private void sendRequestsToPeer(Connection remotePeer) {
+               for (Integer piece : currentPieces) {
+                       if (remotePeer.bitfield != null && remotePeer.bitfield[piece] == '1') {
+                               sendRequest(remotePeer.mailbox, piece);
+                       }                       
+               }
+       }       
+       /**
+        * Find the peers that have the current interested piece and send them
+        * the "interested" message
+        */
+       private void sendInterestedToPeers() {
+               if (currentPiece == -1) {
+                       return;
+               }
+               for (Connection connection : peers.values()) {
+                       if (connection.bitfield != null && connection.bitfield[currentPiece] == '1' && !connection.amInterested) {
+                               connection.amInterested = true;                         
+                               MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
+                               task.dsend(connection.mailbox);                         
+                       }
+               }
+               currentPiece = -1;
+               piecesRequested++;
+       }
+       /**
+        * Send a "interested" message to a peer.
+        */
+       private void sendInterested(String mailbox) {
+               MessageTask task = new MessageTask(MessageTask.Type.INTERESTED, hostname, this.mailbox, this.id);
+               task.dsend(mailbox);                                            
+       }
+       /**
+        * Send a "not interested" message to a peer
+        * @param mailbox mailbox destination mailbox
+        */
+       private void sendNotInterested(String mailbox) {
+               MessageTask task = new MessageTask(MessageTask.Type.NOTINTERESTED, hostname, this.mailbox, this.id);
+               task.dsend(mailbox);                            
+       }
+       /**
+        * Send a handshake message to all the peers the peer has.
+        * @param peer peer data
+        */
+       private void sendHandshakeAll() {
+               for (Connection remotePeer : peers.values()) {
+                       MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, mailbox,
+                       id);
+                       task.dsend(remotePeer.mailbox);
+               }
+       }
+       /**
+        * Send a "handshake" message to an user
+        * @param mailbox mailbox where to we send the message
+        */
+       private void sendHandshake(String mailbox) {
+               Msg.debug("Sending a HANDSHAKE to " + mailbox);
+               MessageTask task = new MessageTask(MessageTask.Type.HANDSHAKE, hostname, this.mailbox, this.id);
+               task.dsend(mailbox);            
+       }
+       /**
+        * Send a "choked" message to a peer
+        */
+       private void sendChoked(String mailbox) {
+               Msg.debug("Sending a CHOKE to " + mailbox);
+               MessageTask task = new MessageTask(MessageTask.Type.CHOKE, hostname, this.mailbox, this.id);
+               task.dsend(mailbox);
+       }
+       /**
+        * Send a "unchoked" message to a peer
+        */
+       private void sendUnchoked(String mailbox) {
+               Msg.debug("Sending a UNCHOKE to " + mailbox);
+               MessageTask task = new MessageTask(MessageTask.Type.UNCHOKE, hostname, this.mailbox, this.id);
+               task.dsend(mailbox);
+       }
+       /**
+        * Send a "HAVE" message to all peers we are connected to
+        */
+       private void sendHave(int piece) {
+               Msg.debug("Sending HAVE message to all my peers");
+               for (Connection remotePeer : peers.values()) {
+                       MessageTask task = new MessageTask(MessageTask.Type.HAVE, hostname, this.mailbox, this.id, piece);
+                       task.dsend(remotePeer.mailbox);
+               }
+       }
+       /**
+        * Send a bitfield message to all the peers the peer has.
+        * @param peer peer data
+        */
+       private void sendBitfield(String mailbox) {
+               Msg.debug("Sending a BITFIELD to " + mailbox);
+               MessageTask task = new MessageTask(MessageTask.Type.BITFIELD, hostname, this.mailbox, this.id, this.bitfield);
+               task.dsend(mailbox);
+       }
+       /**
+        * Send a "request" message to a pair, containing a request for a piece
+        */
+       private void sendRequest(String mailbox, int piece) {
+               Msg.debug("Sending a REQUEST to " + mailbox + " for piece " + piece);
+               MessageTask task = new MessageTask(MessageTask.Type.REQUEST, hostname, this.mailbox, this.id, piece);
+               task.dsend(mailbox);
+       }
+       /**
+        * Send a "piece" message to a pair, containing a piece of the file
+        */
+       private void sendPiece(String mailbox, int piece, boolean stalled) {
+               Msg.debug("Sending the PIECE " + piece + " to " + mailbox);
+               MessageTask task = new MessageTask(MessageTask.Type.PIECE, hostname, this.mailbox, this.id, piece, stalled);
+               task.dsend(mailbox);
+       }
+       
+       private String getStatus() {
+               String s = "";
+               for (int i = 0; i < Common.FILE_PIECES; i++) {
+                       s = s + bitfield[i];
+               }
+               return s;
+       }
+}
+       
diff --git a/examples/bittorrent/Tracker.java b/examples/bittorrent/Tracker.java
new file mode 100644 (file)
index 0000000..8f0d7f7
--- /dev/null
@@ -0,0 +1,89 @@
+package bittorrent;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.simgrid.msg.Comm;
+import org.simgrid.msg.Host;
+import org.simgrid.msg.Process;
+import org.simgrid.msg.Msg;
+import org.simgrid.msg.MsgException;
+import org.simgrid.msg.Task;
+
+import org.simgrid.msg.RngStream;
+/**
+ * Tracker, handle requests from peers.
+ */
+public class Tracker extends Process {
+       protected RngStream stream;
+       /**
+        * Peers list
+        */
+       protected ArrayList<Integer> peersList;
+       /**
+        * End time for the simulation
+        */
+       protected double deadline;
+       /**
+        * Current comm received
+        */
+       protected Comm commReceived = null;
+       
+       public Tracker(Host host, String name, String[]args) {
+               super(host,name,args);
+       }
+       
+       @Override
+       public void main(String[] args) throws MsgException {
+               if (args.length != 1) {
+                       Msg.info("Wrong number of arguments for the tracker.");
+                       return;
+               }
+               //Build the RngStream object for randomness
+               stream = new RngStream("tracker");
+               //Retrieve the end time
+               deadline = Double.valueOf(args[0]);
+               //Building peers array
+               peersList = new ArrayList<Integer>();
+               
+               Msg.info("Tracker launched.");          
+               while (Msg.getClock() < deadline) {
+                       if (commReceived == null) {
+                               commReceived = Task.irecv(Common.TRACKER_MAILBOX);
+                       }
+                       try {
+                               if (commReceived.test()) {
+                                       Task task = commReceived.getTask();
+                                       if (task instanceof TrackerTask) {
+                                               TrackerTask tTask = (TrackerTask)task;
+                                               //Sending peers to the peer
+                                               int nbPeers = 0;
+                                               while (nbPeers < Common.MAXIMUM_PAIRS && nbPeers < peersList.size()) {
+                                                       int nextPeer;
+                                                       do {
+                                                               nextPeer = stream.randInt(0, peersList.size() - 1);
+                                                       } while (tTask.peers.contains(nextPeer));
+                                                       tTask.peers.add(peersList.get(nextPeer));
+                                                       nbPeers++;
+                                               }
+                                               //Adding the peer to our list
+                                               peersList.add(tTask.peerId);
+                                               tTask.type = TrackerTask.Type.ANSWER;
+                                               //Setting the interval
+                                               tTask.interval = Common.TRACKER_QUERY_INTERVAL;
+                                               //Sending the task back to the peer
+                                               tTask.dsend(tTask.mailbox);
+                                       }
+                                       commReceived = null;
+                               }
+                               else {
+                                       waitFor(1);
+                               }
+                       }
+                       catch (MsgException e) {
+                               commReceived = null;                            
+                       }
+               }
+               Msg.info("Tracker is leaving");
+       }
+
+}
diff --git a/examples/bittorrent/TrackerTask.java b/examples/bittorrent/TrackerTask.java
new file mode 100644 (file)
index 0000000..41cac26
--- /dev/null
@@ -0,0 +1,43 @@
+package bittorrent;
+import java.util.ArrayList;
+
+import org.simgrid.msg.Task;
+
+/**
+ * Task exchanged between the tracker
+ * and the peers. 
+ */
+public class TrackerTask extends Task {
+       /**
+        * Type of the tasks
+        */
+       public enum Type {
+               REQUEST,
+               ANSWER
+       };
+       public Type type;
+       public String hostname;
+       public String mailbox;
+       public int peerId;
+       public int uploaded;
+       public int downloaded;
+       public int left;
+       public double interval;
+       public ArrayList<Integer> peers;
+       
+       public TrackerTask(String hostname, String mailbox, int peerId) {
+               this(hostname, mailbox, peerId, 0, 0, Common.FILE_SIZE);
+       }       
+       public TrackerTask(String hostname, String mailbox, int peerId, int uploaded, int downloaded, int left) {
+               super("", 0, Common.TRACKER_COMM_SIZE);
+               this.type = Type.REQUEST;
+               this.hostname = hostname;
+               this.mailbox = mailbox;
+               this.peerId = peerId;
+               this.uploaded = uploaded;
+               this.downloaded = downloaded;
+               this.left = left;
+               this.peers = new ArrayList<Integer>();
+       }
+       
+}
diff --git a/examples/bittorrent/bittorrent.tesh b/examples/bittorrent/bittorrent.tesh
new file mode 100644 (file)
index 0000000..3919b49
--- /dev/null
@@ -0,0 +1,27 @@
+#! ./tesh
+
+! output sort
+
+$ java -cp .:${srcdir:=.}/examples:${srcdir:=.}/simgrid.jar bittorrent/Bittorrent ${srcdir:=.}/examples/platform.xml ${srcdir:=.}/examples/bittorrent/bittorrent.xml
+> [0.000000] [jmsg/INFO] Ready to run MSG_MAIN
+> [5000.832370] [jmsg/INFO] Done running MSG_MAIN
+> [5000.832370] [jmsg/INFO] MSG_main finished
+> [5000.832370] [jmsg/INFO] Clean java world
+> [5000.832370] [jmsg/INFO] Clean native world
+> [Boivin:bittorrent.Peer:(2) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 2
+> [Boivin:bittorrent.Peer:(2) 5000.048882] [jmsg/INFO] Here is my current status: 1111111111
+> [Disney:bittorrent.Peer:(6) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 6
+> [Disney:bittorrent.Peer:(6) 5000.131405] [jmsg/INFO] Here is my current status: 1111111111
+> [Geoff:bittorrent.Peer:(5) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 5
+> [Geoff:bittorrent.Peer:(5) 5000.764583] [jmsg/INFO] Here is my current status: 1111111111
+> [Jacquelin:bittorrent.Tracker:(1) 0.000000] [jmsg/INFO] Tracker launched.
+> [Jacquelin:bittorrent.Tracker:(1) 3000.000000] [jmsg/INFO] Tracker is leaving
+> [Jean_Yves:bittorrent.Peer:(3) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 3
+> [Jean_Yves:bittorrent.Peer:(3) 5000.832370] [jmsg/INFO] Here is my current status: 1111111111
+> [McGee:bittorrent.Peer:(8) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 8
+> [McGee:bittorrent.Peer:(8) 5000.783574] [jmsg/INFO] Here is my current status: 1111111111
+> [TeX:bittorrent.Peer:(4) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 4
+> [TeX:bittorrent.Peer:(4) 5000.304959] [jmsg/INFO] Here is my current status: 1111111111
+> [iRMX:bittorrent.Peer:(7) 0.000000] [jmsg/INFO] Hi, I'm joining the network with id 7
+> [iRMX:bittorrent.Peer:(7) 5000.729205] [jmsg/INFO] Here is my current status: 1111111111
+
diff --git a/examples/bittorrent/bittorrent.xml b/examples/bittorrent/bittorrent.xml
new file mode 100644 (file)
index 0000000..0699cac
--- /dev/null
@@ -0,0 +1,39 @@
+<?xml version='1.0'?>
+<!DOCTYPE platform SYSTEM "http://simgrid.gforge.inria.fr/simgrid.dtd">
+<platform version="3">
+
+  <process host="Jacquelin" function="bittorrent.Tracker">
+    <argument value="3000" />                  
+  </process>
+
+  <process host="Boivin" function="bittorrent.Peer">
+    <argument value="00000002"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+    <argument value="1" />                     <!-- indicates if the bittorrent.Peer is a seed at the begining of the simulation -->   
+  </process>
+  <process host="Jean_Yves" function="bittorrent.Peer">
+    <argument value="00000003"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="TeX" function="bittorrent.Peer">
+    <argument value="00000004"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="Geoff" function="bittorrent.Peer">
+    <argument value="00000005"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="Disney" function="bittorrent.Peer">
+    <argument value="00000006"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="iRMX" function="bittorrent.Peer">
+    <argument value="00000007"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+  <process host="McGee" function="bittorrent.Peer">
+    <argument value="00000008"/>        <!-- my id -->
+    <argument value="5000" />                  <!-- end time -->       
+  </process>
+
+</platform>
diff --git a/examples/bittorrent/generate.py b/examples/bittorrent/generate.py
new file mode 100755 (executable)
index 0000000..9ba8611
--- /dev/null
@@ -0,0 +1,41 @@
+#!/usr/bin/python
+
+# This script generates a specific deployment file for the Bittorrent example.
+# It assumes that the platform will be a cluster.
+# Usage: python generate.py nb_nodes nb_bits end_date percentage
+# Example: python generate.py 10000 5000
+
+import sys, random
+
+if len(sys.argv) != 4:
+       print("Usage: python generate.py nb_nodes end_date seed_percentage > deployment_file.xml")
+       sys.exit(1)
+
+nb_nodes = int(sys.argv[1])
+end_date = int(sys.argv[2])
+seed_percentage = int(sys.argv[3]);
+
+nb_bits = 24
+max_id = 2 ** nb_bits - 1
+all_ids = [42]
+
+sys.stdout.write("<?xml version='1.0'?>\n"
+"<!DOCTYPE platform SYSTEM \"http://simgrid.gforge.inria.fr/simgrid.dtd\">\n"
+"<platform version=\"3\">\n"
+"  <process host=\"c-0.me\" function=\"bittorrent.Tracker\"><argument value=\"%d\"/></process>\n" % end_date)
+
+for i in range(1, nb_nodes):
+
+  ok = False
+  while not ok:
+    my_id = random.randint(0, max_id)
+    ok = not my_id in all_ids
+  start_date = i * 10
+  line = "  <process host=\"c-%d.me\" function=\"bittorrent.Peer\"><argument value=\"%d\" /><argument value=\"%d\" />" % (i, my_id, end_date)
+  if random.randint(0,100) < seed_percentage:
+    line += "<argument value=\"1\" />"
+  line += "</process>\n";
+  sys.stdout.write(line)
+  all_ids.append(my_id)
+sys.stdout.write("</platform>")
+
diff --git a/examples/chord/chord.tesh b/examples/chord/chord.tesh
new file mode 100644 (file)
index 0000000..7f950b6
--- /dev/null
@@ -0,0 +1,17 @@
+#! ./tesh
+
+! output sort
+
+$ java -cp .:${srcdir:=.}/examples:${srcdir:=.}/simgrid.jar chord/Chord ${srcdir:=.}/examples/platform.xml ${srcdir:=.}/examples/chord/chord.xml
+> [0.000000] [jmsg/INFO] Ready to run MSG_MAIN
+> [652.548539] [jmsg/INFO] Done running MSG_MAIN
+> [652.548539] [jmsg/INFO] MSG_main finished
+> [652.548539] [jmsg/INFO] Clean java world
+> [652.548539] [jmsg/INFO] Clean native world
+> [Boivin:chord.Node:(7) 0.000000] [jmsg/INFO] Joining the ring with id 8 knowing node 1
+> [Gatien:chord.Node:(1) 0.000000] [jmsg/INFO] Joining the ring with id 48 knowing node 1
+> [Geoff:chord.Node:(4) 0.000000] [jmsg/INFO] Joining the ring with id 32 knowing node 1
+> [Jean_Yves:chord.Node:(6) 0.000000] [jmsg/INFO] Joining the ring with id 14 knowing node 1
+> [McGee:chord.Node:(2) 0.000000] [jmsg/INFO] Joining the ring with id 42 knowing node 1
+> [TeX:chord.Node:(5) 0.000000] [jmsg/INFO] Joining the ring with id 21 knowing node 1
+> [iRMX:chord.Node:(3) 0.000000] [jmsg/INFO] Joining the ring with id 38 knowing node 1
\ No newline at end of file
index d3d9799..423371d 100644 (file)
@@ -15,7 +15,11 @@ public class Comm {
        /**
         * Indicates if the communication is a receiving communication
         */
-       boolean receiving;
+       protected boolean receiving;
+       /**
+        * Indicates if the communication is finished
+        */
+       protected boolean finished = false;
        /**
         * Represents the bind between the java comm and the
         * native C comm. You must never access it, since it is 
index abdd52c..82e6c4f 100644 (file)
@@ -224,7 +224,6 @@ public abstract class Process implements Runnable {
 
        /**
         * This method kill a process.
-        * @param process  the process to be killed.
         *
         */
        public native void kill();
diff --git a/org/simgrid/msg/RngStream.java b/org/simgrid/msg/RngStream.java
new file mode 100644 (file)
index 0000000..4b9b734
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * JNI interface to C RngStream code
+ * 
+ * Copyright 2006,2007,2010,2012 The SimGrid Team.           
+ * All right reserved. 
+ *
+ * This program is free software; you can redistribute 
+ * it and/or modify it under the terms of the license 
+ * (GNU LGPL) which comes with this package.
+ */
+package org.simgrid.msg;
+/**
+ * Export of RngStreams for Java
+ */
+public class RngStream {
+       /**
+        * Represents the bind between the RngStream java object and the C object.
+        */
+       public long bind;
+       /**
+        * Creates and returns a new stream without identifier. 
+        * This procedure reserves space to keep the information relative to
+        * the RngStream, initializes its seed Ig , sets Bg and Cg equal to Ig , sets its antithetic and
+        * precision switches to 0. The seed Ig is equal to the initial seed of the package given by
+        * setPackageSeed if this is the first stream created, otherwise it is Z steps ahead
+        * of that of the most recently created stream.
+        */
+       public RngStream() {
+               create("");
+       }
+       /**
+        * Creates and returns a new stream with identifier "name". 
+        * This procedure reserves space to keep the information relative to
+        * the RngStream, initializes its seed Ig , sets Bg and Cg equal to Ig , sets its antithetic and
+        * precision switches to 0. The seed Ig is equal to the initial seed of the package given by
+        * setPackageSeed if this is the first stream created, otherwise it is Z steps ahead
+        * of that of the most recently created stream.
+        */
+       public RngStream(String name) {
+               create(name);
+       }
+       /**
+        * The natively implemented method to create a C RngStream object.
+        */
+       private native void create(String name);
+       /**
+        * Destructor
+        */
+       protected void finalize() {
+               destroy();
+       }
+       /**
+        * Release the C RngStream object
+        */
+       private native void destroy();
+       
+       /**
+        * Sets the initial seed of the package RngStreams to the six integers in the vector seed. This will
+        * be the seed (initial state) of the first stream. If this procedure is not called, the default initial
+        * seed is (12345, 12345, 12345, 12345, 12345, 12345). If it is called, the first 3 values of the seed
+        * must all be less than m1 = 4294967087, and not all 0; and the last 3 values must all be less
+        * than m2 = 4294944443, and not all 0. Returns false for invalid seeds, and true otherwise.
+        */
+       public static native boolean setPackageSeed(int seed[]);
+       /**
+        * Reinitializes the stream g to its initial state: Cg and Bg are set to Ig .
+        */
+       public native void resetStart();
+       /**
+        * Reinitializes the stream g to the beginning of its current substream: Cg is set to Bg .
+        */
+       public native void restartStartSubstream();
+       /**
+        * Reinitializes the stream g to the beginning of its next substream: Ng is computed, and Cg and
+        * Bg are set to Ng .
+        */
+       public native void resetNextSubstream();
+       /**
+        * If a = true the stream g will start generating antithetic variates, i.e., 1 − U instead of U , until
+        *  this method is called again with a = false.
+        */
+       public native void setAntithetic(boolean a);
+       /**
+         * Sets the initial seed Ig of stream g to the vector seed. This vector must satisfy the same
+         * conditions as in setPackageSeed. The stream is then reset to this initial seed. The
+         * states and seeds of the other streams are not modified. As a result, after calling this procedure,
+         * the initial seeds of the streams are no longer spaced Z values apart. We discourage the use of
+         * this procedure. Returns false for invalid seeds, and true otherwise.
+        */
+       public native boolean setSeed(int seed[]);
+       /**
+         * Advances the state of the stream by k values, without modifying the states of other streams (as
+         * in RngStream_SetSeed), nor the values of Bg and Ig associated with this stream. If e > 0, then
+         * k = 2e + c; if e < 0, then k = −2−e + c; and if e = 0, then k = c. Note: c is allowed to take
+         *negative values. We discourage the use of this procedure.     
+         */
+       public native void advanceState(int e, int g);
+       
+       /**
+        * Returns a (pseudo)random number from the uniform distribution over the interval (0, 1), after advancing the state by one step. The returned number has 32 bits of precision
+        * in the sense that it is always a multiple of 1/(232 − 208), unless RngStream_IncreasedPrecis
+        * has been called for this stream.
+        */
+       public native double randU01();
+       /**
+        * Returns a (pseudo)random number from the discrete uniform distribution over the integers
+        * {i, i + 1, . . . , j}
+        */
+       public native int randInt(int i, int j);
+       
+       /**
+        * Class initializer, to initialize various JNI stuff
+        */
+       public static native void nativeInit();
+       static {
+               nativeInit();
+       }
+}
\ No newline at end of file
index 48638c0..976cf7e 100644 (file)
@@ -8,10 +8,11 @@
 #include <msg/msg.h>
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(jmsg);
 
-static jfieldID jtask_field_Comm_task;
 static jfieldID jcomm_field_Comm_bind;
-static jfieldID jcomm_field_Comm_taskBind;
+static jfieldID jcomm_field_Comm_finished;
 static jfieldID jcomm_field_Comm_receiving;
+static jfieldID jtask_field_Comm_task;
+static jfieldID jcomm_field_Comm_taskBind;
 
 void jcomm_bind_task(JNIEnv *env, jobject jcomm) {
        msg_comm_t comm = (msg_comm_t) (long) (*env)->GetLongField(env, jcomm, jcomm_field_Comm_bind);
@@ -49,7 +50,8 @@ Java_org_simgrid_msg_Comm_nativeInit(JNIEnv *env, jclass cls) {
        jcomm_field_Comm_taskBind  = jxbt_get_jfield(env, jfield_class_Comm, "taskBind", "J");
        jcomm_field_Comm_receiving = jxbt_get_jfield(env, jfield_class_Comm, "receiving", "Z");
        jtask_field_Comm_task = jxbt_get_jfield(env, jfield_class_Comm, "task", "Lorg/simgrid/msg/Task;");
-       if (!jcomm_field_Comm_bind || !jcomm_field_Comm_taskBind || !jcomm_field_Comm_receiving || !jtask_field_Comm_task) {
+       jcomm_field_Comm_finished = jxbt_get_jfield(env, jfield_class_Comm, "finished", "Z");
+       if (!jcomm_field_Comm_bind || !jcomm_field_Comm_taskBind || !jcomm_field_Comm_receiving || !jtask_field_Comm_task || !jcomm_field_Comm_finished) {
        jxbt_throw_native(env,bprintf("Can't find some fields in Java class."));
        }
 }
@@ -71,6 +73,11 @@ Java_org_simgrid_msg_Comm_test(JNIEnv *env, jobject jcomm) {
        msg_comm_t comm;
        comm = (msg_comm_t) (long) (*env)->GetLongField(env, jcomm, jcomm_field_Comm_bind);
 
+       jboolean finished = (*env)->GetBooleanField(env, jcomm, jcomm_field_Comm_finished);
+       if (finished == JNI_TRUE) {
+               return JNI_TRUE;
+       }
+
        if (!comm) {
                jxbt_throw_native(env,bprintf("comm is null"));
                return JNI_FALSE;
@@ -104,6 +111,12 @@ Java_org_simgrid_msg_Comm_waitCompletion(JNIEnv *env, jobject jcomm, jdouble tim
                jxbt_throw_native(env,bprintf("comm is null"));
                return;
        }
+
+       jboolean finished = (*env)->GetBooleanField(env, jcomm, jcomm_field_Comm_finished);
+       if (finished == JNI_TRUE) {
+               return;
+       }
+
        MSG_error_t status;
        TRY {
                status = MSG_comm_wait(comm,(double)timeout);
@@ -111,6 +124,7 @@ Java_org_simgrid_msg_Comm_waitCompletion(JNIEnv *env, jobject jcomm, jdouble tim
        CATCH_ANONYMOUS {
                return;
        }
+       (*env)->SetBooleanField(env, jcomm, jcomm_field_Comm_finished, JNI_TRUE);
        if (status == MSG_OK) {
                jcomm_bind_task(env,jcomm);
                return;
index e20c317..6b2284d 100644 (file)
@@ -43,7 +43,7 @@ m_process_t jprocess_to_native_process(jobject jprocess, JNIEnv * env)
 
 void jprocess_bind(jobject jprocess, m_process_t process, JNIEnv * env)
 {
-  (*env)->SetLongField(env, jprocess, jprocess_field_Process_bind, (jlong) (long) (process));
+  (*env)->SetLongField(env, jprocess, jprocess_field_Process_bind, (jlong)(process));
 }
 
 jlong jprocess_get_id(jobject jprocess, JNIEnv * env)
@@ -137,10 +137,9 @@ Java_org_simgrid_msg_Process_create(JNIEnv * env,
                                                (xbt_main_func_t) jprocess,
                                                /*data*/ jprocess,
                                                host,
-                                               (double)jkill, /* kill time */
                                                /*argc, argv, properties*/
                                                0,NULL,NULL);
-
+  MSG_process_set_kill_time(process, (double)jkill);
   MSG_process_set_data(process,&process);
   /* bind the java process instance to the native process */
   jprocess_bind(jprocess, process, env);
@@ -275,7 +274,7 @@ JNIEXPORT void JNICALL
 Java_org_simgrid_msg_Process_waitFor(JNIEnv * env, jobject jprocess,
                                      jdouble jseconds)
 {
-  MSG_error_t rv;
+       MSG_error_t rv;
   TRY {
         rv = MSG_process_sleep((double)jseconds);
   }
diff --git a/src/jmsg_rngstream.c b/src/jmsg_rngstream.c
new file mode 100644 (file)
index 0000000..f2ae496
--- /dev/null
@@ -0,0 +1,135 @@
+/* Functions related to the RngStream Java port                         */
+
+/* Copyright (c) 2007, 2009, 2010, 2012. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+  * under the terms of the license (GNU LGPL) which comes with this package. */
+
+
+#include "jmsg_rngstream.h"
+#include "jxbt_utilities.h"
+
+jfieldID jrngstream_bind;
+
+RngStream jrngstream_to_native(JNIEnv *env, jobject jrngstream) {
+       RngStream rngstream = (RngStream) (*env)->GetLongField(env, jrngstream, jrngstream_bind);
+       if (!rngstream) {
+    jxbt_throw_notbound(env, "rngstream", jrngstream);
+    return NULL;
+       }
+       return rngstream;
+}
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_nativeInit(JNIEnv *env, jclass cls) {
+       jclass class_RngStream = (*env)->FindClass(env, "org/simgrid/msg/RngStream");
+
+       jrngstream_bind = jxbt_get_jfield(env, class_RngStream, "bind", "J");
+}
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_create(JNIEnv *env, jobject jrngstream, jstring jname) {
+       const char *name = (*env)->GetStringUTFChars(env, jname, 0);
+       RngStream rngstream = RngStream_CreateStream(name);
+       //Bind the RngStream object
+       (*env)->SetLongField(env, jrngstream, jrngstream_bind, (jlong)rngstream);
+
+  (*env)->ReleaseStringUTFChars(env, jname, name);
+}
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_destroy(JNIEnv *env, jobject jrngstream) {
+       RngStream rngstream = jrngstream_to_native(env, jrngstream);
+       RngStream_DeleteStream(&rngstream);
+       (*env)->SetLongField(env, jrngstream, jrngstream_bind, (jlong)NULL);
+}
+JNIEXPORT jboolean JNICALL
+Java_org_simgrid_msg_RngStream_setPackageSeed(JNIEnv *env, jobject jrngstream, jintArray jseed) {
+               jint buffer[6];
+
+               (*env)->GetIntArrayRegion(env, jseed, 0, 6, buffer);
+
+               RngStream rngstream = jrngstream_to_native(env, jrngstream);
+               if (!rngstream)
+                       return JNI_FALSE;
+
+               int result = RngStream_SetPackageSeed((unsigned long*)buffer);
+
+               return result == -1 ? JNI_FALSE : JNI_TRUE;
+}
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_resetStart(JNIEnv *env, jobject jrngstream) {
+       RngStream rngstream = jrngstream_to_native(env, jrngstream);
+       if (!rngstream)
+               return;
+
+       RngStream_ResetStartStream(rngstream);
+}
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_resetStartSubstream(JNIEnv *env, jobject jrngstream) {
+       RngStream rngstream = jrngstream_to_native(env, jrngstream);
+       if (!rngstream)
+               return;
+
+       RngStream_ResetStartSubstream(rngstream);
+}
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_resetNextSubstream(JNIEnv *env, jobject jrngstream) {
+       RngStream rngstream = jrngstream_to_native(env, jrngstream);
+       if (!rngstream)
+               return;
+
+       RngStream_ResetNextSubstream(rngstream);
+}
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_setAntithetic(JNIEnv *env, jobject jrngstream, jboolean ja) {
+       RngStream rngstream = jrngstream_to_native(env, jrngstream);
+       if (!rngstream)
+               return;
+
+       if (ja == JNI_TRUE) {
+               RngStream_SetAntithetic(rngstream,-1);
+       }
+       else {
+               RngStream_SetAntithetic(rngstream,0);
+       }
+}
+JNIEXPORT jboolean JNICALL
+Java_org_simgrid_msg_RngStream_setSeed(JNIEnv *env, jobject jrngstream, jintArray jseed) {
+       jint buffer[6];
+
+       (*env)->GetIntArrayRegion(env, jseed, 0, 6, buffer);
+
+       RngStream rngstream = jrngstream_to_native(env, jrngstream);
+       if (!rngstream)
+               return JNI_FALSE;
+
+
+       int result = RngStream_SetSeed(rngstream, (unsigned long*)buffer);
+
+       return result == -1 ? JNI_FALSE : JNI_TRUE;
+}
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_advanceState(JNIEnv *env, jobject jrngstream, jint e, jint g) {
+       RngStream rngstream = jrngstream_to_native(env, jrngstream);
+       if (!rngstream)
+               return;
+
+       RngStream_AdvanceState(rngstream, (long)e, (long)g);
+}
+JNIEXPORT jdouble JNICALL
+Java_org_simgrid_msg_RngStream_randU01(JNIEnv *env, jobject jrngstream) {
+       RngStream rngstream = jrngstream_to_native(env, jrngstream);
+       if (!rngstream)
+               return 0;
+
+       return (jdouble)RngStream_RandU01(rngstream);
+}
+JNIEXPORT jint JNICALL
+Java_org_simgrid_msg_RngStream_randInt(JNIEnv *env, jobject jrngstream, jint i, jint j) {
+       RngStream rngstream = jrngstream_to_native(env, jrngstream);
+       if (!rngstream)
+               return 0;
+
+       return (jint)RngStream_RandInt(rngstream, (int)i, (int)j);
+}
diff --git a/src/jmsg_rngstream.h b/src/jmsg_rngstream.h
new file mode 100644 (file)
index 0000000..b8976bb
--- /dev/null
@@ -0,0 +1,51 @@
+/* Functions related to the RngStream Java port                         */
+
+/* Copyright (c) 2007, 2009, 2010, 2012. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+  * under the terms of the license (GNU LGPL) which comes with this package. */
+#ifndef MSG_RNGSTREAM_H
+#define MSG_RNGSTREAM_H
+#include <jni.h>
+#include <xbt/RngStream.h>
+
+RngStream jrngstream_to_native(JNIEnv *env, jobject jrngstream);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_nativeInit(JNIEnv *env, jclass cls);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_create(JNIEnv *env, jobject jrngstream, jstring name);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_destroy(JNIEnv *env, jobject jrngstream);
+
+JNIEXPORT jboolean JNICALL
+Java_org_simgrid_msg_RngStream_setPackageSeed(JNIEnv *env, jobject jrngstream, jintArray seed);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_resetStart(JNIEnv *env, jobject jrngstream);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_resetStartSubstream(JNIEnv *env, jobject jrngstream);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_resetNextSubstream(JNIEnv *env, jobject jrngstream);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_setAntithetic(JNIEnv *env, jobject jrngstream, jboolean ja);
+
+JNIEXPORT jboolean JNICALL
+Java_org_simgrid_msg_RngStream_setSeed(JNIEnv *env, jobject jrngstream, jintArray jseed);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_RngStream_advanceState(JNIEnv *env, jobject jrngstream, jint e, jint g);
+
+JNIEXPORT jdouble JNICALL
+Java_org_simgrid_msg_RngStream_randU01(JNIEnv *env, jobject jrngstream);
+
+JNIEXPORT jint JNICALL
+Java_org_simgrid_msg_RngStream_randInt(JNIEnv *env, jobject jrngstream, jint i, jint j);
+
+#endif                          /* MSG_RNGSTREAM_H */