Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Process kills now seems to work. May need some additional checking ...
authorLaurent Bobelin <lbobelin@mintcar.lip.ens-lyon.fr>
Thu, 16 Feb 2012 15:09:42 +0000 (16:09 +0100)
committerLaurent Bobelin <lbobelin@mintcar.lip.ens-lyon.fr>
Thu, 16 Feb 2012 15:09:42 +0000 (16:09 +0100)
examples/master_slave_kill/Master.java
examples/master_slave_kill/Slave.java
org/simgrid/msg/Host.java
org/simgrid/msg/Process.java
org/simgrid/msg/Task.java
src/smx_context_java.c

index f9d16a3..e3dc16e 100644 (file)
@@ -41,12 +41,8 @@ public class Master extends Process {
                                }
                }
                
-//             FinalizeTask task = new FinalizeTask();
-//             Msg.info("Send Mail2!");
-//             task.send("mail2");
-               
-               Process.waitFor(10);
                Process.kill(process2);
-               
+
+               Msg.info("Process2 is now killed, should exit now");
        }
 }
index 86fff0b..41b97cb 100644 (file)
@@ -27,6 +27,6 @@ public class Slave extends Process {
        task.send("mail1");
        
        Task task2 = Task.receive("mail2");
-       Msg.info("Receive Mail2!");
+        Msg.info("Receive Mail2!");
        }
