import java.util.Arrays;
import java.util.Hashtable;
import java.util.Vector;
+import java.util.concurrent.Semaphore;
/**
* A process may be defined as a code, with some private data, executing
* The name of the process.
*/
protected String name;
+ /**
+ * The PID of the process
+ */
+ protected int pid = -1;
+ /**
+ * The PPID of the process
+ */
+ protected int ppid = -1;
+ /**
+ * The host of the process
+ */
+ protected Host host = null;
/**
*
* @return
public Vector<String> args;
/* process synchronization tools */
- /**
- *
- */
- /**
- *
- */
- protected Sem schedBegin, schedEnd;
+
+ /* give the full path to semaphore to ensure that our own implementation don't get selected */
+ protected java.util.concurrent.Semaphore schedBegin, schedEnd;
private boolean nativeStop = false;
/**
this.bind = 0;
this.args = new Vector<String>();
this.properties = null;
- schedBegin = new Sem(0);
- schedEnd = new Sem(0);
+ schedBegin = new java.util.concurrent.Semaphore(0);
+ schedEnd = new java.util.concurrent.Semaphore(0);
}
if (null != args)
this.args.addAll(Arrays.asList(args));
- MsgNative.processCreate(this, host);
+ try {
+ MsgNative.processCreate(this, host.getName());
+ } catch (HostNotFoundException e) {
+ throw new RuntimeException("The impossible happend (yet again): the host that I have were not found",e);
+ }
+
}
{
return nativeStop;
}
- /**
- * checks if the flag that indicates that this thread must be killed is set to true; if true, starts to kill it. End users should not have to deal with it
- * If you develop a new MSG native call, please include a call to interruptedStop() at the beginning of your method code, so as the process can be killed if he call
- * your method.
- *
- * @return
- *
- */
- public static void ifInterruptedStop() {
- if ( (Thread.currentThread() instanceof Process) &&((Process) Thread.currentThread()).getNativeStop()) {
- throw new RuntimeException("Interrupted");
- }
- }
-
/**
* This method kill a process.
* @param process the process to be killed.
*
*/
- public static void kill(Process process) {
- process.nativeStop();
- Msg.info("Process " + process.msgName() + " will be killed.");
+ public void kill() {
+ nativeStop();
+ Msg.info("Process " + msgName() + " will be killed.");
}
- /**
- * This method adds an argument in the list of the arguments of the main function
- * of the process.
- *
- * @param arg The argument to add.
- *
- * @deprecated
- */
- @Deprecated
- protected void addArg(String arg) {
- args.add(arg);
- }
/**
* Suspends the process by suspending the task on which it was
*
*/
public void pause() {
- Process.ifInterruptedStop();
MsgNative.processSuspend(this);
}
/**
*
*/
public void restart() {
- Process.ifInterruptedStop();
MsgNative.processResume(this);
}
/**
* Otherwise the method returns false.
*/
public boolean isSuspended() {
- Process.ifInterruptedStop();
return MsgNative.processIsSuspended(this);
}
/**
*
*/
public Host getHost() {
- Process.ifInterruptedStop();
- return MsgNative.processGetHost(this);
+ if (this.host == null) {
+ this.host = MsgNative.processGetHost(this);
+ }
+ return this.host;
}
/**
* This static method gets a process from a PID.
* @exception NativeException on error in the native SimGrid code
*/
public static Process fromPID(int PID) throws NativeException {
- Process.ifInterruptedStop();
return MsgNative.processFromPID(PID);
}
/**
*
*/
public int getPID() {
- Process.ifInterruptedStop();
- return MsgNative.processGetPID(this);
+ if (pid == -1) {
+ pid = MsgNative.processGetPID(this);
+ }
+ return pid;
}
/**
* This method returns the PID of the parent of a process.
*
*/
public int getPPID() {
- Process.ifInterruptedStop();
- return MsgNative.processGetPPID(this);
+ if (ppid == -1) {
+ ppid = MsgNative.processGetPPID(this);
+ }
+ return ppid;
}
/**
* This static method returns the currently running process.
*
*/
public static Process currentProcess() {
- Process.ifInterruptedStop();
return MsgNative.processSelf();
}
/**
*
*/
public static void migrate(Process process, Host host) {
- Process.ifInterruptedStop();
MsgNative.processMigrate(process, host);
+ process.host = null;
}
/**
* Makes the current process sleep until time seconds have elapsed.
* @exception HostFailureException on error in the native SimGrid code
*/
public static void waitFor(double seconds) throws HostFailureException {
- Process.ifInterruptedStop();
MsgNative.processWaitFor(seconds);
}
/**
*
*/
public void showArgs() {
- Process.ifInterruptedStop();
Msg.info("[" + this.name + "/" + this.getHost().getName() + "] argc=" +
this.args.size());
for (int i = 0; i < this.args.size(); i++)
Msg.info("[" + this.msgName() + "/" + this.getHost().getName() +
"] args[" + i + "]=" + (String) (this.args.get(i)));
}
+ /**
+ * Let the simulated process sleep for the given amount of millisecond in the simulated world.
+ *
+ * You don't want to use sleep instead, because it would freeze your simulation
+ * run without any impact on the simulated world.
+ * @param millis
+ */
+ public native void simulatedSleep(double seconds);
+
/**
* This method runs the process. Il calls the method function that you must overwrite.
*/
Msg.info("Unexpected behavior. Stopping now");
System.exit(1);
}
- catch(RuntimeException re) {
- if (nativeStop)
- {
- MsgNative.processExit(this);
+ catch(ProcessKilled pk) {
+ if (nativeStop) {
+ try {
+ MsgNative.processExit(this);
+ } catch (ProcessKilled pk2) {
+ /* Ignore that other exception that *will* occur all the time.
+ * This is because the C mechanic gives the control to the now-killed process
+ * so that it does some garbage collecting on its own. When it does so here,
+ * the Java thread checks when starting if it's supposed to be killed (to inform
+ * the C world). To avoid the infinite loop or anything similar, we ignore that
+ * exception now. This should be ok since we ignore only a very specific exception
+ * class and not a generic (such as any RuntimeException).
+ */
+ System.err.println(currentThread().getName()+": I ignore that other exception");
+ }
Msg.info(" Process " + ((Process) Thread.currentThread()).msgName() + " has been killed.");
schedEnd.release();
}
else {
- re.printStackTrace();
+ pk.printStackTrace();
Msg.info("Unexpected behavior. Stopping now");
System.exit(1);
}
public abstract void main(String[]args) throws MsgException;
- /**
+ /** @brief Gives the control from the given user thread back to the maestro
+ *
+ * schedule() and unschedule() are the basis of interactions between the user threads
+ * (executing the user code), and the maestro thread (executing the platform models to decide
+ * which user thread should get executed when. Once it decided which user thread should be run
+ * (because the blocking action it were blocked onto are terminated in the simulated world), the
+ * maestro passes the control to this uthread by calling uthread.schedule() in the maestro thread
+ * (check its code for the simple semaphore-based synchronization schema).
+ *
+ * The uthread executes (while the maestro is blocked), until it starts another blocking
+ * action, such as a communication or so. In that case, uthread.unschedule() gets called from
+ * the user thread.
*
+ * As other complications, these methods are called directly by the C through a JNI upcall in
+ * response to the JNI downcalls done by the Java code. For example, you have this (simplified)
+ * execution path:
+ * - a process calls the Task.send() method in java
+ * - this calls Java_org_simgrid_msg_MsgNative_taskSend() in C through JNI
+ * - this ends up calling jprocess_unschedule(), still in C
+ * - this calls the java method "org/simgrid/msg/Process/unschedule()V" through JNI
+ * - that is to say, the unschedule() method that you are reading the documentation of.
+ *
+ * To understand all this, you must keep in mind that there is no difference between the C thread
+ * describing a process, and the Java thread doing the same. Most of the time, they are system
+ * threads from the kernel anyway. In the other case (such as when using green java threads when
+ * the OS does not provide any thread feature), I'm unsure of what happens: it's a very long time
+ * that I didn't see any such OS.
+ *
+ * The synchronization itself is implemented using simple semaphores in Java, as you can see by
+ * checking the code of these functions (and run() above). That's super simple, and thus welcome
+ * given the global complexity of the synchronization architecture: getting C and Java cooperate
+ * with regard to thread handling in a portable manner is very uneasy. A simple and straightforward
+ * implementation of each synchronization point is precious.
+ *
+ * But this kinda limits the system scalability. It may reveal difficult to simulate dozens of
+ * thousands of processes this way, both for memory limitations and for hard limits pushed by the
+ * system on the amount of threads and semaphores (we have 2 semaphores per user process).
+ *
+ * At time of writing, the best source of information on how to simulate large systems within the
+ * Java bindings of simgrid is here: http://tomp2p.net/dev/simgrid/
+ *
*/
public void unschedule() {
- //Process.ifInterruptedStop();
- try {
+ /* this function is called from the user thread only */
+ try {
+
+ /* unlock the maestro before going to sleep */
schedEnd.release();
+ /* Here, the user thread is locked, waiting for the semaphore, and maestro executes instead */
schedBegin.acquire();
- } catch (InterruptedException e) {
+ /* now that the semaphore is acquired, it means that maestro gave us the control back */
+
+ /* the user thread is starting again after giving the control to maestro.
+ * Let's check if we were asked to die in between */
+ if ( (Thread.currentThread() instanceof Process) &&((Process) Thread.currentThread()).getNativeStop()) {
+ throw new ProcessKilled();
+ }
+
+ } catch (InterruptedException e) {
+ /* ignore this exception because this is how we get killed on process.kill or end of simulation.
+ * I don't like hiding exceptions this way, but fail to see any other solution
+ */
}
+
}
- /**
+ /** @brief Gives the control from the maestro back to the given user thread
+ *
+ * Must be called from the maestro thread -- see unschedule() for details.
*
*/
public void schedule() {
- //System.err.println("Scheduling process in Java");
- //Process.ifInterruptedStop();
try {
+ /* unlock the user thread before going to sleep */
schedBegin.release();
+ /* Here, maestro is locked, waiting for the schedEnd semaphore to get signaled by used thread, that executes instead */
schedEnd.acquire();
+ /* Maestro now has the control back and the user thread went to sleep gently */
+
} catch(InterruptedException e) {
- System.err.println("Got an interuption while scheduling process in Java");
- e.printStackTrace();
+ throw new RuntimeException("The impossible did happend once again: I got interrupted in schedEnd.acquire()",e);
}
}
* @throws HostFailureException
* @throws TransferFailureException */
public void taskSend(String mailbox, Task task, double timeout) throws TransferFailureException, HostFailureException, TimeoutException {
- Process.ifInterruptedStop();
MsgNative.taskSend(mailbox, task, timeout);
}
* @throws HostFailureException
* @throws TransferFailureException */
public void taskSend(String mailbox, Task task) throws TransferFailureException, HostFailureException, TimeoutException {
- Process.ifInterruptedStop();
MsgNative.taskSend(mailbox, task, -1);
}
* @throws TimeoutException
*/
public Task taskReceive(String mailbox) throws TransferFailureException, HostFailureException, TimeoutException {
- Process.ifInterruptedStop();
return MsgNative.taskReceive(mailbox, -1.0, null);
}
* @throws TimeoutException
*/
public Task taskReceive(String mailbox, double timeout) throws TransferFailureException, HostFailureException, TimeoutException {
- Process.ifInterruptedStop();
return MsgNative.taskReceive(mailbox, timeout, null);
}
* @throws TimeoutException
*/
public Task taskReceive(String mailbox, double timeout, Host host) throws TransferFailureException, HostFailureException, TimeoutException {
- Process.ifInterruptedStop();
return MsgNative.taskReceive(mailbox, timeout, host);
}
* @throws TimeoutException
*/
public Task taskReceive(String mailbox, Host host) throws TransferFailureException, HostFailureException, TimeoutException {
- Process.ifInterruptedStop();
return MsgNative.taskReceive(mailbox, -1.0, host);
}
}