//Create a slave on host "alice"
try {
Msg.info("Create process on host 'alice'");
- new Slave("alice","process2");
+ new Slave("alice","process2").start();
}
catch (MsgException e){
System.out.println("Process2!");
/* bypass deployment */
try {
- new Master("bob","process1");
+ new Master("bob","process1").start();
}
catch (MsgException e){
System.out.println("Create processes failed!");
try {
Msg.info("Create process on host 'alice'");
process2 = new Slave("alice","slave");
+ process2.start();
} catch (MsgException e){
System.out.println("Process2!");
}
/* bypass deploymemt */
try {
- Master process1 = new Master("bob","master");
+ Master process1 = new Master("bob","master");
+ process1.start();
}
catch (MsgException e){
System.out.println("Create processes failed!");
Msg.info("Send Mail1!");
task.send("mail1");
+ Msg.info("Send Mail2 !");
Task task2 = Task.receive("mail2");
Msg.info("Receive Mail2!");
- }
+ }
}
Constructor<Process> constructor = cls.getConstructor(new Class [] {Host.class, java.lang.String.class, java.lang.String[].class});
String[] args_ = args.toArray(new String[args.size()]);
Process process = constructor.newInstance(Host.getByName(hostName), function, args_);
+ process.start();
}
catch (NoSuchMethodException e) {
throw new RuntimeException("Can't find the correct constructor for the class " + function + ". \n" +
import java.util.Arrays;
import java.util.Hashtable;
import java.util.Vector;
+import java.lang.Runnable;
import java.util.concurrent.Semaphore;
/**
*
*/
-public abstract class Process extends Thread {
+public abstract class Process implements Runnable {
/**
* This attribute represents a bind between a java process object and
* a native process. Even if this attribute is public you must never
/* process synchronization tools */
- /* give the full path to semaphore to ensure that our own implementation don't get selected */
- protected java.util.concurrent.Semaphore schedBegin, schedEnd;
private boolean nativeStop = false;
/**
* Default constructor (used in ApplicationHandler to initialize it)
*/
protected Process() {
- super();
this.id = nextProcessId++;
this.name = null;
this.bind = 0;
this.args = new Vector<String>();
this.properties = null;
- schedBegin = new java.util.concurrent.Semaphore(0);
- schedEnd = new java.util.concurrent.Semaphore(0);
}
public Process(Host host, String name, String[]args) {
/* This is the constructor called by all others */
this();
-
+ this.host = host;
if (name == null)
throw new NullPointerException("Process name cannot be NULL");
this.name = name;
this.args = new Vector<String>();
if (null != args)
this.args.addAll(Arrays.asList(args));
-
- try {
- create(host.getName());
- } catch (HostNotFoundException e) {
- throw new RuntimeException("The impossible happened (yet again): the host that I have were not found",e);
- }
this.properties = new Hashtable<String,String>();
* Exit the process
*/
public native void exit();
-
+ /**
+ * Starts the process: calls create() with the name of the host this process was built with.
+ * @throws HostNotFoundException propagated from create(); presumably raised when that host name is unknown (TODO confirm)
+ */
+ public void start() throws HostNotFoundException {
+ create(host.getName());
+ }
+
/**
* This method runs the process. Il calls the method function that you must overwrite.
*/
public void run() {
String[] args = null; /* do not fill it before the signal or this.args will be empty */
-
//waitSignal(); /* wait for other people to fill the process in */
-
- try {
- schedBegin.acquire();
- } catch(InterruptedException e) {
- }
-
try {
args = new String[this.args.size()];
if (this.args.size() > 0) {
this.main(args);
exit();
- schedEnd.release();
} catch(MsgException e) {
e.printStackTrace();
Msg.info("Unexpected behavior. Stopping now");
* exception now. This should be ok since we ignore only a very specific exception
* class and not a generic (such as any RuntimeException).
*/
- System.err.println(currentThread().getName()+": I ignore that other exception");
+ //System.err.println(currentThread().getName()+": I ignore that other exception");
}
- Msg.info(" Process " + ((Process) Thread.currentThread()).msgName() + " has been killed.");
- schedEnd.release();
+ //Msg.info(" Process " + ((Process) Thread.currentThread()).msgName() + " has been killed.");
}
else {
pk.printStackTrace();
*/
public abstract void main(String[]args) throws MsgException;
-
- /** @brief Gives the control from the given user thread back to the maestro
- *
- * schedule() and unschedule() are the basis of interactions between the user threads
- * (executing the user code), and the maestro thread (executing the platform models to decide
- * which user thread should get executed when. Once it decided which user thread should be run
- * (because the blocking action it were blocked onto are terminated in the simulated world), the
- * maestro passes the control to this uthread by calling uthread.schedule() in the maestro thread
- * (check its code for the simple semaphore-based synchronization schema).
- *
- * The uthread executes (while the maestro is blocked), until it starts another blocking
- * action, such as a communication or so. In that case, uthread.unschedule() gets called from
- * the user thread.
- *
- * As other complications, these methods are called directly by the C through a JNI upcall in
- * response to the JNI downcalls done by the Java code. For example, you have this (simplified)
- * execution path:
- * - a process calls the Task.send() method in java
- * - this calls Java_org_simgrid_msg_MsgNative_taskSend() in C through JNI
- * - this ends up calling jprocess_unschedule(), still in C
- * - this calls the java method "org/simgrid/msg/Process/unschedule()V" through JNI
- * - that is to say, the unschedule() method that you are reading the documentation of.
- *
- * To understand all this, you must keep in mind that there is no difference between the C thread
- * describing a process, and the Java thread doing the same. Most of the time, they are system
- * threads from the kernel anyway. In the other case (such as when using green java threads when
- * the OS does not provide any thread feature), I'm unsure of what happens: it's a very long time
- * that I didn't see any such OS.
- *
- * The synchronization itself is implemented using simple semaphores in Java, as you can see by
- * checking the code of these functions (and run() above). That's super simple, and thus welcome
- * given the global complexity of the synchronization architecture: getting C and Java cooperate
- * with regard to thread handling in a portable manner is very uneasy. A simple and straightforward
- * implementation of each synchronization point is precious.
- *
- * But this kinda limits the system scalability. It may reveal difficult to simulate dozens of
- * thousands of processes this way, both for memory limitations and for hard limits pushed by the
- * system on the amount of threads and semaphores (we have 2 semaphores per user process).
- *
- * At time of writing, the best source of information on how to simulate large systems within the
- * Java bindings of simgrid is here: http://tomp2p.net/dev/simgrid/
- *
- */
- public void unschedule() {
- /* this function is called from the user thread only */
- try {
-
- /* unlock the maestro before going to sleep */
- schedEnd.release();
- /* Here, the user thread is locked, waiting for the semaphore, and maestro executes instead */
- schedBegin.acquire();
- /* now that the semaphore is acquired, it means that maestro gave us the control back */
-
- /* the user thread is starting again after giving the control to maestro.
- * Let's check if we were asked to die in between */
- if ( (Thread.currentThread() instanceof Process) &&((Process) Thread.currentThread()).getNativeStop()) {
- throw new ProcessKilled();
- }
-
- } catch (InterruptedException e) {
- /* ignore this exception because this is how we get killed on process.kill or end of simulation.
- * I don't like hiding exceptions this way, but fail to see any other solution
- */
- }
-
- }
-
- /** @brief Gives the control from the maestro back to the given user thread
- *
- * Must be called from the maestro thread -- see unschedule() for details.
- *
- */
- public void schedule() {
- try {
- /* unlock the user thread before going to sleep */
- schedBegin.release();
- /* Here, maestro is locked, waiting for the schedEnd semaphore to get signaled by used thread, that executes instead */
- schedEnd.acquire();
- /* Maestro now has the control back and the user thread went to sleep gently */
-
- } catch(InterruptedException e) {
- throw new RuntimeException("The impossible did happend once again: I got interrupted in schedEnd.acquire()",e);
- }
- }
/**
* Class initializer, to initialize various JNI stuff
#include "jmsg.h"
#include "jmsg_host.h"
#include "jxbt_utilities.h"
-
-#include "smx_context_java.h"
+#include "smx_context_java.h"
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(jmsg);
(*env)->DeleteGlobalRef(env, jprocess);
}
-jboolean jprocess_is_alive(jobject jprocess, JNIEnv * env)
-{
- jmethodID id =
- jxbt_get_smethod(env, "org/simgrid/msg/Process", "isAlive", "()Z");
-
- if (!id)
- return 0;
-
- return (*env)->CallBooleanMethod(env, jprocess, id);
-}
-
void jprocess_join(jobject jprocess, JNIEnv * env)
{
- jmethodID id =
- jxbt_get_smethod(env, "org/simgrid/msg/Process", "join", "()V");
-
- if (!id)
- return;
-
- (*env)->CallVoidMethod(env, jprocess, id);
+ /* Wait for the OS thread backing the process instead of upcalling join() in Java. */
+ m_process_t process = jprocess_to_native_process(jprocess,env);
+ if (!process) /* not bound to a native process: nothing to wait for */
+ return;
+ smx_ctx_java_t context = (smx_ctx_java_t)MSG_process_get_smx_ctx(process);
+ /* NOTE(review): assumes an unset thread field reads as NULL -- confirm how contexts are allocated */
+ if (!context || !context->thread)
+ return;
+ xbt_os_thread_join(context->thread,NULL);
}
void jprocess_exit(jobject jprocess, JNIEnv * env)
(*env)->CallVoidMethod(env, jprocess, id);
}
-void jprocess_yield(jobject jprocess, JNIEnv * env)
-{
- jmethodID id =
- jxbt_get_smethod(env, "org/simgrid/msg/Process", "switchProcess", "()V");
-
- if (!id)
- return;
-
- (*env)->CallVoidMethod(env, jprocess, id);
-}
-
-void jprocess_lock_mutex(jobject jprocess, JNIEnv * env)
-{
- jmethodID id =
- jxbt_get_smethod(env, "org/simgrid/msg/Process", "lockMutex", "()V");
-
- if (!id)
- return;
-
- (*env)->CallVoidMethod(env, jprocess, id);
-}
-
-void jprocess_unlock_mutex(jobject jprocess, JNIEnv * env)
-{
- jmethodID id =
- jxbt_get_smethod(env, "org/simgrid/msg/Process", "unlockMutex", "()V");
-
- if (!id)
- return;
-
- (*env)->CallVoidMethod(env, jprocess, id);
-}
-
-
-void jprocess_signal_cond(jobject jprocess, JNIEnv * env)
-{
- jmethodID id =
- jxbt_get_smethod(env, "org/simgrid/msg/Process", "signalCond", "()V");
-
- if (!id)
- return;
-
- (*env)->CallVoidMethod(env, jprocess, id);
-}
-
-void jprocess_wait_cond(jobject jprocess, JNIEnv * env)
-{
- jmethodID id =
- jxbt_get_smethod(env, "org/simgrid/msg/Process", "waitCond", "()V");
-
- if (!id)
- return;
-
- (*env)->CallVoidMethod(env, jprocess, id);
-}
-
-
void jprocess_start(jobject jprocess, JNIEnv * env)
{
jmethodID id =
{
jfieldID id = jxbt_get_sfield(env, "org/simgrid/msg/Process", "bind", "J");
- if (!id)
+ if (!id) {
+ XBT_INFO("Can't find bind field in Process");
return NULL;
+ }
return (m_process_t) (long) (*env)->GetLongField(env, jprocess, id);
}
return (*env)->GetLongField(env, jprocess, id) ? JNI_TRUE : JNI_FALSE;
}
-
-void jprocess_schedule(smx_context_t context)
-{
- JNIEnv *env;
- jmethodID id;
-
- env = get_current_thread_env();
-
- id = jxbt_get_smethod(env, "org/simgrid/msg/Process", "schedule", "()V");
-
- if (!id) {
- XBT_CRITICAL("Cannot find java method org/simgrid/msg/Process/schedule()V");
- return;
- }
-
- (*env)->CallVoidMethod(env, ((smx_ctx_java_t) context)->jprocess, id);
-}
-
-
-
-void jprocess_unschedule(smx_context_t context)
-{
- JNIEnv *env;
- jmethodID id;
-
- env = get_current_thread_env();
-
-
- id = jxbt_get_smethod(env, "org/simgrid/msg/Process", "unschedule", "()V");
-
- if (!id)
- return;
-
- (*env)->CallVoidMethod(env, ((smx_ctx_java_t) context)->jprocess, id);
-}
JNIEXPORT void JNICALL
Java_org_simgrid_msg_Process_nativeInit(JNIEnv *env, jclass cls) {
jclass jprocess_class_Process = (*env)->FindClass(env, "org/simgrid/msg/Process");
jxbt_throw_notbound(env, "process", jprocess);
return;
}
-
- smx_ctx_java_stop(MSG_process_get_smx_ctx(process));
+ smx_ctx_java_t context = (smx_ctx_java_t)MSG_process_get_smx_ctx(process);
+ smx_ctx_java_stop(MSG_process_get_smx_ctx(process));
+ xbt_os_sem_release(context->end);
}
JNIEXPORT void JNICALL
*/
void jprocess_delete_global_ref(jobject jprocess, JNIEnv * env);
-/**
- *
- * This function tests if the specified java process instance is alive.
- * A java process object is alive if it has been started and has not yet
- * terminated.
- *
- * @param jprocess The java process to test.
- * @param env The env of the current thread
- *
- * @exception If the class Process is not found the function throws
- * the ClassNotFoundException. If the methos isAlive() of
- * this class is not found the function throws the exception
- * NotSuchMethodException.
- *
- * @return If the java process is alive the function returns
- * true. Otherwise the function returns false.
- */
-jboolean jprocess_is_alive(jobject jprocess, JNIEnv * env);
/**
* This function waits for a java process to terminate.
*/
jstring jprocess_get_name(jobject jprocess, JNIEnv * env);
-/**
- * This function yields the specified java process.
- *
- * @param jprocess The java process to yield.
- * @param env The env of the current thread.
- *
- * @exception If the class Process is not found the function throws
- * the ClassNotFoundException. If the method switchProcess of
- * this class is not found the function throws the exception
- * NotSuchMethodException.
- */
-void jprocess_yield(jobject jprocess, JNIEnv * env);
-
-/**
- * This function locks the mutex of the specified java process.
- *
- * @param jprocess The java process of the mutex to lock.
- * @param env The env of the current thread.
- *
- * @exception If the class Process is not found the function throws
- * the ClassNotFoundException. If the method lockMutex of
- * this class is not found the function throws the exception
- * NotSuchMethodException.
- */
-void jprocess_lock_mutex(jobject jprocess, JNIEnv * env);
-
-/**
- * This function unlocks the mutex of the specified java process.
- *
- * @param jprocess The java process of the mutex to unlock.
- * @param env The env of the current thread.
- *
- * @exception If the class Process is not found the function throws
- * the ClassNotFoundException. If the method unlockMutex of
- * this class is not found the function throws the exception
- * NotSuchMethodException.
- */
-void jprocess_unlock_mutex(jobject jprocess, JNIEnv * env);
-
-/**
- * This function signals the condition of the mutex of the specified java process.
- *
- * @param jprocess The java process of the condtion to signal.
- * @param env The env of the current thread.
- *
- * @exception If the class Process is not found the function throws
- * the ClassNotFoundException. If the method signalCond of
- * this class is not found the function throws the exception
- * NotSuchMethodException.
- */
-void jprocess_signal_cond(jobject jprocess, JNIEnv * env);
-
-/**
- * This function waits the condition of the mutex of the specified java process.
- *
- * @param jprocess The java process of the condtion to wait for.
- * @param env The env of the current thread.
- *
- * @exception If the class Process is not found the function throws
- * the ClassNotFoundException. If the method waitCond of
- * this class is not found the function throws the exception
- * NotSuchMethodException.
- */
-void jprocess_wait_cond(jobject jprocess, JNIEnv * env);
-
-void jprocess_schedule(smx_context_t context);
-
-void jprocess_unschedule(smx_context_t context);
/*
* Class org_simgrid_msg_Process
* Method nativeInit
*/
JNIEXPORT void JNICALL Java_org_simgrid_msg_Process_migrate
(JNIEnv *, jobject, jobject);
-
#endif /* !MSG_JPROCESS_H */
#include <xbt/function_types.h>
#include <simgrid/simix.h>
#include "smx_context_java.h"
+#include "jxbt_utilities.h"
#include "xbt/dynar.h"
+JavaVM *get_current_vm(void);
+/**
+ * Returns the Java VM created in this process, or NULL when none is reported.
+ *
+ * jvm starts out NULL, and both the status code and the VM count written by
+ * JNI_GetCreatedJavaVMs() are checked, so the caller never receives an
+ * uninitialized pointer.
+ */
+JavaVM *get_current_vm(void)
+{
+ JavaVM *jvm = NULL;
+ jsize vm_count = 0;
+
+ if (JNI_GetCreatedJavaVMs(&jvm, 1, &vm_count) != JNI_OK || vm_count < 1)
+  return NULL;
+ return jvm;
+}
+
+
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(jmsg, bindings, "MSG for Java(TM)");
static void smx_ctx_java_suspend(smx_context_t context);
static void smx_ctx_java_resume(smx_context_t new_context);
static void smx_ctx_java_runall(void);
-
+static void* smx_ctx_java_thread_run(void *data);
void SIMIX_ctx_java_factory_init(smx_context_factory_t * factory)
{
/* instantiate the context factory */
(*factory)->self = smx_ctx_java_self;
(*factory)->get_data = smx_ctx_base_get_data;
}
-
smx_context_t smx_ctx_java_self(void)
{
return my_current_context;
context->super.cleanup_func = cleanup_func;
context->jprocess = (jobject) code;
context->jenv = get_current_thread_env();
- jprocess_start(((smx_ctx_java_t) context)->jprocess,
- get_current_thread_env());
+ context->begin = xbt_os_sem_init(0);
+ context->end = xbt_os_sem_init(0);
+ context->thread = xbt_os_thread_create(NULL,smx_ctx_java_thread_run,context,NULL);
}
else {
my_current_context = (smx_context_t)context;
return (smx_context_t) context;
}
+/**
+ * Body of the OS thread created for a Java process context.
+ *
+ * Attaches the thread to the JVM, blocks on the context's `begin` semaphore
+ * until it is released, then invokes run() on the bound Java Process object.
+ *
+ * @param data the smx_ctx_java_t context this thread belongs to
+ * @return always NULL
+ */
+static void* smx_ctx_java_thread_run(void *data) {
+ smx_ctx_java_t context = (smx_ctx_java_t)data;
+ //Attach the thread to the JVM
+ JNIEnv *env = NULL;
+ JavaVM *jvm = get_current_vm();
+ xbt_assert( (jvm != NULL), "No Java VM found to attach the process thread to");
+ /* Without a successful attach, env is unusable and every JNI call below would be invalid. */
+ jint attach_status = (*jvm)->AttachCurrentThread(jvm, (void **) &env, NULL);
+ xbt_assert( (attach_status == JNI_OK && env != NULL), "Cannot attach the process thread to the Java VM");
+ //Wait for the first scheduling round to happen.
+ xbt_os_sem_acquire(context->begin);
+ //Execution of the "run" method.
+ jmethodID id = jxbt_get_smethod(env, "org/simgrid/msg/Process", "run", "()V");
+ xbt_assert( (id != NULL), "Method not found...");
+ (*env)->CallVoidMethod(env, context->jprocess, id);
+
+ return NULL;
+}
+
static void smx_ctx_java_free(smx_context_t context)
{
if (context) {
- smx_ctx_java_t ctx_java = (smx_ctx_java_t) context;
-
- if (ctx_java->jprocess) { /* the java process still exists */
- jobject jprocess = ctx_java->jprocess;
- ctx_java->jprocess = NULL;
-
- /* stop it */
- XBT_DEBUG("The process still exists, making it exit now");
- jprocess_exit(jprocess, get_current_thread_env());
-
- /* it's dead now, remove it from the JVM */
- jprocess_delete_global_ref(jprocess, get_current_thread_env());
- }
+ smx_ctx_java_t ctx_java = (smx_ctx_java_t) context;
+ if (ctx_java->jprocess) { /* the java process still exists */
+ jobject jprocess = ctx_java->jprocess;
+ ctx_java->jprocess = NULL; /* stop it */
+ XBT_DEBUG("The process still exists, making it exit now");
+ /* remove it from the JVM first: this needs a JNIEnv, hence a thread that is still
+  * attached, and presumably nothing placed after xbt_os_thread_exit() gets executed */
+ jprocess_delete_global_ref(jprocess, get_current_thread_env());
+ /* detach the thread and exit it */
+ JavaVM *jvm = get_current_vm();
+ (*jvm)->DetachCurrentThread(jvm);
+ /* NOTE(review): if this call does not return, smx_ctx_base_free() below is skipped on this path -- confirm */
+ xbt_os_thread_exit(NULL);
+ }
}
-
smx_ctx_base_free(context);
}
/* suspend myself again, smx_ctx_java_free() will destroy me later
* from maeastro */
- jprocess_unschedule(context);
+ smx_ctx_java_suspend(context);
XBT_DEBUG("Java stop finished");
}
+/* Suspends the given context: releases its `end` semaphore, then blocks on its
+ * `begin` semaphore until smx_ctx_java_resume() releases it. This native
+ * handshake takes the place of the former jprocess_unschedule() upcall. */
static void smx_ctx_java_suspend(smx_context_t context)
{
- jprocess_unschedule(context);
+ smx_ctx_java_t ctx_java = (smx_ctx_java_t) context;
+ xbt_os_sem_release(ctx_java->end);
+ xbt_os_sem_acquire(ctx_java->begin);
+ //jprocess_unschedule(context);
}
// FIXME: inline those functions
+/* Resumes the given context: releases its `begin` semaphore so the thread blocked
+ * on it can proceed, then blocks on its `end` semaphore until that thread
+ * releases it. This native handshake takes the place of the former
+ * jprocess_schedule() upcall. */
static void smx_ctx_java_resume(smx_context_t new_context)
{
XBT_DEBUG("XXXX Context Resume\n");
- jprocess_schedule(new_context);
+ smx_ctx_java_t ctx_java = (smx_ctx_java_t) new_context;
+ xbt_os_sem_release(ctx_java->begin);
+ xbt_os_sem_acquire(ctx_java->end);
+ //jprocess_schedule(new_context);
}
static void smx_ctx_java_runall(void)
#include <xbt/misc.h>
#include <simgrid/simix.h>
+#include <xbt/os_thread.h>
+
#include "jmsg.h"
#include "jmsg_process.h"
s_smx_ctx_base_t super; /* Fields of super implementation */
jobject jprocess; /* the java process instance binded with the msg process structure */
JNIEnv *jenv; /* jni interface pointer associated to this thread */
+ xbt_os_thread_t thread;
+ xbt_os_sem_t begin; /* this semaphore is used to schedule/yield the process */
+ xbt_os_sem_t end; /* this semaphore is used to schedule/unschedule the process */
} s_smx_ctx_java_t, *smx_ctx_java_t;
void SIMIX_ctx_java_factory_init(smx_context_factory_t *factory);