-}
\ No newline at end of file
+}
index 186a0ab..e5548b9 100644 (file)
@@ -76,6 +76,7 @@ public class Host {
         */ 
        public static Host getByName(String name) 
        throws HostNotFoundException {
+       Process.ifInterruptedStop();
                if (name==null)
                        throw new NullPointerException("No host can have a null name");
                return MsgNative.hostGetByName(name);
@@ -88,6 +89,7 @@ public class Host {
         *
         */ 
        public static int getCount() {
+       Process.ifInterruptedStop();
                return MsgNative.hostGetCount();
        }
 
@@ -97,6 +99,7 @@ public class Host {
         * @return                      The host on which the current process is executed.
         */ 
        public static Host currentHost() {
+       Process.ifInterruptedStop();
                return MsgNative.hostSelf();
        }
 
@@ -107,6 +110,7 @@ public class Host {
         *
         */ 
        public static Host[] all()  {
+       Process.ifInterruptedStop();
                return MsgNative.allHosts();
        }
 
@@ -117,6 +121,7 @@ public class Host {
         *
         */ 
        public String getName()  {
+       Process.ifInterruptedStop();
                return MsgNative.hostGetName(this);
        }
 
@@ -127,6 +132,7 @@ public class Host {
      * @param data
      */
        public void setData(Object data) {
+       Process.ifInterruptedStop();
                this.data = data;
        } 
        /**
@@ -135,6 +141,7 @@ public class Host {
      * @return
      */
        public Object getData() {
+       Process.ifInterruptedStop();
                return this.data;
        }
 
@@ -144,6 +151,7 @@ public class Host {
      * @return
      */
        public boolean hasData() {
+       Process.ifInterruptedStop();
                return null != this.data;
        }
 
@@ -154,6 +162,7 @@ public class Host {
         * @return                      The number of tasks currently running on a host.
         */ 
        public int getLoad() {
+       Process.ifInterruptedStop();
                return MsgNative.hostGetLoad(this);
        }
 
@@ -165,6 +174,7 @@ public class Host {
         *
         */ 
        public double getSpeed() {
+       Process.ifInterruptedStop();
                return MsgNative.hostGetSpeed(this);
        }
 
@@ -172,6 +182,7 @@ public class Host {
      * @return
      */
        public boolean isAvail() {
+       Process.ifInterruptedStop();
                return MsgNative.hostIsAvail(this);
        }
 } 
index 454566e..94d6bf6 100644 (file)
@@ -94,6 +94,7 @@ public abstract class Process extends Thread {
      *
      */
     protected Sem schedBegin, schedEnd;
+    private boolean nativeStop = false;
 
        /**
         * Default constructor (used in ApplicationHandler to initialize it)
@@ -190,14 +191,50 @@ public abstract class Process extends Thread {
                return MsgNative.processKillAll(resetPID);
        }
 
+       /**
+        * This method sets a flag to indicate that this thread must be killed. End user must use static method kill
+        *
+        * @return                              
+        *                      
+        */ 
+       public void nativeStop()
+       {
+       nativeStop = true;
+       }
+       /**
+        * getter for the flag that indicates that this thread must be killed
+        *
+        * @return                              
+        *                      
+        */ 
+       public boolean getNativeStop()
+       {
+               return nativeStop;
+       }
+       /**
+        * checks  if the flag that indicates that this thread must be killed is set to true; if true, starts to kill it. End users should not have to deal with it
+        * If you develop a new MSG native call, please include a call to interruptedStop() at the beginning of your method code, so as the process can be killed if he call 
+        * your method. 
+        *
+        * @return                              
+        *                      
+        */ 
+       public static void ifInterruptedStop() {
+         if ( (Thread.currentThread() instanceof Process) &&((Process) Thread.currentThread()).getNativeStop()) {                              
+                       throw new RuntimeException("Interrupted");
+               }
+       }
+
 
        /**
-        * This method kill the current process.
+        * This method kill a process.
         * @param process  the process to be killed.
         *
         */
        public static void kill(Process process) {
-                MsgNative.processKill(process);
+                process.nativeStop();
+                Msg.info("Process " + process.msgName() + " will be killed.");                         
+                                
        }
        /**
         * This method adds an argument in the list of the arguments of the main function
@@ -218,6 +255,7 @@ public abstract class Process extends Thread {
         *
         */
        public void pause() {
+               Process.ifInterruptedStop();
                MsgNative.processSuspend(this);
        }
        /**
@@ -227,6 +265,7 @@ public abstract class Process extends Thread {
         *
         */ 
        public void restart()  {
+               Process.ifInterruptedStop();
                MsgNative.processResume(this);
        }
        /**
@@ -236,6 +275,7 @@ public abstract class Process extends Thread {
         *                                              Otherwise the method returns false.
         */ 
        public boolean isSuspended() {
+               Process.ifInterruptedStop();
                return MsgNative.processIsSuspended(this);
        }
        /**
@@ -246,6 +286,7 @@ public abstract class Process extends Thread {
         *
         */ 
        public Host getHost() {
+               Process.ifInterruptedStop();
                return MsgNative.processGetHost(this);
        }
        /**
@@ -258,6 +299,7 @@ public abstract class Process extends Thread {
         * @exception                   NativeException on error in the native SimGrid code
         */ 
        public static Process fromPID(int PID) throws NativeException {
+               Process.ifInterruptedStop();
                return MsgNative.processFromPID(PID);
        }
        /**
@@ -267,6 +309,7 @@ public abstract class Process extends Thread {
         *
         */ 
        public int getPID()  {
+               Process.ifInterruptedStop();
                return MsgNative.processGetPID(this);
        }
        /**
@@ -276,6 +319,7 @@ public abstract class Process extends Thread {
         *
         */ 
        public int getPPID()  {
+               Process.ifInterruptedStop();
                return MsgNative.processGetPPID(this);
        }
        /**
@@ -285,6 +329,7 @@ public abstract class Process extends Thread {
         *
         */ 
        public static Process currentProcess()  {
+               Process.ifInterruptedStop();
                return MsgNative.processSelf();
        }
        /**
@@ -295,6 +340,7 @@ public abstract class Process extends Thread {
         *
         */
        public static void migrate(Process process, Host host)  {
+               Process.ifInterruptedStop();
                MsgNative.processMigrate(process, host);
        }
        /**
@@ -305,12 +351,14 @@ public abstract class Process extends Thread {
         * @exception                   HostFailureException on error in the native SimGrid code
         */ 
        public static void waitFor(double seconds) throws HostFailureException {
+               Process.ifInterruptedStop();
                MsgNative.processWaitFor(seconds);
        } 
     /**
      *
      */
     public void showArgs() {
+               Process.ifInterruptedStop();
                Msg.info("[" + this.name + "/" + this.getHost().getName() + "] argc=" +
                                this.args.size());
                for (int i = 0; i < this.args.size(); i++)
@@ -346,6 +394,19 @@ public abstract class Process extends Thread {
                        Msg.info("Unexpected behavior. Stopping now");
                        System.exit(1);
                }
+                catch(RuntimeException re) {
+                       if (nativeStop)                 
+                       {
+                       MsgNative.processExit(this);
+                       Msg.info(" Process " + ((Process) Thread.currentThread()).msgName() + " has been killed.");                                             
+                       schedEnd.release();                     
+                       }
+                       else {
+                       re.printStackTrace();
+                       Msg.info("Unexpected behavior. Stopping now");
+                       System.exit(1);
+                       }
+               }       
        }
 
        /**
@@ -361,12 +422,11 @@ public abstract class Process extends Thread {
      *
      */
     public void unschedule() {
+               //Process.ifInterruptedStop();
                try {
                        schedEnd.release();
                        schedBegin.acquire();
-               } catch (InterruptedException e) {
-                       /* stopped by jprocess_exit: I must terminate right now */
-                       /* FIXME: how to do that? */
+               } catch (InterruptedException e) {                      
                }
        }
 
@@ -375,6 +435,7 @@ public abstract class Process extends Thread {
      */
     public void schedule() {
           //System.err.println("Scheduling process in Java");
+               //Process.ifInterruptedStop();
                try {
                        schedBegin.release();
                        schedEnd.acquire();
@@ -392,6 +453,7 @@ public abstract class Process extends Thread {
         * @throws HostFailureException 
         * @throws TransferFailureException */
        public void taskSend(String mailbox, Task task, double timeout) throws TransferFailureException, HostFailureException, TimeoutException {
+               Process.ifInterruptedStop();
                MsgNative.taskSend(mailbox, task, timeout);
        }
 
@@ -402,6 +464,7 @@ public abstract class Process extends Thread {
         * @throws HostFailureException 
         * @throws TransferFailureException */
        public void taskSend(String mailbox, Task task) throws  TransferFailureException, HostFailureException, TimeoutException {
+               Process.ifInterruptedStop();
                MsgNative.taskSend(mailbox, task, -1);
        }
 
@@ -413,6 +476,7 @@ public abstract class Process extends Thread {
      * @throws TimeoutException
      */
        public Task taskReceive(String mailbox) throws TransferFailureException, HostFailureException, TimeoutException {
+               Process.ifInterruptedStop();
                return MsgNative.taskReceive(mailbox, -1.0, null);
        }
 
@@ -425,6 +489,7 @@ public abstract class Process extends Thread {
      * @throws TimeoutException
      */
        public Task taskReceive(String mailbox, double timeout) throws  TransferFailureException, HostFailureException, TimeoutException {
+               Process.ifInterruptedStop();
                return MsgNative.taskReceive(mailbox, timeout, null);
        }
 
@@ -438,6 +503,7 @@ public abstract class Process extends Thread {
      * @throws TimeoutException
      */
        public Task taskReceive(String mailbox, double timeout, Host host) throws  TransferFailureException, HostFailureException, TimeoutException {
+               Process.ifInterruptedStop();
                return MsgNative.taskReceive(mailbox, timeout, host);
        }
 
@@ -450,6 +516,7 @@ public abstract class Process extends Thread {
      * @throws TimeoutException
      */
        public Task taskReceive(String mailbox, Host host) throws  TransferFailureException, HostFailureException, TimeoutException {
+               Process.ifInterruptedStop();
                return MsgNative.taskReceive(mailbox, -1.0, host);
        }
 }
index 830805e..c151961 100644 (file)
@@ -68,28 +68,33 @@ public class Task {
      * @return
      */
        public String getName() {
+               Process.ifInterruptedStop();
                return MsgNative.taskGetName(this);
        }
        /** Gets the sender of the task */ 
        Process getSender() {
+               Process.ifInterruptedStop();
                return MsgNative.taskGetSender(this);
        }
     /** Gets the source of the task
      * @return
      */
        public Host getSource()  {
+               Process.ifInterruptedStop();
                return MsgNative.taskGetSource(this);
        }
     /** Gets the computing amount of the task
      * @return
      */
        public double getComputeDuration() {
+               Process.ifInterruptedStop();
                return MsgNative.taskGetComputeDuration(this);
        }
     /** Gets the remaining computation of the task
      * @return
      */
        public double getRemainingDuration() {
+               Process.ifInterruptedStop();
                return MsgNative.taskGetRemainingDuration(this);
        }
        /**
@@ -101,6 +106,7 @@ public class Task {
         * @param priority      The new priority of the task.
         */ 
        public void setPriority(double priority) {
+               Process.ifInterruptedStop();
                MsgNative.taskSetPriority(this, priority);
        }
        /* *                       * *
@@ -119,6 +125,7 @@ public class Task {
      * @throws TaskCancelledException
      */
        public void execute() throws HostFailureException,TaskCancelledException {
+               Process.ifInterruptedStop();
                MsgNative.taskExecute(this);
        }
        /**
@@ -126,6 +133,7 @@ public class Task {
         *
         */ 
        public void cancel()  {
+               Process.ifInterruptedStop();
                MsgNative.taskCancel(this);
        }
        /** Deletes a task.
@@ -133,6 +141,7 @@ public class Task {
         * @exception                   NativeException if the destruction failed.
         */ 
        protected void finalize() throws NativeException {
+               Process.ifInterruptedStop();
                if (this.bind != 0)
                        MsgNative.taskDestroy(this);
        }
@@ -146,6 +155,7 @@ public class Task {
         * @throws TransferFailureException 
         */
        public void send(String mailbox) throws TransferFailureException, HostFailureException, TimeoutException {
+               Process.ifInterruptedStop();
                MsgNative.taskSend(mailbox, this, -1);
        } 
 
@@ -160,6 +170,7 @@ public class Task {
         * @throws TransferFailureException 
         */
        public void send(String mailbox, double timeout) throws NativeException, TransferFailureException, HostFailureException, TimeoutException {
+               Process.ifInterruptedStop();
                MsgNative.taskSend(mailbox, this, timeout);
        } 
 
@@ -173,6 +184,7 @@ public class Task {
      * @throws TimeoutException
         */
        public void sendBounded(String alias, double maxrate) throws TransferFailureException, HostFailureException, TimeoutException {
+               Process.ifInterruptedStop();
                MsgNative.taskSendBounded(alias, this, maxrate);
        } 
 
@@ -187,6 +199,7 @@ public class Task {
         */
 
        public static Task receive(String mailbox) throws TransferFailureException, HostFailureException, TimeoutException {
+               Process.ifInterruptedStop();
                return MsgNative.taskReceive(mailbox, -1.0, null);
        }
 
@@ -201,6 +214,7 @@ public class Task {
      * @throws TimeoutException
         */
        public static Task receive(String mailbox, double timeout) throws  TransferFailureException, HostFailureException, TimeoutException {
+               Process.ifInterruptedStop();
                return MsgNative.taskReceive(mailbox, timeout, null);
        }
 
@@ -216,6 +230,7 @@ public class Task {
         */
 
        public static Task receive(String mailbox, Host host) throws TransferFailureException, HostFailureException, TimeoutException {
+               Process.ifInterruptedStop();
                return MsgNative.taskReceive(mailbox, -1.0, host);
        }
 
@@ -231,6 +246,7 @@ public class Task {
      * @throws TimeoutException
         */
        public static Task receive(String mailbox, double timeout, Host host) throws TransferFailureException, HostFailureException, TimeoutException {
+               Process.ifInterruptedStop();
                return MsgNative.taskReceive(mailbox, timeout, host);
        }
 
@@ -242,6 +258,7 @@ public class Task {
      * @return
      */
        public static int listenFrom(String mailbox)  {
+               Process.ifInterruptedStop();
                return MsgNative.taskListenFrom(mailbox);
        }
        /**
@@ -252,6 +269,7 @@ public class Task {
      * @return
      */
        public static boolean listen(String mailbox)   {
+               Process.ifInterruptedStop();
                return MsgNative.taskListen(mailbox);
        }
 
@@ -264,6 +282,7 @@ public class Task {
      * @return
      */
        public static int listenFromHost(String alias, Host host)   {
+               Process.ifInterruptedStop();
                return MsgNative.taskListenFromHost(alias, host);
        }
 }
index aa2d0c2..c91e1d9 100644 (file)
@@ -97,11 +97,13 @@ static void smx_ctx_java_free(smx_context_t context)
   smx_ctx_base_free(context);
 }
 
+
 void smx_ctx_java_stop(smx_context_t context)
 {
 xbt_assert(context == my_current_context,
-      "The context to stop must be the current one");
+ xbt_assert(context == my_current_context,
+     "The context to stop must be the current one");
   /* I am the current process and I am dying */
+  
   smx_ctx_base_stop(context);
 
   XBT_DEBUG("I am dying");
@@ -109,7 +111,7 @@ void smx_ctx_java_stop(smx_context_t context)
   /* suspend myself again, smx_ctx_java_free() will destroy me later
    * from maeastro */
   jprocess_unschedule(context);
-  xbt_die("This function was not supposed to return");
+  XBT_INFO("Java stop finished");
 }
 
 static void smx_ctx_java_suspend(smx_context_t context)