*/
package async;
+import java.util.ArrayList;
+
+import org.simgrid.msg.Comm;
import org.simgrid.msg.Msg;
import org.simgrid.msg.MsgException;
import org.simgrid.msg.Task;
int slavesCount = Integer.valueOf(args[3]).intValue();
Msg.info("Hello! Got "+ slavesCount + " slaves and "+tasksCount+" tasks to process");
-
+
+ ArrayList<Comm> comms = new ArrayList<Comm>();
+
for (int i = 0; i < tasksCount; i++) {
Task task = new Task("Task_" + i, taskComputeSize, taskCommunicateSize);
- //Msg.info("Sending \"" + task.getName()+ "\" to \"slave_" + i % slavesCount + "\"");
- task.send("slave_"+(i%slavesCount));
+ Msg.info("Sending \"" + task.getName()+ "\" to \"slave_" + i % slavesCount + "\"");
+ //task.send("slave_"+(i%slavesCount));
+ Comm comm = task.isend("slave_"+(i%slavesCount));
+ comms.add(comm);
}
-
+
+ while (comms.size() > 0) {
+ for (int i = 0; i < comms.size(); i++) {
+ try {
+ if (comms.get(i).test()) {
+ comms.remove(i);
+ i--;
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ simulatedSleep(1);
+ }
+
Msg.info("All tasks have been dispatched. Let's tell (asynchronously) everybody the computation is over, and sleep 20s so that nobody gets a message from a terminated process.");
for (int i = 0; i < slavesCount; i++) {
comm = null;
break;
}
+ Msg.info("Received a task");
Msg.info("Received \"" + task.getName() + "\". Processing it.");
try {
task.execute();
* between processes.
*/
public class Comm {
+ /**
+ * Indicates if the communication is a receiving communication
+ */
+ boolean receiving;
/**
* Represents the bind between the java comm and the
* native C comm. You must never access it, since it is
/**
* Finalize the communication object, destroying it.
*/
- protected void finalize() {
+ protected void finalize() throws Throwable {
unbind();
}
/**
* Unbind the communication object
*/
- public native void unbind();
+ public native void unbind() throws NativeException;
/**
* Returns if the communication is finished or not.
* If the communication has finished and there was an error,
try {
MsgNative.processCreate(this, host.getName());
} catch (HostNotFoundException e) {
- throw new RuntimeException("The impossible happend (yet again): the host that I have were not found",e);
+ throw new RuntimeException("The impossible happened (yet again): the host that I have were not found",e);
}
}
public void sendBounded(String alias, double maxrate) throws TransferFailureException, HostFailureException, TimeoutException {
MsgNative.taskSendBounded(alias, this, maxrate);
}
+ /**
+ * Sends the task on the mailbox asynchronously
+ */
+ public native Comm isend(String mailbox);
+
/**
* Starts listening for receiving a task from an asynchronous communication
* @param mailbox
* @return
*/
- public static Comm irecv(String mailbox) {
- Comm comm = new Comm();
- irecvBind(comm,mailbox);
- return comm;
- }
- public static native void irecvBind(Comm comm, String mailbox);
+ public static native Comm irecv(String mailbox);
/**
* Retrieves next task from the mailbox identified by the specified name
*
Java_org_simgrid_msg_MsgNative_taskDestroy(JNIEnv * env, jclass cls,
jobject jtask_arg)
{
-
/* get the native task */
m_task_t task = jtask_to_native_task(jtask_arg, env);
jxbt_throw_notbound(env, "task", task);
return;
}
-
MSG_error_t rv = MSG_task_destroy(task);
jxbt_check_res("MSG_task_destroy()", rv, MSG_OK,
jfieldID id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J");
msg_comm_t comm;
m_task_t *task_received;
-
if (!id || !id_task)
return;
task_received = (m_task_t*) (long) (*env)->GetLongField(env, jcomm, id_task);
- xbt_free(task_received);
+ if (task_received != NULL) {
+ xbt_free(task_received);
+ }
comm = (msg_comm_t) (long) (*env)->GetLongField(env, jcomm, id);
MSG_comm_destroy(comm);
jfieldID idComm = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J");
jclass jclass = jxbt_get_class(env,"org/simgrid/msg/Comm");
jfieldID idTask = jxbt_get_jfield(env, jclass, "task", "Lorg/simgrid/msg/Task;");
- if (!idComm || !idTask) {
+ jfieldID id_receiving = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "receiving", "Z");
+
+
+ if (!idComm || !idTask || !id_receiving) {
jxbt_throw_native(env,bprintf("idTask or idComm is null"));
return JNI_FALSE;
}
MSG_error_t status = MSG_comm_get_status(comm);
if (status == MSG_OK) {
- //bind the task object.
- m_task_t task = MSG_comm_get_task(comm);
- jobject jtask_global = MSG_task_get_data(task);
- jobject jtask_local = (*env)->NewLocalRef(env, jtask_global);
-
- (*env)->DeleteGlobalRef(env, jtask_global);
+ //test if we are receiving or sending a task.
+ jboolean jreceiving = (*env)->GetBooleanField(env, jcomm, id_receiving);
+ if (jreceiving == JNI_TRUE) {
+ //bind the task object.
+ m_task_t task = MSG_comm_get_task(comm);
+ xbt_assert(task != NULL, "Task is NULL");
+ jobject jtask_global = MSG_task_get_data(task);
+ //case where the data has already been retrieved
+ if (jtask_global == NULL)
+ {
+ return JNI_TRUE;
+ }
- (*env)->SetObjectField(env, jcomm, idTask, jtask_local);
+ //Make sure the data will be correctly gc.
+ jobject jtask_local = (*env)->NewLocalRef(env, jtask_global);
+ (*env)->DeleteGlobalRef(env, jtask_global);
- MSG_task_set_data(task, NULL);
+ (*env)->SetObjectField(env, jcomm, idTask, jtask_local);
+ MSG_task_set_data(task, NULL);
+ }
return JNI_TRUE;
}
else {
return (*env)->GetLongField(env, jtask, id) ? JNI_TRUE : JNI_FALSE;
}
-JNIEXPORT void JNICALL
-Java_org_simgrid_msg_Task_irecvBind(JNIEnv * env, jclass cls, jobject jcomm, jstring jmailbox) {
+JNIEXPORT jobject JNICALL
+Java_org_simgrid_msg_Task_irecv(JNIEnv * env, jclass cls, jstring jmailbox) {
msg_comm_t comm;
const char *mailbox;
+ jclass comm_class;
+ jmethodID cid;
+ jfieldID id;
+ jfieldID id_task;
+ jfieldID id_receiving;
//pointer to store the task object pointer.
m_task_t *task = xbt_new(m_task_t,1);
*task = NULL;
/* There should be a cache here */
- jfieldID id = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J");
- jfieldID id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J");
+ comm_class = (*env)->FindClass(env, "org/simgrid/msg/Comm");
+ cid = (*env)->GetMethodID(env, comm_class, "<init>", "()V");
+ id = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J");
+ id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J");
+ id_receiving = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "receiving", "Z");
- if (!id || !id_task)
- return;
+ if (!id || !id_task || !comm_class || !cid || !id_receiving) {
+ jxbt_throw_native(env,bprintf("fieldID or methodID or class not found."));
+ return NULL;
+ }
+
+ jobject jcomm = (*env)->NewObject(env, comm_class, cid);
+ if (!jcomm) {
+ jxbt_throw_native(env,bprintf("Can't create a Comm object."));
+ return NULL;
+ }
mailbox = (*env)->GetStringUTFChars(env, jmailbox, 0);
(*env)->SetLongField(env, jcomm, id, (jlong) (long)(comm));
(*env)->SetLongField(env, jcomm, id_task, (jlong) (long)(task));
+ (*env)->SetBooleanField(env, jcomm, id_receiving, JNI_TRUE);
+
+ (*env)->ReleaseStringUTFChars(env, jmailbox, mailbox);
+
+ return jcomm;
+}
+
+JNIEXPORT jobject JNICALL
+Java_org_simgrid_msg_Task_isend(JNIEnv *env, jobject jtask, jstring jmailbox) {
+ jclass comm_class;
+ jmethodID cid;
+ jfieldID id_bind;
+ jfieldID id_bind_task;
+ jfieldID id_receiving;
+ jobject jcomm;
+ const char *mailbox;
+ m_task_t task;
+ msg_comm_t comm;
+
+ comm_class = (*env)->FindClass(env, "org/simgrid/msg/Comm");
+ cid = (*env)->GetMethodID(env, comm_class, "<init>", "()V");
+ id_bind = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J");
+ id_bind_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J");
+ id_receiving = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "receiving", "Z");
+
+ if (!comm_class || !cid || !id_bind || !id_bind_task || !id_receiving) return NULL;
+
+ jcomm = (*env)->NewObject(env, comm_class, cid);
+ mailbox = (*env)->GetStringUTFChars(env, jmailbox, 0);
+
+ task = jtask_to_native_task(jtask, env);
+
+ if (!task) {
+ (*env)->ReleaseStringUTFChars(env, jmailbox, mailbox);
+ (*env)->DeleteLocalRef(env, jcomm);
+ jxbt_throw_notbound(env, "task", jtask);
+ return NULL;
+ }
+ MSG_task_set_data(task, (void *) (*env)->NewGlobalRef(env, jtask));
+ comm = MSG_task_isend(task,mailbox);
+
+ (*env)->SetLongField(env, jcomm, id_bind, (jlong) (long)(comm));
+ (*env)->SetLongField(env, jcomm, id_bind_task, (jlong) (long)(NULL));
+ (*env)->SetBooleanField(env, jcomm, id_receiving, JNI_FALSE);
(*env)->ReleaseStringUTFChars(env, jmailbox, mailbox);
+
+ return jcomm;
}
*/
jboolean jtask_is_valid(jobject jtask, JNIEnv * env);
-JNIEXPORT void JNICALL
-Java_org_simgrid_msg_Task_irecvBind(JNIEnv * env, jclass cls, jobject jcomm, jstring jmailbox);
+JNIEXPORT jobject JNICALL
+Java_org_simgrid_msg_Task_irecv(JNIEnv * env, jclass cls, jstring jmailbox);
+JNIEXPORT jobject JNICALL
+Java_org_simgrid_msg_Task_isend(JNIEnv *env, jobject jtask, jstring jmailbox);
#endif /* !MSG_JTASK_H */