From: Samuel Lepetit Date: Thu, 3 May 2012 17:23:33 +0000 (+0200) Subject: Add asynchronous API (except wait) for communications X-Git-Tag: v3_9_90~569^2~19^2~111 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/eb86ef55ebafd813f093d34b293254fc1587a7b2?ds=sidebyside Add asynchronous API (except wait) for communications --- diff --git a/CMakeLists.txt b/CMakeLists.txt index 72f289e488..86c18538db 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -78,6 +78,8 @@ set(JMSG_C_SRC src/jxbt_utilities.h src/jmsg.c src/jmsg.h + src/jmsg_comm.c + src/jmsg_comm.h src/jmsg_host.c src/jmsg_host.h src/jmsg_process.c @@ -108,6 +110,7 @@ set(JMSG_JAVA_SRC org/simgrid/msg/TimeoutException.java org/simgrid/msg/TransferFailureException.java org/simgrid/msg/Mutex.java + org/simgrid/msg/Comm.java ) set(JAVA_EXAMPLES diff --git a/examples/async/Master.java b/examples/async/Master.java index 88d6d1249e..e7a89f25b1 100644 --- a/examples/async/Master.java +++ b/examples/async/Master.java @@ -33,6 +33,7 @@ public class Master extends Process { //Msg.info("Sending \"" + task.getName()+ "\" to \"slave_" + i % slavesCount + "\""); task.send("slave_"+(i%slavesCount)); } + simulatedSleep(1); Msg.info("All tasks have been dispatched. Let's tell (asynchronously) everybody the computation is over, and sleep 20s so that nobody gets a message from a terminated process."); diff --git a/examples/async/Slave.java b/examples/async/Slave.java index 3d2270b0d4..e2865a74b9 100644 --- a/examples/async/Slave.java +++ b/examples/async/Slave.java @@ -5,6 +5,7 @@ * under the terms of the license (GNU LGPL) which comes with this package. */ package async; +import org.simgrid.msg.Comm; import org.simgrid.msg.HostFailureException; import org.simgrid.msg.Msg; import org.simgrid.msg.Task; @@ -21,23 +22,40 @@ public class Slave extends Process { } int num = Integer.valueOf(args[0]).intValue(); - //Msg.info("Receiving on 'slave_"+num+"'"); - - while(true) { - Task task = Task.receive("slave_"+num); - - if (task instanceof FinalizeTask) { - break; + + Comm comm = null; + boolean slaveFinished = false; + while(!slaveFinished) { + try + { + if (comm == null) { + Msg.info("Receiving on 'slave_" + num + "'"); + comm = Task.irecv("slave_" + num); + } + else { + if (comm.test()) { + Task task = comm.getTask(); + + if (task instanceof FinalizeTask) { + break; + } + Msg.info("Received \"" + task.getName() + "\". Processing it."); + try { + task.execute(); + } catch (TaskCancelledException e) { + + } + } + else { + simulatedSleep(1); + } + } } - Msg.info("Received \"" + task.getName() + "\". Processing it."); - try { - task.execute(); - } catch (TaskCancelledException e) { + catch (Exception e) { } - // Msg.info("\"" + task.getName() + "\" done "); } - Msg.info("Received Finalize. I'm done. See you!"); + simulatedSleep(20); } } \ No newline at end of file diff --git a/org/simgrid/msg/Comm.java b/org/simgrid/msg/Comm.java new file mode 100644 index 0000000000..2865757c34 --- /dev/null +++ b/org/simgrid/msg/Comm.java @@ -0,0 +1,60 @@ +package org.simgrid.msg; +/** +* Copyright 2012 The SimGrid team. All right reserved. +* +* This program is free software; you can redistribute +* it and/or modify it under the terms of the license +* (GNU LGPL) which comes with this package. +* +*/ +/** + * Communication action, representing an ongoing communication + * between processes. + */ +public class Comm { + /** + * Represents the bind between the java comm and the + * native C comm. You must never access it, since it is + * automatically set. + */ + public long bind = 0; + /** + * Represents the bind for the task object pointer. Don't touch it. + */ + public long bindTask = 0; + /** + * Task associated with the comm. Beware, it can be null + */ + protected Task task = null; + /** + * Protected constructor, used by Comm factories + * in Task. + */ + protected Comm() { + + } + /** + * Finalize the communication object, destroying it. + */ + protected void finalize() { + unbind(); + } + /** + * Unbind the communication object + */ + public native void unbind(); + /** + * Returns if the communication is finished or not. + * If the communication has finished and there was an error, + * raise an exception. + */ + public native boolean test() throws TransferFailureException, HostFailureException, TimeoutException ; + /** + * Returns the task associated with the communication. + * if the communication isn't finished yet, will return null. + */ + public Task getTask() { + return task; + } + +} diff --git a/org/simgrid/msg/Task.java b/org/simgrid/msg/Task.java index 0e00afff57..5a08ec6e67 100644 --- a/org/simgrid/msg/Task.java +++ b/org/simgrid/msg/Task.java @@ -137,6 +137,7 @@ public class Task { MsgNative.taskDestroy(this); } + /** Send the task asynchronously on the mailbox identified by the specified name, * with no way to retrieve whether the communication succeeded or not * @@ -183,7 +184,17 @@ public class Task { public void sendBounded(String alias, double maxrate) throws TransferFailureException, HostFailureException, TimeoutException { MsgNative.taskSendBounded(alias, this, maxrate); } - + /** + * Starts listening for receiving a task from an asynchronous communication + * @param mailbox + * @return + */ + public static Comm irecv(String mailbox) { + Comm comm = new Comm(); + irecvBind(comm,mailbox); + return comm; + } + public static native void irecvBind(Comm comm, String mailbox); /** * Retrieves next task from the mailbox identified by the specified name * diff --git a/src/jmsg.c b/src/jmsg.c index b268837e0d..9bf60c8fad 100644 --- a/src/jmsg.c +++ b/src/jmsg.c @@ -547,7 +547,7 @@ Java_org_simgrid_msg_MsgNative_taskCreate(JNIEnv * env, jclass cls, task = MSG_task_create(name, (double) jcomputeDuration, (double) jmessageSize, NULL); - + XBT_INFO("Name: %s %p",name,task); if (jname) (*env)->ReleaseStringUTFChars(env, jname, name); diff --git a/src/jmsg_comm.c b/src/jmsg_comm.c new file mode 100644 index 0000000000..17a714ecdb --- /dev/null +++ b/src/jmsg_comm.c @@ -0,0 +1,81 @@ +/* Functions related to the java comm instances */ + +/* Copyright (c) 2012. The SimGrid Team. All rights reserved. */ +#include "jmsg_comm.h" +#include "jxbt_utilities.h" +#include +XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(jmsg); + +JNIEXPORT void JNICALL +Java_org_simgrid_msg_Comm_unbind(JNIEnv *env, jobject jcomm) { + jfieldID id = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J"); + jfieldID id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J"); + msg_comm_t comm; + m_task_t *task_received; + + if (!id || !id_task) + return; + + task_received = (m_task_t*) (long) (*env)->GetLongField(env, jcomm, id_task); + xbt_free(task_received); + + comm = (msg_comm_t) (long) (*env)->GetLongField(env, jcomm, id); + MSG_comm_destroy(comm); +} + +JNIEXPORT jboolean JNICALL +Java_org_simgrid_msg_Comm_test(JNIEnv *env, jobject jcomm) { + msg_comm_t comm; + + jfieldID idComm = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J"); + jclass jclass = jxbt_get_class(env,"org/simgrid/msg/Comm"); + jfieldID idTask = jxbt_get_jfield(env, jclass, "task", "Lorg/simgrid/msg/Task;"); + if (!idComm || !idTask) { + jxbt_throw_native(env,bprintf("idTask or idComm is null")); + return JNI_FALSE; + } + + comm = (msg_comm_t) (long) (*env)->GetLongField(env, jcomm, idComm); + if (!comm) { + jxbt_throw_native(env,bprintf("comm is null")); + return JNI_FALSE; + } + if (MSG_comm_test(comm)) { + MSG_error_t status = MSG_comm_get_status(comm); + + if (status == MSG_OK) { + //bind the task object. + m_task_t task = MSG_comm_get_task(comm); + jobject jtask_global = MSG_task_get_data(task); + jobject jtask_local = (*env)->NewLocalRef(env, jtask_global); + + (*env)->DeleteGlobalRef(env, jtask_global); + + (*env)->SetObjectField(env, jcomm, idTask, jtask_local); + + MSG_task_set_data(task, NULL); + + return JNI_TRUE; + } + else { + //send the correct exception + switch (status) { + case MSG_TIMEOUT: + jxbt_throw_time_out_failure(env,NULL); + break; + case MSG_TRANSFER_FAILURE: + jxbt_throw_transfer_failure(env,NULL); + break; + case MSG_HOST_FAILURE: + jxbt_throw_host_failure(env,NULL); + break; + default: + jxbt_throw_native(env,bprintf("receive failed")); + } + return JNI_FALSE; + } + } + else { + return JNI_FALSE; + } +} diff --git a/src/jmsg_comm.h b/src/jmsg_comm.h new file mode 100644 index 0000000000..1d5a3427b5 --- /dev/null +++ b/src/jmsg_comm.h @@ -0,0 +1,14 @@ +/* Functions related to the java comm instances */ + +/* Copyright (c) 2012. The SimGrid Team. All rights reserved. */ + +#ifndef MSG_JCOMM_H +#define MSG_JCOMM_H +#include + +JNIEXPORT void JNICALL +Java_org_simgrid_msg_Comm_unbind(JNIEnv *env, jobject jcomm); + +JNIEXPORT jboolean JNICALL +Java_org_simgrid_msg_Comm_test(JNIEnv *env, jobject jcomm); +#endif /* MSG_JCOMM_H */ diff --git a/src/jmsg_task.c b/src/jmsg_task.c index 2c149a541e..c66babe94f 100644 --- a/src/jmsg_task.c +++ b/src/jmsg_task.c @@ -10,6 +10,9 @@ #include "jmsg_task.h" #include "jxbt_utilities.h" +#include +XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(jmsg); + void jtask_bind(jobject jtask, m_task_t task, JNIEnv * env) { jfieldID id = jxbt_get_sfield(env, "org/simgrid/msg/Task", "bind", "J"); @@ -39,3 +42,28 @@ jboolean jtask_is_valid(jobject jtask, JNIEnv * env) return (*env)->GetLongField(env, jtask, id) ? JNI_TRUE : JNI_FALSE; } + +JNIEXPORT void JNICALL +Java_org_simgrid_msg_Task_irecvBind(JNIEnv * env, jclass cls, jobject jcomm, jstring jmailbox) { + msg_comm_t comm; + const char *mailbox; + //pointer to store the task object pointer. + m_task_t *task = xbt_new(m_task_t,1); + *task = NULL; + /* There should be a cache here */ + jfieldID id = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J"); + jfieldID id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J"); + + if (!id || !id_task) + return; + + + mailbox = (*env)->GetStringUTFChars(env, jmailbox, 0); + + comm = MSG_task_irecv(task,mailbox); + + (*env)->SetLongField(env, jcomm, id, (jlong) (long)(comm)); + (*env)->SetLongField(env, jcomm, id_task, (jlong) (long)(task)); + + (*env)->ReleaseStringUTFChars(env, jmailbox, mailbox); +} diff --git a/src/jmsg_task.h b/src/jmsg_task.h index 7b07a55c0d..32e803a03d 100644 --- a/src/jmsg_task.h +++ b/src/jmsg_task.h @@ -75,4 +75,8 @@ m_task_t jtask_to_native_task(jobject jtask, JNIEnv * env); */ jboolean jtask_is_valid(jobject jtask, JNIEnv * env); +JNIEXPORT void JNICALL +Java_org_simgrid_msg_Task_irecvBind(JNIEnv * env, jclass cls, jobject jcomm, jstring jmailbox); + + #endif /* !MSG_JTASK_H */