X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/fc783dc02e4e1bf955b152713d1df4914ae3adb8..7b2435d30a045a0c8c292c0fd98a2a2379a40fb3:/org/simgrid/msg/Process.java diff --git a/org/simgrid/msg/Process.java b/org/simgrid/msg/Process.java index 8d2b2ca0a8..d9cedd97aa 100644 --- a/org/simgrid/msg/Process.java +++ b/org/simgrid/msg/Process.java @@ -9,11 +9,12 @@ *(GNU LGPL) which comes with this package. */ -package simgrid.msg; - +package org.simgrid.msg; + import java.util.Arrays; import java.util.Hashtable; import java.util.Vector; +import java.util.concurrent.Semaphore; /** * A process may be defined as a code, with some private data, executing @@ -67,20 +68,33 @@ public abstract class Process extends Thread { */ public long id; - public Hashtable properties; + public Hashtable properties; /** * The name of the process. */ protected String name; - public String msgName() { - return this.name; - } + /** + * The PID of the process + */ + protected int pid = -1; + /** + * The PPID of the process + */ + protected int ppid = -1; + /** + * The host of the process + */ + protected Host host = null; + /** The arguments of the method function of the process. */ public Vector args; /* process synchronization tools */ - protected Sem schedBegin, schedEnd; + + /* give the full path to semaphore to ensure that our own implementation don't get selected */ + protected java.util.concurrent.Semaphore schedBegin, schedEnd; + private boolean nativeStop = false; /** * Default constructor (used in ApplicationHandler to initialize it) @@ -92,8 +106,8 @@ public abstract class Process extends Thread { this.bind = 0; this.args = new Vector(); this.properties = null; - schedBegin = new Sem(0); - schedEnd = new Sem(0); + schedBegin = new java.util.concurrent.Semaphore(0); + schedEnd = new java.util.concurrent.Semaphore(0); } @@ -120,7 +134,8 @@ public abstract class Process extends Thread { * @param args The arguments of the main function of the process. * * @exception HostNotFoundException if no host with this name exists. - * NativeException + * NativeException + * @throws NativeException * */ public Process(String hostname, String name, String args[]) throws HostNotFoundException, NativeException { @@ -158,10 +173,18 @@ public abstract class Process extends Thread { if (null != args) this.args.addAll(Arrays.asList(args)); - MsgNative.processCreate(this, host); + try { + create(host.getName()); + } catch (HostNotFoundException e) { + throw new RuntimeException("The impossible happened (yet again): the host that I have were not found",e); + } + } - - + /** + * The natively implemented method to create an MSG process. + * @param host A valid (binded) host where create the process. + */ + protected native void create(String hostName) throws HostNotFoundException; /** * This method kills all running process of the simulation. * @@ -172,19 +195,34 @@ public abstract class Process extends Thread { * @return The function returns the PID of the next created process. * */ - public static int killAll(int resetPID) { - return MsgNative.processKillAll(resetPID); + public static native int killAll(int resetPID); + /** + * This method sets a flag to indicate that this thread must be killed. End user must use static method kill + * + * @return + * + */ + public void nativeStop() { + nativeStop = true; + } + /** + * getter for the flag that indicates that this thread must be killed + * + * @return + * + */ + public boolean getNativeStop() { + return nativeStop; } /** - * This method adds an argument in the list of the arguments of the main function - * of the process. + * This method kill a process. + * @param process the process to be killed. * - * @param arg The argument to add. */ - @Deprecated - protected void addArg(String arg) { - args.add(arg); + public void kill() { + nativeStop(); + Msg.info("Process " + msgName() + " will be killed."); } /** @@ -192,37 +230,33 @@ public abstract class Process extends Thread { * waiting for the completion. * */ - public void pause() { - MsgNative.processSuspend(this); - } + public native void pause(); /** * Resumes a suspended process by resuming the task on which it was * waiting for the completion. * * */ - public void restart() { - MsgNative.processResume(this); - } + public native void restart(); /** * Tests if a process is suspended. * * @return The method returns true if the process is suspended. * Otherwise the method returns false. */ - public boolean isSuspended() { - return MsgNative.processIsSuspended(this); + public native boolean isSuspended(); + /** + * Returns the name of the process + */ + public String msgName() { + return this.name; } /** - * Returns the host of a process. - * + * Returns the host of the process. * @return The host instance of the process. - * - * @exception NativeException on error in the native SimGrid code - * */ public Host getHost() { - return MsgNative.processGetHost(this); + return this.host; } /** * This static method gets a process from a PID. @@ -233,9 +267,7 @@ public abstract class Process extends Thread { * * @exception NativeException on error in the native SimGrid code */ - public static Process fromPID(int PID) throws NativeException { - return MsgNative.processFromPID(PID); - } + public static native Process fromPID(int PID) throws NativeException; /** * This method returns the PID of the process. * @@ -243,7 +275,7 @@ public abstract class Process extends Thread { * */ public int getPID() { - return MsgNative.processGetPID(this); + return pid; } /** * This method returns the PID of the parent of a process. @@ -252,7 +284,7 @@ public abstract class Process extends Thread { * */ public int getPPID() { - return MsgNative.processGetPPID(this); + return ppid; } /** * This static method returns the currently running process. @@ -260,41 +292,67 @@ public abstract class Process extends Thread { * @return The current process. * */ - public static Process currentProcess() { - return MsgNative.processSelf(); - } + public static native Process currentProcess(); + /** + * Kills a MSG process + * @param process Valid java process to kill + */ + final static native void kill(Process process); /** * Migrates a process to another host. * + * @param process The process to migrate. * @param host The host where to migrate the process. * */ - public void migrate(Host host) { - MsgNative.processChangeHost(this, host); + public static void migrate(Process process, Host host) { + MsgNative.processMigrate(process, host); + process.host = null; } + /** + * Makes the current process sleep until millis millisecondes have elapsed. + * You should note that unlike "waitFor" which takes seconds, this method takes milliseconds. + * FIXME: Not optimal, maybe we should have two native functions. + * @param millis the length of time to sleep in milliseconds. + */ + public static void sleep(long millis) { + sleep(millis,0); + } + /** + * Makes the current process sleep until millis milliseconds and nanos nanoseconds + * have elapsed. + * You should note that unlike "waitFor" which takes seconds, this method takes milliseconds and nanoseconds. + * Overloads Thread.sleep. + * @param millis the length of time to sleep in milliseconds. + * @param nanos additionnal nanoseconds to sleep. + */ + public native static void sleep(long millis, int nanos); /** * Makes the current process sleep until time seconds have elapsed. - * * @param seconds The time the current process must sleep. - * - * @exception HostFailureException on error in the native SimGrid code */ - public static void waitFor(double seconds) throws HostFailureException { - MsgNative.processWaitFor(seconds); - } - public void showArgs() { + public native void waitFor(double seconds); + /** + * + */ + public void showArgs() { Msg.info("[" + this.name + "/" + this.getHost().getName() + "] argc=" + this.args.size()); for (int i = 0; i < this.args.size(); i++) Msg.info("[" + this.msgName() + "/" + this.getHost().getName() + "] args[" + i + "]=" + (String) (this.args.get(i))); } + /** + * Exit the process + */ + public native void exit(); + /** * This method runs the process. Il calls the method function that you must overwrite. */ public void run() { - String[]args = null; /* do not fill it before the signal or this.args will be empty */ + String[] args = null; /* do not fill it before the signal or this.args will be empty */ //waitSignal(); /* wait for other people to fill the process in */ @@ -311,70 +369,137 @@ public abstract class Process extends Thread { } this.main(args); - MsgNative.processExit(this); + exit(); schedEnd.release(); } catch(MsgException e) { e.printStackTrace(); Msg.info("Unexpected behavior. Stopping now"); System.exit(1); } + catch(ProcessKilled pk) { + if (nativeStop) { + try { + exit(); + } catch (ProcessKilled pk2) { + /* Ignore that other exception that *will* occur all the time. + * This is because the C mechanic gives the control to the now-killed process + * so that it does some garbage collecting on its own. When it does so here, + * the Java thread checks when starting if it's supposed to be killed (to inform + * the C world). To avoid the infinite loop or anything similar, we ignore that + * exception now. This should be ok since we ignore only a very specific exception + * class and not a generic (such as any RuntimeException). + */ + System.err.println(currentThread().getName()+": I ignore that other exception"); + } + Msg.info(" Process " + ((Process) Thread.currentThread()).msgName() + " has been killed."); + schedEnd.release(); + } + else { + pk.printStackTrace(); + Msg.info("Unexpected behavior. Stopping now"); + System.exit(1); + } + } } /** * The main function of the process (to implement). - */ + * + * @param args + * @throws MsgException + */ public abstract void main(String[]args) throws MsgException; - public void unschedule() { - try { + /** @brief Gives the control from the given user thread back to the maestro + * + * schedule() and unschedule() are the basis of interactions between the user threads + * (executing the user code), and the maestro thread (executing the platform models to decide + * which user thread should get executed when. Once it decided which user thread should be run + * (because the blocking action it were blocked onto are terminated in the simulated world), the + * maestro passes the control to this uthread by calling uthread.schedule() in the maestro thread + * (check its code for the simple semaphore-based synchronization schema). + * + * The uthread executes (while the maestro is blocked), until it starts another blocking + * action, such as a communication or so. In that case, uthread.unschedule() gets called from + * the user thread. + * + * As other complications, these methods are called directly by the C through a JNI upcall in + * response to the JNI downcalls done by the Java code. For example, you have this (simplified) + * execution path: + * - a process calls the Task.send() method in java + * - this calls Java_org_simgrid_msg_MsgNative_taskSend() in C through JNI + * - this ends up calling jprocess_unschedule(), still in C + * - this calls the java method "org/simgrid/msg/Process/unschedule()V" through JNI + * - that is to say, the unschedule() method that you are reading the documentation of. + * + * To understand all this, you must keep in mind that there is no difference between the C thread + * describing a process, and the Java thread doing the same. Most of the time, they are system + * threads from the kernel anyway. In the other case (such as when using green java threads when + * the OS does not provide any thread feature), I'm unsure of what happens: it's a very long time + * that I didn't see any such OS. + * + * The synchronization itself is implemented using simple semaphores in Java, as you can see by + * checking the code of these functions (and run() above). That's super simple, and thus welcome + * given the global complexity of the synchronization architecture: getting C and Java cooperate + * with regard to thread handling in a portable manner is very uneasy. A simple and straightforward + * implementation of each synchronization point is precious. + * + * But this kinda limits the system scalability. It may reveal difficult to simulate dozens of + * thousands of processes this way, both for memory limitations and for hard limits pushed by the + * system on the amount of threads and semaphores (we have 2 semaphores per user process). + * + * At time of writing, the best source of information on how to simulate large systems within the + * Java bindings of simgrid is here: http://tomp2p.net/dev/simgrid/ + * + */ + public void unschedule() { + /* this function is called from the user thread only */ + try { + + /* unlock the maestro before going to sleep */ schedEnd.release(); + /* Here, the user thread is locked, waiting for the semaphore, and maestro executes instead */ schedBegin.acquire(); - } catch(InterruptedException e) { + /* now that the semaphore is acquired, it means that maestro gave us the control back */ + + /* the user thread is starting again after giving the control to maestro. + * Let's check if we were asked to die in between */ + if ( (Thread.currentThread() instanceof Process) &&((Process) Thread.currentThread()).getNativeStop()) { + throw new ProcessKilled(); + } + + } catch (InterruptedException e) { + /* ignore this exception because this is how we get killed on process.kill or end of simulation. + * I don't like hiding exceptions this way, but fail to see any other solution + */ } + } - public void schedule() { + /** @brief Gives the control from the maestro back to the given user thread + * + * Must be called from the maestro thread -- see unschedule() for details. + * + */ + public void schedule() { try { + /* unlock the user thread before going to sleep */ schedBegin.release(); + /* Here, maestro is locked, waiting for the schedEnd semaphore to get signaled by used thread, that executes instead */ schedEnd.acquire(); + /* Maestro now has the control back and the user thread went to sleep gently */ + } catch(InterruptedException e) { + throw new RuntimeException("The impossible did happend once again: I got interrupted in schedEnd.acquire()",e); } } - - /** Send the given task in the mailbox associated with the specified alias (waiting at most given time) - * @throws TimeoutException - * @throws HostFailureException - * @throws TransferFailureException */ - public void taskSend(String mailbox, Task task, double timeout) throws TransferFailureException, HostFailureException, TimeoutException { - MsgNative.taskSend(mailbox, task, timeout); - } - - /** Send the given task in the mailbox associated with the specified alias - * @throws TimeoutException - * @throws HostFailureException - * @throws TransferFailureException */ - public void taskSend(String mailbox, Task task) throws TransferFailureException, HostFailureException, TimeoutException { - MsgNative.taskSend(mailbox, task, -1); - } - - /** Receive a task on mailbox associated with the specified mailbox */ - public Task taskReceive(String mailbox) throws TransferFailureException, HostFailureException, TimeoutException { - return MsgNative.taskReceive(mailbox, -1.0, null); - } - - /** Receive a task on mailbox associated with the specified alias (waiting at most given time) */ - public Task taskReceive(String mailbox, double timeout) throws TransferFailureException, HostFailureException, TimeoutException { - return MsgNative.taskReceive(mailbox, timeout, null); - } - - /** Receive a task on mailbox associated with the specified alias from given sender */ - public Task taskReceive(String mailbox, double timeout, Host host) throws TransferFailureException, HostFailureException, TimeoutException { - return MsgNative.taskReceive(mailbox, timeout, host); - } - - /** Receive a task on mailbox associated with the specified alias from given sender*/ - public Task taskReceive(String mailbox, Host host) throws TransferFailureException, HostFailureException, TimeoutException { - return MsgNative.taskReceive(mailbox, -1.0, host); + + /** + * Class initializer, to initialize various JNI stuff + */ + public static native void nativeInit(); + static { + nativeInit(); } }