Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add asynchronous API (except wait) for communications
authorSamuel Lepetit <samuel.lepetit@inria.fr>
Thu, 3 May 2012 17:23:33 +0000 (19:23 +0200)
committerSamuel Lepetit <samuel.lepetit@inria.fr>
Thu, 3 May 2012 17:23:33 +0000 (19:23 +0200)
CMakeLists.txt
examples/async/Master.java
examples/async/Slave.java
org/simgrid/msg/Comm.java [new file with mode: 0644]
org/simgrid/msg/Task.java
src/jmsg.c
src/jmsg_comm.c [new file with mode: 0644]
src/jmsg_comm.h [new file with mode: 0644]
src/jmsg_task.c
src/jmsg_task.h

index 72f289e..86c1853 100644 (file)
@@ -78,6 +78,8 @@ set(JMSG_C_SRC
        src/jxbt_utilities.h
        src/jmsg.c 
        src/jmsg.h
+       src/jmsg_comm.c
+       src/jmsg_comm.h
        src/jmsg_host.c
        src/jmsg_host.h
        src/jmsg_process.c
@@ -108,6 +110,7 @@ set(JMSG_JAVA_SRC
        org/simgrid/msg/TimeoutException.java
        org/simgrid/msg/TransferFailureException.java   
        org/simgrid/msg/Mutex.java
+       org/simgrid/msg/Comm.java
 )
 
 set(JAVA_EXAMPLES
index 88d6d12..e7a89f2 100644 (file)
@@ -33,6 +33,7 @@ public class Master extends Process {
                        //Msg.info("Sending \"" + task.getName()+ "\" to \"slave_" + i % slavesCount + "\"");
                        task.send("slave_"+(i%slavesCount));
                }
+               simulatedSleep(1);
 
                Msg.info("All tasks have been dispatched. Let's tell (asynchronously) everybody the computation is over, and sleep 20s so that nobody gets a message from a terminated process.");
 
index 3d2270b..e2865a7 100644 (file)
@@ -5,6 +5,7 @@
  * under the terms of the license (GNU LGPL) which comes with this package. 
  */
 package async;
+import org.simgrid.msg.Comm;
 import org.simgrid.msg.HostFailureException;
 import org.simgrid.msg.Msg;
 import org.simgrid.msg.Task;
@@ -21,23 +22,40 @@ public class Slave extends Process {
                }
 
                int num = Integer.valueOf(args[0]).intValue();
-               //Msg.info("Receiving on 'slave_"+num+"'");
-
-               while(true) {  
-                       Task task = Task.receive("slave_"+num); 
-
-                       if (task instanceof FinalizeTask) {
-                               break;
+               
+               Comm comm = null;
+               boolean slaveFinished = false;
+               while(!slaveFinished) {  
+                       try
+                       {
+                               if (comm == null) {
+                                       Msg.info("Receiving on 'slave_" + num + "'");
+                                       comm = Task.irecv("slave_" + num);
+                               }
+                               else {
+                                       if (comm.test()) {
+                                               Task task = comm.getTask();
+       
+                                               if (task instanceof FinalizeTask) {
+                                                       break;
+                                               }
+                                               Msg.info("Received \"" + task.getName() +  "\". Processing it.");
+                                               try {
+                                                       task.execute();
+                                               } catch (TaskCancelledException e) {
+                                                       
+                                               }
+                                       }
+                                       else {
+                                               simulatedSleep(1);
+                                       }
+                               }
                        }
-                       Msg.info("Received \"" + task.getName() +  "\". Processing it.");
-                       try {
-                               task.execute();
-                       } catch (TaskCancelledException e) {
+                       catch (Exception e) {
 
                        }
-               //      Msg.info("\"" + task.getName() + "\" done ");
                }
-
                Msg.info("Received Finalize. I'm done. See you!");
+               simulatedSleep(20);
        }
 }
\ No newline at end of file
diff --git a/org/simgrid/msg/Comm.java b/org/simgrid/msg/Comm.java
new file mode 100644 (file)
index 0000000..2865757
--- /dev/null
@@ -0,0 +1,60 @@
+package org.simgrid.msg;
+/**
+* Copyright 2012 The SimGrid team. All rights reserved. 
+*
+* This program is free software; you can redistribute 
+* it and/or modify it under the terms of the license 
+* (GNU LGPL) which comes with this package.
+*
+*/
+/**
+ * Communication action, representing an ongoing communication
+ * between processes.
+ */
+public class Comm {
+       /**
+        * Address of the native C comm (msg_comm_t), stored as a long.
+        * It is written by native code when the comm is created; you
+        * must never modify it from Java.
+        */
+       public long bind = 0;
+       /**
+        * Address of the native slot that receives the task pointer. Don't touch it.
+        */
+       public long bindTask = 0;
+       /**
+        * Task associated with the comm. Beware, it is null until test() succeeds.
+        */
+       protected Task task = null;
+       /**
+        * Protected constructor, used by the Comm factories
+        * in Task (such as Task.irecv).
+        */
+       protected Comm() {
+
+       }
+       /**
+        * Releases the native side of the comm (through unbind) on garbage collection.
+        */
+       protected void finalize() {
+               unbind();
+       }
+       /**
+        * Frees the native comm and the native task slot referenced by bind and bindTask.
+        */
+       public native void unbind();
+       /**
+        * Tells whether the communication is finished. Returns true once it
+        * completed successfully (the task is then available through getTask());
+        * throws the matching exception if it finished with an error.
+        */
+       public native boolean test() throws TransferFailureException, HostFailureException, TimeoutException ;
+       /**
+        * Returns the task associated with the communication,
+        * or null as long as test() has not returned true.
+        */
+       public Task getTask() {
+               return task;
+       }
+       
+}
index 0e00aff..5a08ec6 100644 (file)
@@ -137,6 +137,7 @@ public class Task {
                        MsgNative.taskDestroy(this);
        }
 
+
        /** Send the task asynchronously on the mailbox identified by the specified name, 
         *  with no way to retrieve whether the communication succeeded or not
         * 
@@ -183,7 +184,17 @@ public class Task {
        public void sendBounded(String alias, double maxrate) throws TransferFailureException, HostFailureException, TimeoutException {
                MsgNative.taskSendBounded(alias, this, maxrate);
        } 
-
+       /**
+        * Starts listening for a task on the given mailbox through an asynchronous communication
+        * @param mailbox name of the mailbox to receive from
+        * @return the communication handle; call Comm.test() on it, then Comm.getTask() once it succeeded
+        */
+       public static Comm irecv(String mailbox) {
+               Comm comm = new Comm();
+               irecvBind(comm,mailbox);
+               return comm;
+       }
+       public static native void irecvBind(Comm comm, String mailbox); // native half of irecv: fills comm.bind and comm.bindTask
        /**
         * Retrieves next task from the mailbox identified by the specified name
         *
index b268837..9bf60c8 100644 (file)
@@ -547,7 +547,7 @@ Java_org_simgrid_msg_MsgNative_taskCreate(JNIEnv * env, jclass cls,
   task =
       MSG_task_create(name, (double) jcomputeDuration,
                       (double) jmessageSize, NULL);
-
+  XBT_INFO("Name: %s %p",name,task);
   if (jname)
     (*env)->ReleaseStringUTFChars(env, jname, name);
 
diff --git a/src/jmsg_comm.c b/src/jmsg_comm.c
new file mode 100644 (file)
index 0000000..17a714e
--- /dev/null
@@ -0,0 +1,81 @@
+/* Functions related to the java comm instances                                                                                                                                */
+
+/* Copyright (c) 2012. The SimGrid Team. All rights reserved.                   */
+#include "jmsg_comm.h"
+#include "jxbt_utilities.h"
+#include <msg/msg.h>
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(jmsg);
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_Comm_unbind(JNIEnv *env, jobject jcomm) {
+       /* Frees the native comm and task slot, then zeroes both fields: unbind() is public
+        * and also run by finalize(), so a second call must not free the pointers again. */
+       jfieldID id = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J");
+       jfieldID id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J");
+       msg_comm_t comm;
+       if (!id || !id_task)
+               return;
+       xbt_free((m_task_t*) (long) (*env)->GetLongField(env, jcomm, id_task));
+       (*env)->SetLongField(env, jcomm, id_task, (jlong) 0);
+       comm = (msg_comm_t) (long) (*env)->GetLongField(env, jcomm, id);
+       if (comm) /* bind is 0 when irecvBind bailed out early or after a previous unbind */
+               MSG_comm_destroy(comm);
+       (*env)->SetLongField(env, jcomm, id, (jlong) 0);
+}
+
+JNIEXPORT jboolean JNICALL
+Java_org_simgrid_msg_Comm_test(JNIEnv *env, jobject jcomm) {
+       msg_comm_t comm;
+       /* Native side of Comm.test(): JNI_TRUE once the comm finished with MSG_OK, JNI_FALSE otherwise. */
+       jfieldID idComm = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J");
+       jclass jclass = jxbt_get_class(env,"org/simgrid/msg/Comm"); /* NOTE(review): local named like the JNI type jclass, which it shadows */
+       jfieldID idTask = jxbt_get_jfield(env, jclass, "task", "Lorg/simgrid/msg/Task;");
+       if (!idComm || !idTask) {
+               jxbt_throw_native(env,bprintf("idTask or idComm is null"));
+               return JNI_FALSE;
+       }
+       /* A zero "bind" field means the Java object holds no native comm. */
+       comm = (msg_comm_t) (long) (*env)->GetLongField(env, jcomm, idComm);
+       if (!comm) {
+               jxbt_throw_native(env,bprintf("comm is null"));
+               return JNI_FALSE;
+       }
+       if (MSG_comm_test(comm)) {
+               MSG_error_t status = MSG_comm_get_status(comm);
+               /* finished: either hand the received task over to Java or raise the matching exception */
+               if (status == MSG_OK) {
+                       //bind the task object: the task's data is used as a JNI global ref to its Java Task.
+                       m_task_t task = MSG_comm_get_task(comm);
+                       jobject jtask_global = MSG_task_get_data(task);
+                       jobject jtask_local = (*env)->NewLocalRef(env, jtask_global);
+                       /* the local ref taken above is created before the global ref is dropped */
+                       (*env)->DeleteGlobalRef(env, jtask_global);
+                       /* publish the task in the Java "task" field, which Comm.getTask() returns */
+                       (*env)->SetObjectField(env, jcomm, idTask, jtask_local);
+                       /* clear the data pointer so it no longer refers to the deleted global ref */
+                       MSG_task_set_data(task, NULL);
+
+                       return JNI_TRUE;
+               }
+               else {
+                       //send the correct exception: one Java exception per MSG error status
+                       switch (status) {
+                               case MSG_TIMEOUT:
+                                       jxbt_throw_time_out_failure(env,NULL);
+                               break;
+                               case MSG_TRANSFER_FAILURE:
+                                       jxbt_throw_transfer_failure(env,NULL);
+                               break;
+                               case MSG_HOST_FAILURE:
+                                       jxbt_throw_host_failure(env,NULL);
+                               break;
+                               default:
+                                       jxbt_throw_native(env,bprintf("receive failed"));
+                       }
+                       return JNI_FALSE;
+               }
+       }
+       else {
+               return JNI_FALSE; /* MSG_comm_test reported the comm as not finished */
+       }
+}
diff --git a/src/jmsg_comm.h b/src/jmsg_comm.h
new file mode 100644 (file)
index 0000000..1d5a342
--- /dev/null
@@ -0,0 +1,14 @@
+/* Functions related to the java comm instances                                                                                                                                */
+
+/* Copyright (c) 2012. The SimGrid Team. All rights reserved.                   */
+
+#ifndef MSG_JCOMM_H
+#define MSG_JCOMM_H
+#include <jni.h>
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_Comm_unbind(JNIEnv *env, jobject jcomm);
+
+JNIEXPORT jboolean JNICALL
+Java_org_simgrid_msg_Comm_test(JNIEnv *env, jobject jcomm);
+#endif /* MSG_JCOMM_H */
index 2c149a5..c66babe 100644 (file)
@@ -10,6 +10,9 @@
 #include "jmsg_task.h"
 #include "jxbt_utilities.h"
 
+#include <msg/msg.h>
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(jmsg);
+
 void jtask_bind(jobject jtask, m_task_t task, JNIEnv * env)
 {
   jfieldID id = jxbt_get_sfield(env, "org/simgrid/msg/Task", "bind", "J");
@@ -39,3 +42,28 @@ jboolean jtask_is_valid(jobject jtask, JNIEnv * env)
 
   return (*env)->GetLongField(env, jtask, id) ? JNI_TRUE : JNI_FALSE;
 }
+
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_Task_irecvBind(JNIEnv * env, jclass cls, jobject jcomm, jstring jmailbox) {
+       /* Starts an asynchronous receive on jmailbox and stores the native handles in jcomm:
+        * "bind" gets the msg_comm_t, "bindTask" the slot handed to MSG_task_irecv for the task. */
+       msg_comm_t comm;
+       const char *mailbox;
+       m_task_t *task;
+       /* There should be a cache here */
+       jfieldID id = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bind", "J");
+       jfieldID id_task = jxbt_get_sfield(env, "org/simgrid/msg/Comm", "bindTask", "J");
+
+       if (!id || !id_task)
+               return;
+       mailbox = (*env)->GetStringUTFChars(env, jmailbox, 0);
+       if (!mailbox)
+               return; /* string conversion failed; nothing has been allocated yet */
+       /* Allocate the slot only now, so the early returns above cannot leak it. */
+       task = xbt_new(m_task_t,1);
+       *task = NULL;
+       comm = MSG_task_irecv(task,mailbox);
+       (*env)->SetLongField(env, jcomm, id, (jlong) (long)(comm));
+       (*env)->SetLongField(env, jcomm, id_task, (jlong) (long)(task));
+       (*env)->ReleaseStringUTFChars(env, jmailbox, mailbox);
+}
index 7b07a55..32e803a 100644 (file)
@@ -75,4 +75,8 @@ m_task_t jtask_to_native_task(jobject jtask, JNIEnv * env);
  */
 jboolean jtask_is_valid(jobject jtask, JNIEnv * env);
 
+JNIEXPORT void JNICALL
+Java_org_simgrid_msg_Task_irecvBind(JNIEnv * env, jclass cls, jobject jcomm, jstring jmailbox);
+
+
 #endif                          /* !MSG_JTASK_H */