Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
fb69cd1db8a72034fbb99bd843c9506dc32b703b
[simgrid.git] / org / simgrid / msg / Process.java
1 /*
2  * $Id$
3  *
4  * Copyright 2006,2007 Martin Quinson, Malek Cherier           
5  * All right reserved. 
6  *
7  * This program is free software; you can redistribute 
8  * it and/or modify it under the terms of the license 
9  *(GNU LGPL) which comes with this package. 
10  */
11
12 package org.simgrid.msg;
13  
14 import java.util.Arrays;
15 import java.util.Hashtable;
16 import java.util.Vector;
17 import java.util.concurrent.Semaphore;
18
19 /**
20  * A process may be defined as a code, with some private data, executing 
21  * in a location (host). All the process used by your simulation must be
22  * declared in the deployment file (XML format).
23  * To create your own process you must inherit your own process from this 
24  * class and override the method "main()". For example if you want to use 
25  * a process named Slave proceed as it :
26  *
27  * (1) import the class Process of the package simgrid.msg
28  * import simgrid.msg.Process;
29  * 
30  * public class Slave extends simgrid.msg.Process {
31  *
32  *  (2) Override the method function
33  * 
34  *  \verbatim
35  *      public void main(String[] args) {
36  *              System.out.println("Hello MSG");
37  *      }
38  *  \endverbatim
39  * }
40  * The name of your process must be declared in the deployment file of your simulation.
41  * For the example, for the previous process Slave this file must contains a line :
42  * <process host="Maxims" function="Slave"/>, where Maxims is the host of the process
43  * Slave. All the process of your simulation are automatically launched and managed by Msg.
44  * A process use tasks to simulate communications or computations with another process. 
45  * For more information see Task. For more information on host concept 
46  * see Host.
47  * 
48  */
49
50 public abstract class Process extends Thread {
51         /**
52          * This attribute represents a bind between a java process object and
53          * a native process. Even if this attribute is public you must never
54          * access to it. It is set automatically during the build of the object.
55          */
56         public long bind;
57
58         /**
59          * Even if this attribute is public you must never access to it.
60          * It is used to compute the id of an MSG process.
61          */
62         public static long nextProcessId = 0;
63
64         /**
65          * Even if this attribute is public you must never access to it.
66          * It is compute automatically during the creation of the object. 
67          * The native functions use this identifier to synchronize the process.
68          */
69         public long id;
70
71     /**
72      *
73      */
74     public Hashtable<String,String> properties;
75
76         /**
77          * The name of the process.                                                     
78          */
79         protected String name;
80         /**
81           * The PID of the process
82           */
83         protected int pid = -1;
84         /**
85          * The PPID of the process 
86          */
87         protected int ppid = -1;
88         /**
89          * The host of the process
90          */
91         protected Host host = null;
92     /**
93      *
94      * @return
95      */
96     public String msgName() {
97                 return this.name;
98         }
99         /** The arguments of the method function of the process. */     
100         public Vector<String> args;
101
102         /* process synchronization tools */
103         
104         /* give the full path to semaphore to ensure that our own implementation don't get selected */
105     protected java.util.concurrent.Semaphore schedBegin, schedEnd;
106     private boolean nativeStop = false;
107
108         /**
109          * Default constructor (used in ApplicationHandler to initialize it)
110          */
111         protected Process() {
112                 super();
113                 this.id = nextProcessId++;
114                 this.name = null;
115                 this.bind = 0;
116                 this.args = new Vector<String>();
117                 this.properties = null;
118                 schedBegin = new java.util.concurrent.Semaphore(0);
119                 schedEnd = new java.util.concurrent.Semaphore(0);
120         }
121
122
123         /**
124          * Constructs a new process from the name of a host and his name. The method
125          * function of the process doesn't have argument.
126          *
127          * @param hostname              The name of the host of the process to create.
128          * @param name                  The name of the process.
129          *
130          * @exception                   HostNotFoundException  if no host with this name exists.
131          *                      
132          *
133          */
134         public Process(String hostname, String name) throws HostNotFoundException {
135                 this(Host.getByName(hostname), name, null);
136         }
137         /**
138          * Constructs a new process from the name of a host and his name. The arguments
139          * of the method function of the process are specified by the parameter args.
140          *
141          * @param hostname              The name of the host of the process to create.
142          * @param name                  The name of the process.
143          * @param args                  The arguments of the main function of the process.
144          *
145          * @exception                   HostNotFoundException  if no host with this name exists.
146      *                      NativeException
147      * @throws NativeException
148          *
149          */ 
150         public Process(String hostname, String name, String args[]) throws HostNotFoundException, NativeException {
151                 this(Host.getByName(hostname), name, args);
152         }
153         /**
154          * Constructs a new process from a host and his name. The method function of the 
155          * process doesn't have argument.
156          *
157          * @param host                  The host of the process to create.
158          * @param name                  The name of the process.
159          *
160          */
161         public Process(Host host, String name) {
162                 this(host, name, null);
163         }
164         /**
165          * Constructs a new process from a host and his name, the arguments of here method function are
166          * specified by the parameter args.
167          *
168          * @param host                  The host of the process to create.
169          * @param name                  The name of the process.
170          * @param args                  The arguments of main method of the process.
171          *
172          */
173         public Process(Host host, String name, String[]args) {
174                 /* This is the constructor called by all others */
175                 this();
176                 
177                 if (name == null)
178                         throw new NullPointerException("Process name cannot be NULL");
179                 this.name = name;
180
181                 this.args = new Vector<String>();
182                 if (null != args)
183                         this.args.addAll(Arrays.asList(args));
184
185                 try {
186                         MsgNative.processCreate(this, host.getName());
187                 } catch (HostNotFoundException e) {
188                         throw new RuntimeException("The impossible happend (yet again): the host that I have were not found",e);
189                 }
190                 
191         }
192
193
194         /**
195          * This method kills all running process of the simulation.
196          *
197          * @param resetPID              Should we reset the PID numbers. A negative number means no reset
198          *                                              and a positive number will be used to set the PID of the next newly
199          *                                              created process.
200          *
201          * @return                              The function returns the PID of the next created process.
202          *                      
203          */ 
204         public static int killAll(int resetPID) {
205                 return MsgNative.processKillAll(resetPID);
206         }
207
208         /**
209          * This method sets a flag to indicate that this thread must be killed. End user must use static method kill
210          *
211          * @return                              
212          *                      
213          */ 
214         public void nativeStop()
215         {
216         nativeStop = true;
217         }
218         /**
219          * getter for the flag that indicates that this thread must be killed
220          *
221          * @return                              
222          *                      
223          */ 
224         public boolean getNativeStop()
225         {
226                 return nativeStop;
227         }
228
229         /**
230          * This method kill a process.
231          * @param process  the process to be killed.
232          *
233          */
234         public void kill() {
235                  nativeStop();
236                  Msg.info("Process " + msgName() + " will be killed.");                         
237                                  
238         }
239
240         /**
241          * Suspends the process by suspending the task on which it was
242          * waiting for the completion.
243          *
244          */
245         public void pause() {
246                 MsgNative.processSuspend(this);
247         }
248         /**
249          * Resumes a suspended process by resuming the task on which it was
250          * waiting for the completion.
251          *
252          *
253          */ 
254         public void restart()  {
255                 MsgNative.processResume(this);
256         }
257         /**
258          * Tests if a process is suspended.
259          *
260          * @return                              The method returns true if the process is suspended.
261          *                                              Otherwise the method returns false.
262          */ 
263         public boolean isSuspended() {
264                 return MsgNative.processIsSuspended(this);
265         }
266         /**
267          * Returns the host of a process.
268          *
269          * @return                              The host instance of the process.
270          *
271          *
272          */ 
273         public Host getHost() {
274                 if (this.host == null) {
275                         this.host = MsgNative.processGetHost(this);
276                 }
277                 return this.host;
278         }
279         /**
280          * This static method gets a process from a PID.
281          *
282          * @param PID                   The process identifier of the process to get.
283          *
284          * @return                              The process with the specified PID.
285          *
286          * @exception                   NativeException on error in the native SimGrid code
287          */ 
288         public static Process fromPID(int PID) throws NativeException {
289                 return MsgNative.processFromPID(PID);
290         }
291         /**
292          * This method returns the PID of the process.
293          *
294          * @return                              The PID of the process.
295          *
296          */ 
297         public int getPID()  {
298                 if (pid == -1) {
299                         pid = MsgNative.processGetPID(this);
300                 }
301                 return pid;
302         }
303         /**
304          * This method returns the PID of the parent of a process.
305          *
306          * @return                              The PID of the parent of the process.
307          *
308          */ 
309         public int getPPID()  {
310                 if (ppid == -1) {
311                         ppid = MsgNative.processGetPPID(this);
312                 }
313                 return ppid;
314         }
315         /**
316          * This static method returns the currently running process.
317          *
318          * @return                              The current process.
319          *
320          */ 
321         public static Process currentProcess()  {
322                 return MsgNative.processSelf();
323         }
324         /**
325          * Migrates a process to another host.
326          *
327          * @param process               The process to migrate.
328          * @param host                  The host where to migrate the process.
329          *
330          */
331         public static void migrate(Process process, Host host)  {
332                 MsgNative.processMigrate(process, host);
333                 process.host = null;
334         }
335         /**
336          * Makes the current process sleep until time seconds have elapsed.
337          *
338          * @param seconds               The time the current process must sleep.
339          *
340          * @exception                   HostFailureException on error in the native SimGrid code
341          */ 
342         public static void waitFor(double seconds) throws HostFailureException {
343                 MsgNative.processWaitFor(seconds);
344         } 
345     /**
346      *
347      */
348     public void showArgs() {
349                 Msg.info("[" + this.name + "/" + this.getHost().getName() + "] argc=" +
350                                 this.args.size());
351                 for (int i = 0; i < this.args.size(); i++)
352                         Msg.info("[" + this.msgName() + "/" + this.getHost().getName() +
353                                         "] args[" + i + "]=" + (String) (this.args.get(i)));
354         }
355     /**
356      * Let the simulated process sleep for the given amount of millisecond in the simulated world.
357      * 
358      *  You don't want to use sleep instead, because it would freeze your simulation 
359      *  run without any impact on the simulated world.
360      * @param millis
361      */
362     public native void simulatedSleep(double seconds);
363
364         /**
365          * This method runs the process. Il calls the method function that you must overwrite.
366          */
367         public void run() {
368
369                 String[]args = null;      /* do not fill it before the signal or this.args will be empty */
370
371                 //waitSignal(); /* wait for other people to fill the process in */
372
373
374                 try {
375                         schedBegin.acquire();
376                 } catch(InterruptedException e) {
377                 }
378
379                 try {
380                         args = new String[this.args.size()];
381                         if (this.args.size() > 0) {
382                                 this.args.toArray(args);
383                         }
384
385                         this.main(args);
386                         MsgNative.processExit(this);
387                         schedEnd.release();
388                 } catch(MsgException e) {
389                         e.printStackTrace();
390                         Msg.info("Unexpected behavior. Stopping now");
391                         System.exit(1);
392                 }
393                  catch(ProcessKilled pk) {
394                         if (nativeStop) {
395                                 try {
396                                         MsgNative.processExit(this);
397                                 } catch (ProcessKilled pk2) {
398                                         /* Ignore that other exception that *will* occur all the time. 
399                                          * This is because the C mechanic gives the control to the now-killed process 
400                                          * so that it does some garbage collecting on its own. When it does so here, 
401                                          * the Java thread checks when starting if it's supposed to be killed (to inform 
402                                          * the C world). To avoid the infinite loop or anything similar, we ignore that 
403                                          * exception now. This should be ok since we ignore only a very specific exception 
404                                          * class and not a generic (such as any RuntimeException).
405                                          */
406                                         System.err.println(currentThread().getName()+": I ignore that other exception");                                        
407                                 }
408                         Msg.info(" Process " + ((Process) Thread.currentThread()).msgName() + " has been killed.");                                             
409                         schedEnd.release();                     
410                         }
411                         else {
412                         pk.printStackTrace();
413                         Msg.info("Unexpected behavior. Stopping now");
414                         System.exit(1);
415                         }
416                 }       
417         }
418
419         /**
420          * The main function of the process (to implement).
421      *
422      * @param args
423      * @throws MsgException
424      */
425         public abstract void main(String[]args) throws MsgException;
426
427
428     /** @brief Gives the control from the given user thread back to the maestro 
429      * 
430      * schedule() and unschedule() are the basis of interactions between the user threads 
431      * (executing the user code), and the maestro thread (executing the platform models to decide 
432      * which user thread should get executed when. Once it decided which user thread should be run 
433      * (because the blocking action it were blocked onto are terminated in the simulated world), the 
434      * maestro passes the control to this uthread by calling uthread.schedule() in the maestro thread 
435      * (check its code for the simple semaphore-based synchronization schema). 
436      * 
437      * The uthread executes (while the maestro is blocked), until it starts another blocking 
438      * action, such as a communication or so. In that case, uthread.unschedule() gets called from 
439      * the user thread.    
440      *
441      * As other complications, these methods are called directly by the C through a JNI upcall in 
442      * response to the JNI downcalls done by the Java code. For example, you have this (simplified) 
443      * execution path: 
444      *   - a process calls the Task.send() method in java
445      *   - this calls Java_org_simgrid_msg_MsgNative_taskSend() in C through JNI
446      *   - this ends up calling jprocess_unschedule(), still in C
447      *   - this calls the java method "org/simgrid/msg/Process/unschedule()V" through JNI
448      *   - that is to say, the unschedule() method that you are reading the documentation of.
449      *   
450      * To understand all this, you must keep in mind that there is no difference between the C thread 
451      * describing a process, and the Java thread doing the same. Most of the time, they are system 
452      * threads from the kernel anyway. In the other case (such as when using green java threads when 
453      * the OS does not provide any thread feature), I'm unsure of what happens: it's a very long time 
454      * that I didn't see any such OS. 
455      * 
456      * The synchronization itself is implemented using simple semaphores in Java, as you can see by
457      * checking the code of these functions (and run() above). That's super simple, and thus welcome
458      * given the global complexity of the synchronization architecture: getting C and Java cooperate
459      * with regard to thread handling in a portable manner is very uneasy. A simple and straightforward 
460      * implementation of each synchronization point is precious. 
461      *  
462      * But this kinda limits the system scalability. It may reveal difficult to simulate dozens of 
463      * thousands of processes this way, both for memory limitations and for hard limits pushed by the 
464      * system on the amount of threads and semaphores (we have 2 semaphores per user process).
465      * 
466      * At time of writing, the best source of information on how to simulate large systems within the 
467      * Java bindings of simgrid is here: http://tomp2p.net/dev/simgrid/
468      * 
469      */
470     public void unschedule() {
471         /* this function is called from the user thread only */
472                 try {     
473                         
474                         /* unlock the maestro before going to sleep */
475                         schedEnd.release();
476                         /* Here, the user thread is locked, waiting for the semaphore, and maestro executes instead */
477                         schedBegin.acquire();
478                         /* now that the semaphore is acquired, it means that maestro gave us the control back */
479                         
480                         /* the user thread is starting again after giving the control to maestro. 
481                          * Let's check if we were asked to die in between */
482                         if ( (Thread.currentThread() instanceof Process) &&((Process) Thread.currentThread()).getNativeStop()) {                                
483                                 throw new ProcessKilled();
484                         }
485                         
486                 } catch (InterruptedException e) {
487                         /* ignore this exception because this is how we get killed on process.kill or end of simulation.
488                          * I don't like hiding exceptions this way, but fail to see any other solution 
489                          */
490                 }
491                 
492         }
493
494     /** @brief Gives the control from the maestro back to the given user thread 
495      * 
496      * Must be called from the maestro thread -- see unschedule() for details.
497      *
498      */
499     public void schedule() {
500                 try {
501                         /* unlock the user thread before going to sleep */
502                         schedBegin.release();
503                         /* Here, maestro is locked, waiting for the schedEnd semaphore to get signaled by used thread, that executes instead */
504                         schedEnd.acquire();
505                         /* Maestro now has the control back and the user thread went to sleep gently */
506                         
507                 } catch(InterruptedException e) {
508                         throw new RuntimeException("The impossible did happend once again: I got interrupted in schedEnd.acquire()",e);
509                 }
510         }
511
512         /** Send the given task in the mailbox associated with the specified alias  (waiting at most given time) 
513      * @param mailbox
514      * @param task 
515      * @param timeout
516      * @throws TimeoutException
517          * @throws HostFailureException 
518          * @throws TransferFailureException */
519         public void taskSend(String mailbox, Task task, double timeout) throws TransferFailureException, HostFailureException, TimeoutException {
520                 MsgNative.taskSend(mailbox, task, timeout);
521         }
522
523         /** Send the given task in the mailbox associated with the specified alias
524      * @param mailbox
525      * @param task
526      * @throws TimeoutException
527          * @throws HostFailureException 
528          * @throws TransferFailureException */
529         public void taskSend(String mailbox, Task task) throws  TransferFailureException, HostFailureException, TimeoutException {
530                 MsgNative.taskSend(mailbox, task, -1);
531         }
532
533     /** Receive a task on mailbox associated with the specified mailbox
534      * @param mailbox
535      * @return
536      * @throws TransferFailureException
537      * @throws HostFailureException
538      * @throws TimeoutException
539      */
540         public Task taskReceive(String mailbox) throws TransferFailureException, HostFailureException, TimeoutException {
541                 return MsgNative.taskReceive(mailbox, -1.0, null);
542         }
543
544     /** Receive a task on mailbox associated with the specified alias (waiting at most given time)
545      * @param mailbox
546      * @param timeout
547      * @return
548      * @throws TransferFailureException
549      * @throws HostFailureException
550      * @throws TimeoutException
551      */
552         public Task taskReceive(String mailbox, double timeout) throws  TransferFailureException, HostFailureException, TimeoutException {
553                 return MsgNative.taskReceive(mailbox, timeout, null);
554         }
555
556     /** Receive a task on mailbox associated with the specified alias from given sender
557      * @param mailbox
558      * @param host
559      * @param timeout
560      * @return
561      * @throws TransferFailureException
562      * @throws HostFailureException
563      * @throws TimeoutException
564      */
565         public Task taskReceive(String mailbox, double timeout, Host host) throws  TransferFailureException, HostFailureException, TimeoutException {
566                 return MsgNative.taskReceive(mailbox, timeout, host);
567         }
568
569     /** Receive a task on mailbox associated with the specified alias from given sender
570      * @param mailbox
571      * @param host
572      * @return
573      * @throws TransferFailureException
574      * @throws HostFailureException
575      * @throws TimeoutException
576      */
577         public Task taskReceive(String mailbox, Host host) throws  TransferFailureException, HostFailureException, TimeoutException {
578                 return MsgNative.taskReceive(mailbox, -1.0, host);
579         }
580 }