Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
implement the ability to kill processes without relying on ifInterruptedStop
[simgrid.git] / org / simgrid / msg / Process.java
1 /*
2  * $Id$
3  *
4  * Copyright 2006,2007 Martin Quinson, Malek Cherier           
5  * All right reserved. 
6  *
7  * This program is free software; you can redistribute 
8  * it and/or modify it under the terms of the license 
9  *(GNU LGPL) which comes with this package. 
10  */
11
12 package org.simgrid.msg;
13  
14 import java.util.Arrays;
15 import java.util.Hashtable;
16 import java.util.Vector;
17
18 /**
19  * A process may be defined as a code, with some private data, executing 
20  * in a location (host). All the process used by your simulation must be
21  * declared in the deployment file (XML format).
22  * To create your own process you must inherit your own process from this 
23  * class and override the method "main()". For example if you want to use 
24  * a process named Slave proceed as it :
25  *
26  * (1) import the class Process of the package simgrid.msg
27  * import simgrid.msg.Process;
28  * 
29  * public class Slave extends simgrid.msg.Process {
30  *
31  *  (2) Override the method function
32  * 
33  *  \verbatim
34  *      public void main(String[] args) {
35  *              System.out.println("Hello MSG");
36  *      }
37  *  \endverbatim
38  * }
39  * The name of your process must be declared in the deployment file of your simulation.
40  * For the example, for the previous process Slave this file must contains a line :
41  * <process host="Maxims" function="Slave"/>, where Maxims is the host of the process
42  * Slave. All the process of your simulation are automatically launched and managed by Msg.
43  * A process use tasks to simulate communications or computations with another process. 
44  * For more information see Task. For more information on host concept 
45  * see Host.
46  * 
47  */
48
49 public abstract class Process extends Thread {
50         /**
51          * This attribute represents a bind between a java process object and
52          * a native process. Even if this attribute is public you must never
53          * access to it. It is set automatically during the build of the object.
54          */
55         public long bind;
56
57         /**
58          * Even if this attribute is public you must never access to it.
59          * It is used to compute the id of an MSG process.
60          */
61         public static long nextProcessId = 0;
62
63         /**
64          * Even if this attribute is public you must never access to it.
65          * It is compute automatically during the creation of the object. 
66          * The native functions use this identifier to synchronize the process.
67          */
68         public long id;
69
70     /**
71      *
72      */
73     public Hashtable<String,String> properties;
74
75         /**
76          * The name of the process.                                                     
77          */
78         protected String name;
79         /**
80           * The PID of the process
81           */
82         protected int pid = -1;
83         /**
84          * The PPID of the process 
85          */
86         protected int ppid = -1;
87         /**
88          * The host of the process
89          */
90         protected Host host = null;
91     /**
92      *
93      * @return
94      */
95     public String msgName() {
96                 return this.name;
97         }
98         /** The arguments of the method function of the process. */     
99         public Vector<String> args;
100
101         /* process synchronization tools */
102     /**
103      *
104      */
105     /**
106      *
107      */
108     protected Sem schedBegin, schedEnd;
109     private boolean nativeStop = false;
110
111         /**
112          * Default constructor (used in ApplicationHandler to initialize it)
113          */
114         protected Process() {
115                 super();
116                 this.id = nextProcessId++;
117                 this.name = null;
118                 this.bind = 0;
119                 this.args = new Vector<String>();
120                 this.properties = null;
121                 schedBegin = new Sem(0);
122                 schedEnd = new Sem(0);
123         }
124
125
126         /**
127          * Constructs a new process from the name of a host and his name. The method
128          * function of the process doesn't have argument.
129          *
130          * @param hostname              The name of the host of the process to create.
131          * @param name                  The name of the process.
132          *
133          * @exception                   HostNotFoundException  if no host with this name exists.
134          *                      
135          *
136          */
137         public Process(String hostname, String name) throws HostNotFoundException {
138                 this(Host.getByName(hostname), name, null);
139         }
140         /**
141          * Constructs a new process from the name of a host and his name. The arguments
142          * of the method function of the process are specified by the parameter args.
143          *
144          * @param hostname              The name of the host of the process to create.
145          * @param name                  The name of the process.
146          * @param args                  The arguments of the main function of the process.
147          *
148          * @exception                   HostNotFoundException  if no host with this name exists.
149      *                      NativeException
150      * @throws NativeException
151          *
152          */ 
153         public Process(String hostname, String name, String args[]) throws HostNotFoundException, NativeException {
154                 this(Host.getByName(hostname), name, args);
155         }
156         /**
157          * Constructs a new process from a host and his name. The method function of the 
158          * process doesn't have argument.
159          *
160          * @param host                  The host of the process to create.
161          * @param name                  The name of the process.
162          *
163          */
164         public Process(Host host, String name) {
165                 this(host, name, null);
166         }
167         /**
168          * Constructs a new process from a host and his name, the arguments of here method function are
169          * specified by the parameter args.
170          *
171          * @param host                  The host of the process to create.
172          * @param name                  The name of the process.
173          * @param args                  The arguments of main method of the process.
174          *
175          */
176         public Process(Host host, String name, String[]args) {
177                 /* This is the constructor called by all others */
178                 this();
179                 
180                 if (name == null)
181                         throw new NullPointerException("Process name cannot be NULL");
182                 this.name = name;
183
184                 this.args = new Vector<String>();
185                 if (null != args)
186                         this.args.addAll(Arrays.asList(args));
187
188                 MsgNative.processCreate(this, host);
189                 
190         }
191
192
193         /**
194          * This method kills all running process of the simulation.
195          *
196          * @param resetPID              Should we reset the PID numbers. A negative number means no reset
197          *                                              and a positive number will be used to set the PID of the next newly
198          *                                              created process.
199          *
200          * @return                              The function returns the PID of the next created process.
201          *                      
202          */ 
203         public static int killAll(int resetPID) {
204                 return MsgNative.processKillAll(resetPID);
205         }
206
207         /**
208          * This method sets a flag to indicate that this thread must be killed. End user must use static method kill
209          *
210          * @return                              
211          *                      
212          */ 
213         public void nativeStop()
214         {
215         nativeStop = true;
216         }
217         /**
218          * getter for the flag that indicates that this thread must be killed
219          *
220          * @return                              
221          *                      
222          */ 
223         public boolean getNativeStop()
224         {
225                 return nativeStop;
226         }
227         /**
228          * checks  if the flag that indicates that this thread must be killed is set to true; if true, starts to kill it. End users should not have to deal with it
229          * If you develop a new MSG native call, please include a call to interruptedStop() at the beginning of your method code, so as the process can be killed if he call 
230          * your method. 
231          *
232          * @return                              
233          *                      
234          */ 
235         @Deprecated
236         public static void ifInterruptedStop() {
237                 /* This function does nothing anymore and will get removed very soon */
238         }
239
240
241         /**
242          * This method kill a process.
243          * @param process  the process to be killed.
244          *
245          */
246         public void kill() {
247                  nativeStop();
248                  Msg.info("Process " + msgName() + " will be killed.");                         
249                                  
250         }
251
252         /**
253          * Suspends the process by suspending the task on which it was
254          * waiting for the completion.
255          *
256          */
257         public void pause() {
258                 Process.ifInterruptedStop();
259                 MsgNative.processSuspend(this);
260         }
261         /**
262          * Resumes a suspended process by resuming the task on which it was
263          * waiting for the completion.
264          *
265          *
266          */ 
267         public void restart()  {
268                 Process.ifInterruptedStop();
269                 MsgNative.processResume(this);
270         }
271         /**
272          * Tests if a process is suspended.
273          *
274          * @return                              The method returns true if the process is suspended.
275          *                                              Otherwise the method returns false.
276          */ 
277         public boolean isSuspended() {
278                 Process.ifInterruptedStop();
279                 return MsgNative.processIsSuspended(this);
280         }
281         /**
282          * Returns the host of a process.
283          *
284          * @return                              The host instance of the process.
285          *
286          *
287          */ 
288         public Host getHost() {
289                 Process.ifInterruptedStop();
290                 if (this.host == null) {
291                         this.host = MsgNative.processGetHost(this);
292                 }
293                 return this.host;
294         }
295         /**
296          * This static method gets a process from a PID.
297          *
298          * @param PID                   The process identifier of the process to get.
299          *
300          * @return                              The process with the specified PID.
301          *
302          * @exception                   NativeException on error in the native SimGrid code
303          */ 
304         public static Process fromPID(int PID) throws NativeException {
305                 Process.ifInterruptedStop();
306                 return MsgNative.processFromPID(PID);
307         }
308         /**
309          * This method returns the PID of the process.
310          *
311          * @return                              The PID of the process.
312          *
313          */ 
314         public int getPID()  {
315                 Process.ifInterruptedStop();
316                 if (pid == -1) {
317                         pid = MsgNative.processGetPID(this);
318                 }
319                 return pid;
320         }
321         /**
322          * This method returns the PID of the parent of a process.
323          *
324          * @return                              The PID of the parent of the process.
325          *
326          */ 
327         public int getPPID()  {
328                 Process.ifInterruptedStop();
329                 if (ppid == -1) {
330                         ppid = MsgNative.processGetPPID(this);
331                 }
332                 return ppid;
333         }
334         /**
335          * This static method returns the currently running process.
336          *
337          * @return                              The current process.
338          *
339          */ 
340         public static Process currentProcess()  {
341                 Process.ifInterruptedStop();
342                 return MsgNative.processSelf();
343         }
344         /**
345          * Migrates a process to another host.
346          *
347          * @param process               The process to migrate.
348          * @param host                  The host where to migrate the process.
349          *
350          */
351         public static void migrate(Process process, Host host)  {
352                 Process.ifInterruptedStop();
353                 MsgNative.processMigrate(process, host);
354                 process.host = null;
355         }
356         /**
357          * Makes the current process sleep until time seconds have elapsed.
358          *
359          * @param seconds               The time the current process must sleep.
360          *
361          * @exception                   HostFailureException on error in the native SimGrid code
362          */ 
363         public static void waitFor(double seconds) throws HostFailureException {
364                 Process.ifInterruptedStop();
365                 MsgNative.processWaitFor(seconds);
366         } 
367     /**
368      *
369      */
370     public void showArgs() {
371                 Process.ifInterruptedStop();
372                 Msg.info("[" + this.name + "/" + this.getHost().getName() + "] argc=" +
373                                 this.args.size());
374                 for (int i = 0; i < this.args.size(); i++)
375                         Msg.info("[" + this.msgName() + "/" + this.getHost().getName() +
376                                         "] args[" + i + "]=" + (String) (this.args.get(i)));
377         }
378     /**
379      * Let the simulated process sleep for the given amount of millisecond in the simulated world.
380      * 
381      *  You don't want to use sleep instead, because it would freeze your simulation 
382      *  run without any impact on the simulated world.
383      * @param millis
384      */
385     public native void simulatedSleep(double seconds);
386
387         /**
388          * This method runs the process. Il calls the method function that you must overwrite.
389          */
390         public void run() {
391
392                 String[]args = null;      /* do not fill it before the signal or this.args will be empty */
393
394                 //waitSignal(); /* wait for other people to fill the process in */
395
396
397                 try {
398                         schedBegin.acquire();
399                 } catch(InterruptedException e) {
400                 }
401
402                 try {
403                         args = new String[this.args.size()];
404                         if (this.args.size() > 0) {
405                                 this.args.toArray(args);
406                         }
407
408                         this.main(args);
409                         MsgNative.processExit(this);
410                         schedEnd.release();
411                 } catch(MsgException e) {
412                         e.printStackTrace();
413                         Msg.info("Unexpected behavior. Stopping now");
414                         System.exit(1);
415                 }
416                  catch(ProcessKilled pk) {
417                         if (nativeStop) {
418                                 try {
419                                         MsgNative.processExit(this);
420                                 } catch (ProcessKilled pk2) {
421                                         /* Ignore that other exception that *will* occur all the time. 
422                                          * This is because the C mechanic gives the control to the now-killed process 
423                                          * so that it does some garbage collecting on its own. When it does so here, 
424                                          * the Java thread checks when starting if it's supposed to be killed (to inform 
425                                          * the C world). To avoid the infinite loop or anything similar, we ignore that 
426                                          * exception now. This should be ok since we ignore only a very specific exception 
427                                          * class and not a generic (such as any RuntimeException).
428                                          */
429                                         System.err.println(currentThread().getName()+": I ignore that other exception");                                        
430                                 }
431                         Msg.info(" Process " + ((Process) Thread.currentThread()).msgName() + " has been killed.");                                             
432                         schedEnd.release();                     
433                         }
434                         else {
435                         pk.printStackTrace();
436                         Msg.info("Unexpected behavior. Stopping now");
437                         System.exit(1);
438                         }
439                 }       
440         }
441
442         /**
443          * The main function of the process (to implement).
444      *
445      * @param args
446      * @throws MsgException
447      */
448         public abstract void main(String[]args) throws MsgException;
449
450
451     /** @brief Gives the control from the given user thread back to the maestro 
452      * 
453      * schedule() and unschedule() are the basis of interactions between the user threads 
454      * (executing the user code), and the maestro thread (executing the platform models to decide 
455      * which user thread should get executed when. Once it decided which user thread should be run 
456      * (because the blocking action it were blocked onto are terminated in the simulated world), the 
457      * maestro passes the control to this uthread by calling uthread.schedule() in the maestro thread 
458      * (check its code for the simple semaphore-based synchronization schema). 
459      * 
460      * The uthread executes (while the maestro is blocked), until it starts another blocking 
461      * action, such as a communication or so. In that case, uthread.unschedule() gets called from 
462      * the user thread.    
463      *
464      * As other complications, these methods are called directly by the C through a JNI upcall in 
465      * response to the JNI downcalls done by the Java code. For example, you have this (simplified) 
466      * execution path: 
467      *   - a process calls the Task.send() method in java
468      *   - this calls Java_org_simgrid_msg_MsgNative_taskSend() in C through JNI
469      *   - this ends up calling jprocess_unschedule(), still in C
470      *   - this calls the java method "org/simgrid/msg/Process/unschedule()V" through JNI
471      *   - that is to say, the unschedule() method that you are reading the documentation of.
472      *   
473      * To understand all this, you must keep in mind that there is no difference between the C thread 
474      * describing a process, and the Java thread doing the same. Most of the time, they are system 
475      * threads from the kernel anyway. In the other case (such as when using green java threads when 
476      * the OS does not provide any thread feature), I'm unsure of what happens: it's a very long time 
477      * that I didn't see any such OS. 
478      * 
479      * The synchronization itself is implemented using simple semaphores in Java, as you can see by
480      * checking the code of these functions (and run() above). That's super simple, and thus welcome
481      * given the global complexity of the synchronization architecture: getting C and Java cooperate
482      * with regard to thread handling in a portable manner is very uneasy. A simple and straightforward 
483      * implementation of each synchronization point is precious. 
484      *  
485      * But this kinda limits the system scalability. It may reveal difficult to simulate dozens of 
486      * thousands of processes this way, both for memory limitations and for hard limits pushed by the 
487      * system on the amount of threads and semaphores (we have 2 semaphores per user process).
488      * 
489      * At time of writing, the best source of information on how to simulate large systems within the 
490      * Java bindings of simgrid is here: http://tomp2p.net/dev/simgrid/
491      * 
492      */
493     public void unschedule() {
494         /* this function is called from the user thread only */
495                 try {     
496                         
497                         /* unlock the maestro before going to sleep */
498                         schedEnd.release();
499                         /* Here, the user thread is locked, waiting for the semaphore, and maestro executes instead */
500                         schedBegin.acquire();
501                         /* now that the semaphore is acquired, it means that maestro gave us the control back */
502                         
503                         /* the user thread is starting again after giving the control to maestro. 
504                          * Let's check if we were asked to die in between */
505                         if ( (Thread.currentThread() instanceof Process) &&((Process) Thread.currentThread()).getNativeStop()) {                                
506                                 throw new ProcessKilled();
507                         }
508                         
509                 } catch (InterruptedException e) {
510                         /* ignore this exception because this is how we get killed on process.kill or end of simulation.
511                          * I don't like hiding exceptions this way, but fail to see any other solution 
512                          */
513                 }
514                 
515         }
516
517     /** @brief Gives the control from the maestro back to the given user thread 
518      * 
519      * Must be called from the maestro thread -- see unschedule() for details.
520      *
521      */
522     public void schedule() {
523                 try {
524                         /* unlock the user thread before going to sleep */
525                         schedBegin.release();
526                         /* Here, maestro is locked, waiting for the schedEnd semaphore to get signaled by used thread, that executes instead */
527                         schedEnd.acquire();
528                         /* Maestro now has the control back and the user thread went to sleep gently */
529                         
530                 } catch(InterruptedException e) {
531                         throw new RuntimeException("The impossible did happend once again: I got interrupted in schedEnd.acquire()",e);
532                 }
533         }
534
535         /** Send the given task in the mailbox associated with the specified alias  (waiting at most given time) 
536      * @param mailbox
537      * @param task 
538      * @param timeout
539      * @throws TimeoutException
540          * @throws HostFailureException 
541          * @throws TransferFailureException */
542         public void taskSend(String mailbox, Task task, double timeout) throws TransferFailureException, HostFailureException, TimeoutException {
543                 Process.ifInterruptedStop();
544                 MsgNative.taskSend(mailbox, task, timeout);
545         }
546
547         /** Send the given task in the mailbox associated with the specified alias
548      * @param mailbox
549      * @param task
550      * @throws TimeoutException
551          * @throws HostFailureException 
552          * @throws TransferFailureException */
553         public void taskSend(String mailbox, Task task) throws  TransferFailureException, HostFailureException, TimeoutException {
554                 Process.ifInterruptedStop();
555                 MsgNative.taskSend(mailbox, task, -1);
556         }
557
558     /** Receive a task on mailbox associated with the specified mailbox
559      * @param mailbox
560      * @return
561      * @throws TransferFailureException
562      * @throws HostFailureException
563      * @throws TimeoutException
564      */
565         public Task taskReceive(String mailbox) throws TransferFailureException, HostFailureException, TimeoutException {
566                 Process.ifInterruptedStop();
567                 return MsgNative.taskReceive(mailbox, -1.0, null);
568         }
569
570     /** Receive a task on mailbox associated with the specified alias (waiting at most given time)
571      * @param mailbox
572      * @param timeout
573      * @return
574      * @throws TransferFailureException
575      * @throws HostFailureException
576      * @throws TimeoutException
577      */
578         public Task taskReceive(String mailbox, double timeout) throws  TransferFailureException, HostFailureException, TimeoutException {
579                 Process.ifInterruptedStop();
580                 return MsgNative.taskReceive(mailbox, timeout, null);
581         }
582
583     /** Receive a task on mailbox associated with the specified alias from given sender
584      * @param mailbox
585      * @param host
586      * @param timeout
587      * @return
588      * @throws TransferFailureException
589      * @throws HostFailureException
590      * @throws TimeoutException
591      */
592         public Task taskReceive(String mailbox, double timeout, Host host) throws  TransferFailureException, HostFailureException, TimeoutException {
593                 Process.ifInterruptedStop();
594                 return MsgNative.taskReceive(mailbox, timeout, host);
595         }
596
597     /** Receive a task on mailbox associated with the specified alias from given sender
598      * @param mailbox
599      * @param host
600      * @return
601      * @throws TransferFailureException
602      * @throws HostFailureException
603      * @throws TimeoutException
604      */
605         public Task taskReceive(String mailbox, Host host) throws  TransferFailureException, HostFailureException, TimeoutException {
606                 Process.ifInterruptedStop();
607                 return MsgNative.taskReceive(mailbox, -1.0, host);
608         }
609 }