From e881539db7cf1f6bb7e6d4e2ad11fbcea275c035 Mon Sep 17 00:00:00 2001 From: Samuel Lepetit Date: Fri, 4 May 2012 13:02:11 +0200 Subject: [PATCH] Add isend in Task --- examples/async/Master.java | 30 +++++++++++++-- examples/async/Slave.java | 1 + org/simgrid/msg/Comm.java | 8 +++- org/simgrid/msg/Process.java | 2 +- org/simgrid/msg/Task.java | 12 +++--- src/jmsg.c | 2 - src/jmsg_comm.c | 36 ++++++++++++------ src/jmsg_task.c | 74 +++++++++++++++++++++++++++++++++--- src/jmsg_task.h | 6 ++- 9 files changed, 137 insertions(+), 34 deletions(-) diff --git a/examples/async/Master.java b/examples/async/Master.java index 88d6d1249e..6c28a36e04 100644 --- a/examples/async/Master.java +++ b/examples/async/Master.java @@ -8,6 +8,9 @@ */ package async; +import java.util.ArrayList; + +import org.simgrid.msg.Comm; import org.simgrid.msg.Msg; import org.simgrid.msg.MsgException; import org.simgrid.msg.Task; @@ -27,13 +30,32 @@ public class Master extends Process { int slavesCount = Integer.valueOf(args[3]).intValue(); Msg.info("Hello! Got "+ slavesCount + " slaves and "+tasksCount+" tasks to process"); - + + ArrayList comms = new ArrayList(); + for (int i = 0; i < tasksCount; i++) { Task task = new Task("Task_" + i, taskComputeSize, taskCommunicateSize); - //Msg.info("Sending \"" + task.getName()+ "\" to \"slave_" + i % slavesCount + "\""); - task.send("slave_"+(i%slavesCount)); + Msg.info("Sending \"" + task.getName()+ "\" to \"slave_" + i % slavesCount + "\""); + //task.send("slave_"+(i%slavesCount)); + Comm comm = task.isend("slave_"+(i%slavesCount)); + comms.add(comm); } - + + while (comms.size() > 0) { + for (int i = 0; i < comms.size(); i++) { + try { + if (comms.get(i).test()) { + comms.remove(i); + i--; + } + } + catch (Exception e) { + e.printStackTrace(); + } + } + simulatedSleep(1); + } + Msg.info("All tasks have been dispatched. Let's tell (asynchronously) everybody the computation is over, and sleep 20s so that nobody gets a message from a terminated process."); for (int i = 0; i < slavesCount; i++) { diff --git a/examples/async/Slave.java b/examples/async/Slave.java index f1a8fabcf8..ab07cac7d3 100644 --- a/examples/async/Slave.java +++ b/examples/async/Slave.java @@ -40,6 +40,7 @@ public class Slave extends Process { comm = null; break; } + Msg.info("Received a task"); Msg.info("Received \"" + task.getName() + "\". Processing it."); try { task.execute(); diff --git a/org/simgrid/msg/Comm.java b/org/simgrid/msg/Comm.java index 2865757c34..0aae5c484d 100644 --- a/org/simgrid/msg/Comm.java +++ b/org/simgrid/msg/Comm.java @@ -12,6 +12,10 @@ package org.simgrid.msg; * between processes. */ public class Comm { + /** + * Indicates if the communication is a receiving communication + */ + boolean receiving; /** * Represents the bind between the java comm and the * native C comm. You must never access it, since it is @@ -36,13 +40,13 @@ public class Comm { /** * Finalize the communication object, destroying it. */ - protected void finalize() { + protected void finalize() throws Throwable { unbind(); } /** * Unbind the communication object */ - public native void unbind(); + public native void unbind() throws NativeException; /** * Returns if the communication is finished or not. * If the communication has finished and there was an error, diff --git a/org/simgrid/msg/Process.java b/org/simgrid/msg/Process.java index fb69cd1db8..2aceb10107 100644 --- a/org/simgrid/msg/Process.java +++ b/org/simgrid/msg/Process.java @@ -185,7 +185,7 @@ public abstract class Process extends Thread { try { MsgNative.processCreate(this, host.getName()); } catch (HostNotFoundException e) { - throw new RuntimeException("The impossible happend (yet again): the host that I have were not found",e); + throw new RuntimeException("The impossible happened (yet again): the host that I have were not found",e); } } diff --git a/org/simgrid/msg/Task.java b/org/simgrid/msg/Task.java index 5a08ec6e67..a680e64cfd 100644 --- a/org/simgrid/msg/Task.java +++ b/org/simgrid/msg/Task.java @@ -184,17 +184,17 @@ public class Task { public void sendBounded(String alias, double maxrate) throws TransferFailureException, HostFailureException, TimeoutException { MsgNative.taskSendBounded(alias, this, maxrate); } + /** + * Sends the task on the mailbox asynchronously + */ + public native Comm isend(String mailbox); + /** * Starts listening for receiving a task from an asynchronous communication * @param mailbox * @return */ - public static Comm irecv(String mailbox) { - Comm comm = new Comm(); - irecvBind(comm,mailbox); - return comm; - } - public static native void irecvBind(Comm comm, String mailbox); + public static native Comm irecv(String mailbox); /** * Retrieves next task from the mailbox identified by the specified name * diff --git a/src/jmsg.c b/src/jmsg.c index f1328c482a..bc81dc2fb9 100644 --- a/src/jmsg.c +++ b/src/jmsg.c @@ -758,7 +758,6 @@ JNIEXPORT void JNICALL Java_org_simgrid_msg_MsgNative_taskDestroy(JNIEnv * env, jclass cls, jobject jtask_arg) { - /* get the native task */ m_task_t task = jtask_to_native_task(jtask_arg, env); @@ -766,7 +765,6 @@ Java_org_simgrid_msg_MsgNative_taskDestroy(JNIEnv * env, jclass cls, jxbt_throw_notbound(env, "task", task); return; } - MSG_error_t rv = MSG_task_destroy(task); jxbt_check_res("MSG_task_destroy()", rv, MSG_OK, diff --git a/src/jmsg_comm.c b/src/jmsg_comm.c index 17a714ecdb..4bdbf41e06 100644 --- a/src/jmsg_comm.c +++ b/src/jmsg_comm.c @@ -12,12 +12,13 @@ Java_org_simgrid_msg_Comm_unbind(JNIEnv *env, jobject jcomm) { jfieldID id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J"); msg_comm_t comm; m_task_t *task_received; - if (!id || !id_task) return; task_received = (m_task_t*) (long) (*env)->GetLongField(env, jcomm, id_task); - xbt_free(task_received); + if (task_received != NULL) { + xbt_free(task_received); + } comm = (msg_comm_t) (long) (*env)->GetLongField(env, jcomm, id); MSG_comm_destroy(comm); @@ -30,7 +31,10 @@ Java_org_simgrid_msg_Comm_test(JNIEnv *env, jobject jcomm) { jfieldID idComm = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J"); jclass jclass = jxbt_get_class(env,"org/simgrid/msg/Comm"); jfieldID idTask = jxbt_get_jfield(env, jclass, "task", "Lorg/simgrid/msg/Task;"); - if (!idComm || !idTask) { + jfieldID id_receiving = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "receiving", "Z"); + + + if (!idComm || !idTask || !id_receiving) { jxbt_throw_native(env,bprintf("idTask or idComm is null")); return JNI_FALSE; } @@ -44,17 +48,27 @@ Java_org_simgrid_msg_Comm_test(JNIEnv *env, jobject jcomm) { MSG_error_t status = MSG_comm_get_status(comm); if (status == MSG_OK) { - //bind the task object. - m_task_t task = MSG_comm_get_task(comm); - jobject jtask_global = MSG_task_get_data(task); - jobject jtask_local = (*env)->NewLocalRef(env, jtask_global); - - (*env)->DeleteGlobalRef(env, jtask_global); + //test if we are receiving or sending a task. + jboolean jreceiving = (*env)->GetBooleanField(env, jcomm, id_receiving); + if (jreceiving == JNI_TRUE) { + //bind the task object. + m_task_t task = MSG_comm_get_task(comm); + xbt_assert(task != NULL, "Task is NULL"); + jobject jtask_global = MSG_task_get_data(task); + //case where the data has already been retrieved + if (jtask_global == NULL) + { + return JNI_TRUE; + } - (*env)->SetObjectField(env, jcomm, idTask, jtask_local); + //Make sure the data will be correctly gc. + jobject jtask_local = (*env)->NewLocalRef(env, jtask_global); + (*env)->DeleteGlobalRef(env, jtask_global); - MSG_task_set_data(task, NULL); + (*env)->SetObjectField(env, jcomm, idTask, jtask_local); + MSG_task_set_data(task, NULL); + } return JNI_TRUE; } else { diff --git a/src/jmsg_task.c b/src/jmsg_task.c index c66babe94f..4800dd784b 100644 --- a/src/jmsg_task.c +++ b/src/jmsg_task.c @@ -43,20 +43,36 @@ jboolean jtask_is_valid(jobject jtask, JNIEnv * env) return (*env)->GetLongField(env, jtask, id) ? JNI_TRUE : JNI_FALSE; } -JNIEXPORT void JNICALL -Java_org_simgrid_msg_Task_irecvBind(JNIEnv * env, jclass cls, jobject jcomm, jstring jmailbox) { +JNIEXPORT jobject JNICALL +Java_org_simgrid_msg_Task_irecv(JNIEnv * env, jclass cls, jstring jmailbox) { msg_comm_t comm; const char *mailbox; + jclass comm_class; + jmethodID cid; + jfieldID id; + jfieldID id_task; + jfieldID id_receiving; //pointer to store the task object pointer. m_task_t *task = xbt_new(m_task_t,1); *task = NULL; /* There should be a cache here */ - jfieldID id = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J"); - jfieldID id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J"); + comm_class = (*env)->FindClass(env, "org/simgrid/msg/Comm"); + cid = (*env)->GetMethodID(env, comm_class, "", "()V"); + id = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J"); + id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J"); + id_receiving = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "receiving", "Z"); - if (!id || !id_task) - return; + if (!id || !id_task || !comm_class || !cid || !id_receiving) { + jxbt_throw_native(env,bprintf("fieldID or methodID or class not found.")); + return NULL; + } + + jobject jcomm = (*env)->NewObject(env, comm_class, cid); + if (!jcomm) { + jxbt_throw_native(env,bprintf("Can't create a Comm object.")); + return NULL; + } mailbox = (*env)->GetStringUTFChars(env, jmailbox, 0); @@ -64,6 +80,52 @@ Java_org_simgrid_msg_Task_irecvBind(JNIEnv * env, jclass cls, jobject jcomm, jst (*env)->SetLongField(env, jcomm, id, (jlong) (long)(comm)); (*env)->SetLongField(env, jcomm, id_task, (jlong) (long)(task)); + (*env)->SetBooleanField(env, jcomm, id_receiving, JNI_TRUE); + + (*env)->ReleaseStringUTFChars(env, jmailbox, mailbox); + + return jcomm; +} + +JNIEXPORT jobject JNICALL +Java_org_simgrid_msg_Task_isend(JNIEnv *env, jobject jtask, jstring jmailbox) { + jclass comm_class; + jmethodID cid; + jfieldID id_bind; + jfieldID id_bind_task; + jfieldID id_receiving; + jobject jcomm; + const char *mailbox; + m_task_t task; + msg_comm_t comm; + + comm_class = (*env)->FindClass(env, "org/simgrid/msg/Comm"); + cid = (*env)->GetMethodID(env, comm_class, "", "()V"); + id_bind = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J"); + id_bind_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J"); + id_receiving = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "receiving", "Z"); + + if (!comm_class || !cid || !id_bind || !id_bind_task || !id_receiving) return NULL; + + jcomm = (*env)->NewObject(env, comm_class, cid); + mailbox = (*env)->GetStringUTFChars(env, jmailbox, 0); + + task = jtask_to_native_task(jtask, env); + + if (!task) { + (*env)->ReleaseStringUTFChars(env, jmailbox, mailbox); + (*env)->DeleteLocalRef(env, jcomm); + jxbt_throw_notbound(env, "task", jtask); + return NULL; + } + MSG_task_set_data(task, (void *) (*env)->NewGlobalRef(env, jtask)); + comm = MSG_task_isend(task,mailbox); + + (*env)->SetLongField(env, jcomm, id_bind, (jlong) (long)(comm)); + (*env)->SetLongField(env, jcomm, id_bind_task, (jlong) (long)(NULL)); + (*env)->SetBooleanField(env, jcomm, id_receiving, JNI_FALSE); (*env)->ReleaseStringUTFChars(env, jmailbox, mailbox); + + return jcomm; } diff --git a/src/jmsg_task.h b/src/jmsg_task.h index 32e803a03d..7b9c3d8eb4 100644 --- a/src/jmsg_task.h +++ b/src/jmsg_task.h @@ -75,8 +75,10 @@ m_task_t jtask_to_native_task(jobject jtask, JNIEnv * env); */ jboolean jtask_is_valid(jobject jtask, JNIEnv * env); -JNIEXPORT void JNICALL -Java_org_simgrid_msg_Task_irecvBind(JNIEnv * env, jclass cls, jobject jcomm, jstring jmailbox); +JNIEXPORT jobject JNICALL +Java_org_simgrid_msg_Task_irecv(JNIEnv * env, jclass cls, jstring jmailbox); +JNIEXPORT jobject JNICALL +Java_org_simgrid_msg_Task_isend(JNIEnv *env, jobject jtask, jstring jmailbox); #endif /* !MSG_JTASK_H */ -- 2.20.1