Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add isend in Task
author Samuel Lepetit <samuel.lepetit@inria.fr>
Fri, 4 May 2012 11:02:11 +0000 (13:02 +0200)
committer Samuel Lepetit <samuel.lepetit@inria.fr>
Fri, 4 May 2012 11:02:11 +0000 (13:02 +0200)
examples/async/Master.java
examples/async/Slave.java
org/simgrid/msg/Comm.java
org/simgrid/msg/Process.java
org/simgrid/msg/Task.java
src/jmsg.c
src/jmsg_comm.c
src/jmsg_task.c
src/jmsg_task.h

index 88d6d12..6c28a36 100644 (file)
@@ -8,6 +8,9 @@
  */
 
 package async;
  */
 
 package async;
+import java.util.ArrayList;
+
+import org.simgrid.msg.Comm;
 import org.simgrid.msg.Msg;
 import org.simgrid.msg.MsgException;
 import org.simgrid.msg.Task;
 import org.simgrid.msg.Msg;
 import org.simgrid.msg.MsgException;
 import org.simgrid.msg.Task;
@@ -27,13 +30,32 @@ public class Master extends Process {
                int slavesCount = Integer.valueOf(args[3]).intValue();
 
                Msg.info("Hello! Got "+  slavesCount + " slaves and "+tasksCount+" tasks to process");
                int slavesCount = Integer.valueOf(args[3]).intValue();
 
                Msg.info("Hello! Got "+  slavesCount + " slaves and "+tasksCount+" tasks to process");
-
+               
+               ArrayList<Comm> comms = new ArrayList<Comm>();
+               
                for (int i = 0; i < tasksCount; i++) {
                        Task task = new Task("Task_" + i, taskComputeSize, taskCommunicateSize); 
                for (int i = 0; i < tasksCount; i++) {
                        Task task = new Task("Task_" + i, taskComputeSize, taskCommunicateSize); 
-                       //Msg.info("Sending \"" + task.getName()+ "\" to \"slave_" + i % slavesCount + "\"");
-                       task.send("slave_"+(i%slavesCount));
+                       Msg.info("Sending \"" + task.getName()+ "\" to \"slave_" + i % slavesCount + "\"");
+                       //task.send("slave_"+(i%slavesCount));
+                       Comm comm = task.isend("slave_"+(i%slavesCount));
+                       comms.add(comm);
                }
                }
-
+               
+               while (comms.size() > 0) {
+                       for (int i = 0; i < comms.size(); i++) {
+                               try {
+                                       if (comms.get(i).test()) {
+                                               comms.remove(i);
+                                               i--;
+                                       }
+                               }
+                               catch (Exception e) {
+                                       e.printStackTrace();
+                               }
+                       }
+                       simulatedSleep(1);
+               }
+               
                Msg.info("All tasks have been dispatched. Let's tell (asynchronously) everybody the computation is over, and sleep 20s so that nobody gets a message from a terminated process.");
 
                for (int i = 0; i < slavesCount; i++) {
                Msg.info("All tasks have been dispatched. Let's tell (asynchronously) everybody the computation is over, and sleep 20s so that nobody gets a message from a terminated process.");
 
                for (int i = 0; i < slavesCount; i++) {
index f1a8fab..ab07cac 100644 (file)
@@ -40,6 +40,7 @@ public class Slave extends Process {
                                                        comm = null;
                                                        break;
                                                }
                                                        comm = null;
                                                        break;
                                                }
+                                               Msg.info("Received a task");
                                                Msg.info("Received \"" + task.getName() +  "\". Processing it.");
                                                try {
                                                        task.execute();
                                                Msg.info("Received \"" + task.getName() +  "\". Processing it.");
                                                try {
                                                        task.execute();
index 2865757..0aae5c4 100644 (file)
@@ -12,6 +12,10 @@ package org.simgrid.msg;
  * between processes.
  */
 public class Comm {
  * between processes.
  */
 public class Comm {
+       /**
+        * Indicates if the communication is a receiving communication
+        */
+       boolean receiving;
        /**
         * Represents the bind between the java comm and the
         * native C comm. You must never access it, since it is 
        /**
         * Represents the bind between the java comm and the
         * native C comm. You must never access it, since it is 
@@ -36,13 +40,13 @@ public class Comm {
        /**
         * Finalize the communication object, destroying it.
         */
        /**
         * Finalize the communication object, destroying it.
         */
-       protected void finalize() {
+       protected void finalize() throws Throwable {
                unbind();
        }
        /**
         * Unbind the communication object
         */
                unbind();
        }
        /**
         * Unbind the communication object
         */
-       public native void unbind();
+       public native void unbind() throws NativeException;
        /**
         * Returns if the communication is finished or not.
         * If the communication has finished and there was an error,
        /**
         * Returns if the communication is finished or not.
         * If the communication has finished and there was an error,
index fb69cd1..2aceb10 100644 (file)
@@ -185,7 +185,7 @@ public abstract class Process extends Thread {
                try {
                        MsgNative.processCreate(this, host.getName());
                } catch (HostNotFoundException e) {
                try {
                        MsgNative.processCreate(this, host.getName());
                } catch (HostNotFoundException e) {
-                       throw new RuntimeException("The impossible happend (yet again): the host that I have were not found",e);
+                       throw new RuntimeException("The impossible happened (yet again): the host that I have were not found",e);
                }
                
        }
                }
                
        }
index 5a08ec6..a680e64 100644 (file)
@@ -184,17 +184,17 @@ public class Task {
        public void sendBounded(String alias, double maxrate) throws TransferFailureException, HostFailureException, TimeoutException {
                MsgNative.taskSendBounded(alias, this, maxrate);
        } 
        public void sendBounded(String alias, double maxrate) throws TransferFailureException, HostFailureException, TimeoutException {
                MsgNative.taskSendBounded(alias, this, maxrate);
        } 
+       /**
+        * Sends the task on the mailbox asynchronously
+        */
+       public native Comm isend(String mailbox);
+       
        /**
         * Starts listening for receiving a task from an asynchronous communication
         * @param mailbox
         * @return
         */
        /**
         * Starts listening for receiving a task from an asynchronous communication
         * @param mailbox
         * @return
         */
-       public static Comm irecv(String mailbox) {
-               Comm comm = new Comm();
-               irecvBind(comm,mailbox);
-               return comm;
-       }
-       public static native void irecvBind(Comm comm, String mailbox);
+       public static native Comm irecv(String mailbox);
        /**
         * Retrieves next task from the mailbox identified by the specified name
         *
        /**
         * Retrieves next task from the mailbox identified by the specified name
         *
index f1328c4..bc81dc2 100644 (file)
@@ -758,7 +758,6 @@ JNIEXPORT void JNICALL
 Java_org_simgrid_msg_MsgNative_taskDestroy(JNIEnv * env, jclass cls,
                                        jobject jtask_arg)
 {
 Java_org_simgrid_msg_MsgNative_taskDestroy(JNIEnv * env, jclass cls,
                                        jobject jtask_arg)
 {
-
   /* get the native task */
   m_task_t task = jtask_to_native_task(jtask_arg, env);
 
   /* get the native task */
   m_task_t task = jtask_to_native_task(jtask_arg, env);
 
@@ -766,7 +765,6 @@ Java_org_simgrid_msg_MsgNative_taskDestroy(JNIEnv * env, jclass cls,
     jxbt_throw_notbound(env, "task", task);
     return;
   }
     jxbt_throw_notbound(env, "task", task);
     return;
   }
-
   MSG_error_t rv = MSG_task_destroy(task);
 
   jxbt_check_res("MSG_task_destroy()", rv, MSG_OK,
   MSG_error_t rv = MSG_task_destroy(task);
 
   jxbt_check_res("MSG_task_destroy()", rv, MSG_OK,
index 17a714e..4bdbf41 100644 (file)
@@ -12,12 +12,13 @@ Java_org_simgrid_msg_Comm_unbind(JNIEnv *env, jobject jcomm) {
        jfieldID id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J");
        msg_comm_t comm;
        m_task_t *task_received;
        jfieldID id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J");
        msg_comm_t comm;
        m_task_t *task_received;
-
        if (!id || !id_task)
                return;
 
        task_received = (m_task_t*)  (long) (*env)->GetLongField(env, jcomm, id_task);
        if (!id || !id_task)
                return;
 
        task_received = (m_task_t*)  (long) (*env)->GetLongField(env, jcomm, id_task);
-       xbt_free(task_received);
+       if (task_received != NULL) {
+               xbt_free(task_received);
+       }
 
        comm = (msg_comm_t) (long) (*env)->GetLongField(env, jcomm, id);
        MSG_comm_destroy(comm);
 
        comm = (msg_comm_t) (long) (*env)->GetLongField(env, jcomm, id);
        MSG_comm_destroy(comm);
@@ -30,7 +31,10 @@ Java_org_simgrid_msg_Comm_test(JNIEnv *env, jobject jcomm) {
        jfieldID idComm = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J");
        jclass jclass = jxbt_get_class(env,"org/simgrid/msg/Comm");
        jfieldID idTask = jxbt_get_jfield(env, jclass, "task", "Lorg/simgrid/msg/Task;");
        jfieldID idComm = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J");
        jclass jclass = jxbt_get_class(env,"org/simgrid/msg/Comm");
        jfieldID idTask = jxbt_get_jfield(env, jclass, "task", "Lorg/simgrid/msg/Task;");
-       if (!idComm || !idTask) {
+       jfieldID id_receiving = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "receiving", "Z");
+
+
+       if (!idComm || !idTask || !id_receiving) {
                jxbt_throw_native(env,bprintf("idTask or idComm is null"));
                return JNI_FALSE;
        }
                jxbt_throw_native(env,bprintf("idTask or idComm is null"));
                return JNI_FALSE;
        }
@@ -44,17 +48,27 @@ Java_org_simgrid_msg_Comm_test(JNIEnv *env, jobject jcomm) {
                MSG_error_t status = MSG_comm_get_status(comm);
 
                if (status == MSG_OK) {
                MSG_error_t status = MSG_comm_get_status(comm);
 
                if (status == MSG_OK) {
-                       //bind the task object.
-                       m_task_t task = MSG_comm_get_task(comm);
-                       jobject jtask_global = MSG_task_get_data(task);
-                       jobject jtask_local = (*env)->NewLocalRef(env, jtask_global);
-
-                       (*env)->DeleteGlobalRef(env, jtask_global);
+                       //test if we are receiving or sending a task.
+                       jboolean jreceiving = (*env)->GetBooleanField(env, jcomm, id_receiving);
+                       if (jreceiving == JNI_TRUE) {
+                               //bind the task object.
+                               m_task_t task = MSG_comm_get_task(comm);
+                               xbt_assert(task != NULL, "Task is NULL");
+                               jobject jtask_global = MSG_task_get_data(task);
+                               //case where the data has already been retrieved
+                               if (jtask_global == NULL)
+                               {
+                                       return JNI_TRUE;
+                               }
 
 
-                       (*env)->SetObjectField(env, jcomm, idTask, jtask_local);
+                               //Make sure the data will be correctly gc.
+                               jobject jtask_local = (*env)->NewLocalRef(env, jtask_global);
+                               (*env)->DeleteGlobalRef(env, jtask_global);
 
 
-                       MSG_task_set_data(task, NULL);
+                               (*env)->SetObjectField(env, jcomm, idTask, jtask_local);
 
 
+                               MSG_task_set_data(task, NULL);
+                       }
                        return JNI_TRUE;
                }
                else {
                        return JNI_TRUE;
                }
                else {
index c66babe..4800dd7 100644 (file)
@@ -43,20 +43,36 @@ jboolean jtask_is_valid(jobject jtask, JNIEnv * env)
   return (*env)->GetLongField(env, jtask, id) ? JNI_TRUE : JNI_FALSE;
 }
 
   return (*env)->GetLongField(env, jtask, id) ? JNI_TRUE : JNI_FALSE;
 }
 
-JNIEXPORT void JNICALL
-Java_org_simgrid_msg_Task_irecvBind(JNIEnv * env, jclass cls, jobject jcomm, jstring jmailbox) {
+JNIEXPORT jobject JNICALL
+Java_org_simgrid_msg_Task_irecv(JNIEnv * env, jclass cls, jstring jmailbox) {
        msg_comm_t comm;
        const char *mailbox;
        msg_comm_t comm;
        const char *mailbox;
+       jclass comm_class;
+       jmethodID cid;
+       jfieldID id;
+       jfieldID id_task;
+       jfieldID id_receiving;
        //pointer to store the task object pointer.
        m_task_t *task = xbt_new(m_task_t,1);
        *task = NULL;
        /* There should be a cache here */
        //pointer to store the task object pointer.
        m_task_t *task = xbt_new(m_task_t,1);
        *task = NULL;
        /* There should be a cache here */
-       jfieldID id = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J");
-       jfieldID id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J");
+       comm_class = (*env)->FindClass(env, "org/simgrid/msg/Comm");
+       cid = (*env)->GetMethodID(env, comm_class, "<init>", "()V");
+       id = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J");
+       id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J");
+       id_receiving = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "receiving", "Z");
 
 
-       if (!id || !id_task)
-               return;
 
 
+       if (!id || !id_task || !comm_class || !cid || !id_receiving) {
+               jxbt_throw_native(env,bprintf("fieldID or methodID or class not found."));
+               return NULL;
+       }
+
+       jobject jcomm = (*env)->NewObject(env, comm_class, cid);
+       if (!jcomm) {
+               jxbt_throw_native(env,bprintf("Can't create a Comm object."));
+               return NULL;
+       }
 
        mailbox = (*env)->GetStringUTFChars(env, jmailbox, 0);
 
 
        mailbox = (*env)->GetStringUTFChars(env, jmailbox, 0);
 
@@ -64,6 +80,52 @@ Java_org_simgrid_msg_Task_irecvBind(JNIEnv * env, jclass cls, jobject jcomm, jst
 
        (*env)->SetLongField(env, jcomm, id, (jlong) (long)(comm));
        (*env)->SetLongField(env, jcomm, id_task, (jlong) (long)(task));
 
        (*env)->SetLongField(env, jcomm, id, (jlong) (long)(comm));
        (*env)->SetLongField(env, jcomm, id_task, (jlong) (long)(task));
+       (*env)->SetBooleanField(env, jcomm, id_receiving, JNI_TRUE);
+
+       (*env)->ReleaseStringUTFChars(env, jmailbox, mailbox);
+
+       return jcomm;
+}
+
+JNIEXPORT jobject JNICALL
+Java_org_simgrid_msg_Task_isend(JNIEnv *env, jobject jtask, jstring jmailbox) {
+       jclass comm_class;
+       jmethodID cid;
+       jfieldID id_bind;
+       jfieldID id_bind_task;
+       jfieldID id_receiving;
+       jobject jcomm;
+       const char *mailbox;
+       m_task_t task;
+       msg_comm_t comm;
+
+       comm_class = (*env)->FindClass(env, "org/simgrid/msg/Comm");
+       cid = (*env)->GetMethodID(env, comm_class, "<init>", "()V");
+       id_bind = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J");
+       id_bind_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J");
+       id_receiving = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "receiving", "Z");
+
+       if (!comm_class || !cid || !id_bind || !id_bind_task || !id_receiving) return NULL;
+
+       jcomm = (*env)->NewObject(env, comm_class, cid);
+       mailbox = (*env)->GetStringUTFChars(env, jmailbox, 0);
+
+       task = jtask_to_native_task(jtask, env);
+
+       if (!task) {
+    (*env)->ReleaseStringUTFChars(env, jmailbox, mailbox);
+    (*env)->DeleteLocalRef(env, jcomm);
+    jxbt_throw_notbound(env, "task", jtask);
+               return NULL;
+       }
+  MSG_task_set_data(task, (void *) (*env)->NewGlobalRef(env, jtask));
+       comm = MSG_task_isend(task,mailbox);
+
+       (*env)->SetLongField(env, jcomm, id_bind, (jlong) (long)(comm));
+       (*env)->SetLongField(env, jcomm, id_bind_task, (jlong) (long)(NULL));
+       (*env)->SetBooleanField(env, jcomm, id_receiving, JNI_FALSE);
 
        (*env)->ReleaseStringUTFChars(env, jmailbox, mailbox);
 
        (*env)->ReleaseStringUTFChars(env, jmailbox, mailbox);
+
+       return jcomm;
 }
 }
index 32e803a..7b9c3d8 100644 (file)
@@ -75,8 +75,10 @@ m_task_t jtask_to_native_task(jobject jtask, JNIEnv * env);
  */
 jboolean jtask_is_valid(jobject jtask, JNIEnv * env);
 
  */
 jboolean jtask_is_valid(jobject jtask, JNIEnv * env);
 
-JNIEXPORT void JNICALL
-Java_org_simgrid_msg_Task_irecvBind(JNIEnv * env, jclass cls, jobject jcomm, jstring jmailbox);
+JNIEXPORT jobject JNICALL
+Java_org_simgrid_msg_Task_irecv(JNIEnv * env, jclass cls, jstring jmailbox);
 
 
+JNIEXPORT jobject JNICALL
+Java_org_simgrid_msg_Task_isend(JNIEnv *env, jobject jtask, jstring jmailbox);
 
 #endif                          /* !MSG_JTASK_H */
 
 #endif                          /* !MSG_JTASK_H */