Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Correction and modification of save mechanisms.
[hpcvm.git] / src / and / hpcvm / Server.java
1 package and.hpcvm ;
2
3 import java.io.BufferedReader;
4 import java.io.File;
5 import java.io.IOException;
6 import java.io.InputStreamReader;
7 import java.rmi.Naming;
8 import java.rmi.RemoteException;
9 import java.rmi.registry.LocateRegistry;
10 import java.rmi.registry.Registry;
11 import java.rmi.server.UnicastRemoteObject;
12 import java.util.ArrayList;
13 import java.util.Iterator;
14 import java.util.concurrent.Semaphore;
15
16
17 public class Server extends UnicastRemoteObject implements ServicesServer
18 {
19         private class DiscCount
20         {
21                 private int nb ;
22                 
23                 DiscCount() { nb = 0 ; }
24                 
25                 protected void inc() { nb++ ; }
26                 
27                 protected void dec() { 
28                         if( nb > 0 )
29                         {
30                                 nb-- ;
31                         }
32                 }
33                 
34                 protected int getNb() { return nb ; }
35         }       
36         
37         
38         private class IPAssociation
39         {
40                 private String vmIP ;
41                 private String hostIP ;
42                 
43                 IPAssociation()
44                 {
45                         vmIP = "" ;
46                         hostIP = "" ;
47                 }
48                 
49                 protected void setVmIP( String _vmIP )
50                 {
51                         vmIP = _vmIP ;
52                 }
53                 
54                 protected void setHostIP( String _hostIP )
55                 {
56                         hostIP = _hostIP ;
57                 }
58                 
59                 protected String getVmIP()
60                 {
61                         return vmIP ;
62                 }
63                 
64                 protected String getHostIP()
65                 {
66                         return hostIP ;
67                 }
68         }
69         
70         
71         
72         private static final long serialVersionUID = 1L ;
73         private int port ;
74         private ArrayList<ConnectedClient> clients ;
75         private ArrayList<ComputingClient> computingClients ;
76         private ArrayList<RunningApplication> applications ;
77         private int max_timeout ;
78         private ConnectedMonitor monitor ;
79         private DiscCount counter ;
80         private ArrayList<IPAssociation> vmIPs ;
81         private String working_directory ;
82         private long save_interleave ;
83         private Semaphore semaSave ;
84         private OperatingClients startingClients ;
85         private OperatingClients deployingClients ;
86         private LimitThread limitThread ;
87         private int maxThread ;
88         private int ind ;
89         private boolean running ;
90         
91         
92         protected Server() throws RemoteException 
93         {
94                 super() ;
95         }
96
97         
98         public void setSaveTime( int _saveTime )
99         {
100                 save_interleave = _saveTime  * 60 * 1000 ;
101         }
102         
103
104         @Override
105         public Integer register( ServicesClient _stub ) 
106         {
107                 if( _stub != null )
108                 {
109                         synchronized( clients )
110                         {
111                                 String ip = "" ;
112                                 try {
113                                         ip = _stub.getIPHost() ;
114                                 } catch (RemoteException e) {
115                                         e.printStackTrace() ;
116                                         return 1 ;
117                                 }
118
119                                 boolean exists = false ;
120                                 int i ;
121
122                                 for( i = 0 ; i < clients.size() ; i++ )
123                                 {
124                                         if( ip.equals( clients.get( i ).getIP() ) )
125                                         {
126                                                 exists = true ;
127                                                 System.out.println( "Client already connected!" ) ;
128                                                 break ;
129                                         }
130                                 }
131
132                                 if( exists )
133                                 {
134                                         System.out.println( "The client stub will be replaced." ) ;
135                                         clients.get( i ).setStub( _stub ) ;
136                                         System.out.println( "(reconnection of " + clients.get( i ).getName() + ")" ) ;
137                                         return 2 ;
138                                 } else {
139                                         System.out.println( "New connection!" ) ;
140                                         clients.add( new ConnectedClient( _stub ) ) ;
141                                         System.out.println( "(connection of " + clients.get( clients.size() - 1 ).getName() + ")" ) ;
142                                         generateVmIP( ip ) ;
143
144                                         if( clients.size() == 0 )
145                                         {
146                                                 System.out.println( "There is no client connected." ) ;
147                                         } else if( clients.size() == 1 ) {
148                                                 System.out.println( "There is one client connected." ) ;
149                                         } else {
150                                                 System.out.println( "There are " + clients.size() + " clients connected." ) ;
151                                         }
152
153                                         return 0 ;
154                                 }
155                         }
156                 }
157                 
158                 return 1 ;
159         }
160
161         
162         private void generateVmIP( String _ip ) 
163         {
164                 if( _ip != null && ! _ip.equals( "" ) ) 
165                 {       
166                         for( int i = 0 ; i < vmIPs.size() ; i++ )
167                         {
168                                 if( vmIPs.get( i ).getHostIP().equalsIgnoreCase( "" ) )
169                                 {
170                                         vmIPs.get( i ).setHostIP( _ip ) ;
171                                         
172                                         break ;
173                                 }
174                         }
175                 }
176         }
177         
178
179         @Override
180         public void ping( String _ip ) 
181         {
182                 if( _ip != null )
183                 {
184                         for( int i = 0 ; i < clients.size() ; i++ )
185                         {
186                                 if( _ip.equals( clients.get( i ).getIP() ) ) 
187                                 {
188                                         clients.get( i ).resetTimeout() ;
189                                         break ;
190                                 }
191                         }
192                 }               
193         }
194
195         
196         @Override
197         public void changeStatus( String _ip, String _status ) 
198         {
199                 if( _ip != null && _status != null )
200                 {
201                         for( int i = 0 ; i < clients.size() ; i++ )
202                         {
203                                 if( _ip.equals( clients.get( i ).getIP() ) ) 
204                                 {                                       
205                                         clients.get( i ).setStatus( _status ) ;
206                                         System.out.println( "Client " + clients.get( i ).getName() + " changed its status to: " + _status ) ;
207                                         
208                                         break ;
209                                 }
210                         }
211                 }
212         }
213         
214
215         public void init( int _port ) 
216         {       
217                 port = _port ;
218                 max_timeout = 4 ;
219                 
220                 clients = new ArrayList<ConnectedClient>() ;
221                 computingClients = new ArrayList<ComputingClient>() ;
222                 applications = new ArrayList<RunningApplication>() ;
223                 monitor = null ;
224                 
225                 startingClients = new OperatingClients() ;
226                 deployingClients = new OperatingClients() ;
227                 limitThread = new LimitThread() ;
228                 maxThread = 20 ;
229                 
230                 ind = -1 ;
231                 running = false ;
232                 
233                 working_directory = "/localhome/vmware" ;
234                 
235                 save_interleave  = 30 * 60 * 1000 ;
236                 
237                 semaSave = new Semaphore( 1 ) ;
238                 
239                 exportObject() ;
240
241                 vmIPs = new ArrayList<IPAssociation>() ;
242                 // TODO initialisation of VM IPs
243                 for( int i = 2 ; i < 101 ; i++ )
244                 {
245                         vmIPs.add( new IPAssociation() ) ;
246                         vmIPs.get( vmIPs.size() - 1 ).setVmIP( "10.11.10." + i ) ;
247                 }
248                 
249                 clients = new ArrayList<ConnectedClient>() ;
250                 
251                 counter = new DiscCount() ;
252                 
253                 monitor = new ConnectedMonitor() ;
254                 monitor.start() ;
255                 
256                 // TODO
257                 // Check if there are running applications ... and restart them :)
258         }
259         
260         
261         public void stop()
262         {
263                 if( monitor != null ) { monitor.stopMonitor() ; }
264                 
265                 for( int i = 0 ; i < clients.size() ; i++ )
266                 {
267                         try {
268                                 clients.get( i ).getStub().emergencyStop() ;
269                                 clients.get( i ).getStub().stop() ;
270                         } catch (RemoteException e) {
271                                 System.err.println( "Unable to send stop signal to " + clients.get( i ).getName() ) ;
272                                 e.printStackTrace() ;
273                         }
274                 }
275                 
276                 // unexportObject ?
277                 
278                 System.exit( 0 ) ;
279         }
280
281         
282         private void exportObject() 
283         {
284                 ServicesServer ref = null ;
285                 Registry reg = null ;
286                 
287                 try 
288                 {       
289                         while( true )
290                         {
291                                 reg = LocateRegistry.getRegistry( port ) ;
292
293                                 String tab[] = reg.list() ;
294                         
295                                 System.out.println( "There is an existing RMI Registry on port " +
296                                                 port + " with " + tab.length + " entries!" ) ;
297                                 for( int i = 0 ; i < tab.length ; i++ )
298                                 {
299                                         try {
300                                                 if( UnicastRemoteObject.unexportObject( Naming.lookup(tab[i]), true ) )
301                                                 {
302                                                         System.out.println( "Register successfuly deleted!" ) ;
303                                                 } else {
304                                                         System.err.println( "Register undeleted !!!" ) ;
305                                                 }
306                                         } catch( Exception e ) {
307                                                 e.printStackTrace() ;
308                                         }
309                                 } 
310                         }
311                 } catch( RemoteException e ) {
312                 }       
313                                 
314                 try {
315                         if ( System.getSecurityManager() == null ) 
316                         {
317                     System.setSecurityManager( new SecurityManager() ) ;
318                 }
319                         
320                         LocateRegistry.createRegistry( port ) ;
321                         LocateRegistry.getRegistry( port ).rebind( "Server", this ) ;
322                         ref = (ServicesServer) Naming.lookup( "rmi://"
323                                         + LocalHost.Instance().getIP() + ":" + port
324                                         + "/Server" ) ;
325                 } catch ( Exception e ) {
326                         System.err.println( "Error in Server.exportObject() when creating local services:" + e ) ;
327                         System.err.println( "Exit from Server.exportObject" ) ;
328                         System.exit( 1 ) ;
329                 }
330
331                 LocalHost.Instance().setServerStub( ref ) ;
332                 
333                 System.out.println( "Server launched on IP " + LocalHost.Instance().getIP() + 
334                                 " on port " + port + "." ) ;
335         }
336         
337         /** Fault manager thread **/
338         private class FaultManager extends Thread
339         {
340                 ConnectedClient cl ;
341                 
342                 FaultManager( ConnectedClient _cl )
343                 {
344                         cl = _cl ;
345                 }
346                 
347                 @Override
348                 public void run() 
349                 {
350                         if( cl != null && cl.getStatus().equalsIgnoreCase( "running" ) ||
351                                         cl.getStatus().equalsIgnoreCase( "saving" ) )
352                         {
353                                 ComputingClient cc = cl.getComputingClient() ;
354 //                              ServicesClient dead = cc.getClient().getStub() ;
355                                 String ipDead = cc.getClient().getIP() ;
356                                 SaveNeighbor snDead = null ;
357                                 for( int i = 0 ; i < computingClients.size() ; i++ )
358                                 {
359                                         if( computingClients.get( i ).getSaveNeighbor().getIPHost().equalsIgnoreCase( ipDead ) ) 
360                                         {
361                                                 snDead = computingClients.get( i ).getSaveNeighbor() ;
362                                                 break ;
363                                         }
364                                 }
365                                                                 
366                                 boolean ok = false ;
367                                 
368                                 for( int i = 0 ; i < clients.size() ; i++ )
369                                 {
370                                         if( clients.get( i ).getStatus().equalsIgnoreCase( "connected" ) ) 
371                                         {
372 //                                              int res = 1 ;
373 //                                              try {
374 //                                                      res = clients.get( i ).getStub().startVM() ;
375 //                                              } catch( RemoteException e ) {
376 //                                                      e.printStackTrace();
377 //                                              }
378 //                                              
379 //                                              if( res == 0 )
380 //                                              {
381                                                         //clients.get(i).setStatus( "running" ) ;
382                                                 
383                                                 int pos = computingClients.indexOf( cc )  ;
384                                                 if( pos == -1 )
385                                                 {
386                                                         System.err.println( "Dead client not found in the computing clients list!" ) ;
387                                                 } else {
388                                                         System.out.println( "Trying to replace " + cc.getClient().getName() + " with " +
389                                                                         clients.get(i).getName() + " ... " ) ;
390                                                         
391                                                         String save_name = computingClients.get( pos ).getLastSave() ;
392                                                         
393                                                         ComputingClient ccl = new ComputingClient( clients.get(i) ) ;
394                                                         clients.get( i ).setComputingClient( ccl ) ;
395                                                         SaveNeighbor sn = computingClients.get( pos ).getSaveNeighbor() ;
396                                                         ccl.setSaveNeighbor( sn ) ;
397                                                         computingClients.set( pos, ccl ) ;
398
399                                                                 
400                                                         int res = 1 ;
401                                                         try {
402                                                                 res = computingClients.get( pos ).getClient().getStub().
403                                                                       retrieveSave( save_name ) ;
404                                                         } catch( RemoteException e ) {
405                                                                 System.err.println( "Unable to indicate to client to retrieve last save!" ) ;
406                                                                 e.printStackTrace() ;
407                                                                 yield() ;
408                                                         }
409                                                                 
410                                                         if( res == 0 )
411                                                         {
412                                                                 ok = true ;
413                                                                 
414                                                                 boolean ok_new = false, ok_old = false ;
415                                                                         
416                                                                 // replace dead client in vmIPs
417                                                                 for( int j = 0 ; j < vmIPs.size() ; j++ )
418                                                                 {
419                                                                         if( vmIPs.get( j ).getHostIP().equalsIgnoreCase( computingClients.get( pos ).getClient().getIP() ) )
420                                                                         {
421                                                                                 vmIPs.get( j ).setHostIP( "" ) ;
422                                                                                 ok_new = true ;
423                                                                         }
424                                                                         if( vmIPs.get( j ).getHostIP().equalsIgnoreCase( ipDead ) )
425                                                                         {
426                                                                                 String vmIP = vmIPs.get( j ).getVmIP() ;
427                                                                                 vmIPs.get( j ).setHostIP( computingClients.get( pos ).getClient().getIP() ) ;
428                                                                                 ok_old = true ;
429                                                                                 
430                                                                                 try {
431                                                                                         computingClients.get( pos ).getClient().getStub().setIPVM( vmIP ) ;
432                                                                                 } catch( RemoteException e ) {
433                                                                                         System.err.println( "Unable to set the new VM IP on the replacing client!" ) ;
434                                                                                         e.printStackTrace() ;
435                                                                                         yield() ;
436                                                                                 }
437                                                                                 
438                                                                                 if( ok_new && ok_old )
439                                                                                 {
440                                                                                         break ;
441                                                                                 }
442                                                                         }
443                                                                 }
444                                                                 
445                                                                 // Replacing in RunningApplication
446                                                                 applications.get( ind ).replaceComputingClient( cc, ccl ) ;
447                                                                 
448                                                                 for( int l = 0 ; l < applications.get( ind ).getComputingClients().size() ; l++ )
449                                                                 {
450                                                                         applications.get( ind ).getComputingClients().get( l ).setSaveRequest( false ) ;
451                                                                 }
452                                                                 
453                                                                         
454                                                                 System.out.println( "Successful redeployment of the VM." ) ;
455                                                         } else {
456                                                                 System.err.println( "Unable to deploy the save on the new computing client!" ) ;
457                                                         }
458                                                 }
459                                                         
460                                                 if( ok )
461                                                 {
462                                                         for( int k = 0 ; k < computingClients.size() ; k++ )
463                                                         {
464                                                                 try {
465                                                                         computingClients.get( k ).getClient().getStub().
466                                                                                 replaceSaveNeighbor( snDead, new SaveNeighbor( clients.get( i ).getStub() ) ) ;
467                                                                 } catch( RemoteException e ) {
468                                                                         System.err.println( "Unable to inform " + computingClients.get( k ).getClient().getName() +
469                                                                                         " of the replacement of a save neighbor!" ) ;
470                                                                         e.printStackTrace() ;
471                                                                 }
472                                                         }
473                                                         
474                                                         System.out.println( "Dead client successfully replaced." ) ;
475
476                                                         break ;
477                                                 } else {
478                                                         System.err.println( "Dead client not replaced!!" ) ;
479                                                 }
480                                         }
481                                 }
482                         }
483                         
484                         try {
485                                 synchronized( counter ) {
486                                         counter.dec() ;
487                                         counter.notifyAll() ;}
488                         } catch( Exception e ) {}
489                 }
490         }
491         
492         
493         /** Monitoring thread **/
494         private class ConnectedMonitor extends Thread
495         {
496                 boolean run ;
497                 
498                 ConnectedMonitor()
499                 {
500                         run = true ;
501                 }
502                 
503                 protected void stopMonitor() { run = false ; }
504                 
505                 @Override
506                 public void run() 
507                 {
508                         boolean change, dead ;
509                         
510                         while( run )
511                         {
512                                 change = false ;
513                                 Iterator<ConnectedClient> it = clients.iterator() ;
514                                 int nb_disconnections = 0 ;
515                                 int nb_disconnections_computing = 0 ;
516                                 
517                                 while( it.hasNext() )
518                                 {
519                                         ConnectedClient cl = it.next() ;
520                                         cl.incTimeout() ;
521                                         dead = false ;
522                                         
523                                         if( cl.getTimeout() > max_timeout || cl.getFail() )
524                                         {
525                                                 dead = true ;
526                                                 if( ! cl.getFail() )
527                                                 {
528                                                         try {
529                                                                 cl.getStub().echo() ;
530                                                                 cl.resetTimeout() ;
531                                                                 dead = false ;
532                                                         } catch( RemoteException e ) {
533                                                                 dead = true ;
534                                                         }
535                                                 } 
536                                                 
537                                                 if( dead )
538                                                 {
539                                                         System.out.println( "Disconnection of " + cl.getName() ) ;
540                                                         if( cl.getStatus().equalsIgnoreCase( "running" ) || cl.getStatus().equalsIgnoreCase( "saving" ) )
541                                                         {
542                                                                 System.out.println( "A VM was running on it!!" ) ;
543                                                                 System.out.println( "I will redeploy a save and restart all VM ..." ) ;
544                                 
545 //                                                              for( int i = 0 ; i < computingClients.size() ; i++ )
546 //                                                              {
547 //                                                                      if( computingClients.get( i ).getClient().getIP().equals( cl.getIP() ) )
548 //                                                                      {
549 //                                                                              computingClients.remove( i ) ;
550 //                                                                              break ;
551 //                                                                      }
552 //                                                              }
553                                                                 synchronized( counter )
554                                                                 {
555                                                                         counter.inc() ;
556                                                                 }
557                                                                 
558                                                                 new Server.FaultManager( cl ).start() ;
559                                                                 nb_disconnections_computing++ ;
560                                                         } else {
561                                                                 System.out.println( "There was no VM running on it." ) ;
562                                                                 System.out.println( "Maybe it will come back later :)" ) ;
563                                                         }
564                                                 
565                                                         synchronized( clients )
566                                                         {
567                                                                 it.remove() ;
568                                                         }
569                                                         nb_disconnections++ ;
570                                                         change = true ;
571                                                 }
572                                         }
573                                 }
574                                 
575                                 if( change )
576                                 {
577                                         synchronized( clients )
578                                         {
579                                                 if( clients.size() == 0 )
580                                                 {
581                                                         System.out.println( "There is no client connected." ) ;
582                                                 } else if( clients.size() == 1 ) {
583                                                         System.out.println( "There is one client connected." ) ;
584                                                 } else {
585                                                         System.out.println( "There are " + clients.size() + " clients connected." ) ;
586                                                 }
587                                         }
588                                 }
589                                 
590                                 
591                                 if( nb_disconnections_computing > 0 )
592                                 {
593                                         System.out.println( "Sending emergency stop signal to all computing nodes ... " ) ;
594                                         
595                                         for( int i = 0 ; i < clients.size() ; i++ )
596                                         {
597                                                 if( clients.get( i ).getStatus().equalsIgnoreCase( "running" ) 
598                                                         || clients.get( i ).getStatus().equalsIgnoreCase( "saving" ) )
599                                                 {
600                                                         try {
601                                                                 clients.get( i ).getStub().emergencyStop() ;
602                                                         } catch( RemoteException e ) {
603                                                                 System.err.println( "Unable to invoke emergency stop signal on " + clients.get( i ).getName() ) ;
604                                                                 e.printStackTrace() ;
605                                                                 yield() ;
606                                                         }
607                                                 }
608                                         }
609                                         
610                                         System.out.println( "I will redeploy save and restart VMs ... " ) ;
611                                         
612                                         synchronized( counter ) 
613                                         {
614                                                 if( counter.getNb() > 0 )
615                                                 {
616                                                         System.out.println( "... waiting all redeployments done ..." ) ;
617                                                 }
618                                         
619                                                 while( counter.getNb() != 0 )
620                                                 {
621                                                         try {
622                                                                 counter.wait() ;
623                                                         } catch( InterruptedException e ) {
624                                                                 e.printStackTrace() ;
625                                                                 yield() ;
626                                                         }
627                                                 }
628                                         }
629                                         
630                                         for( int i = 0 ; i < applications.get( ind ).getComputingClients().size() ; i++ )
631                                         {
632                                                 applications.get( ind ).getComputingClients().get( i ).setRestartOk( false ) ;
633                                                 
634                                                 new RestartVM( applications.get( ind ).getComputingClients().get( i ).getClient() ).start() ;
635                                                 
636 //                                              final ServicesClient sc = applications.get( ind ).getComputingClients().get( i ).getClient().getStub() ;
637                                                 
638 //                                              new Thread( new Runnable() {
639 //                                                      
640 //                                                      @Override
641 //                                                      public void run() 
642 //                                                      {
643 //                                                              try {
644 //                                                                      if( sc.restartVMAfterCrash() != 0 )
645 //                                                                      {
646 //                                                                              System.err.println( "Problem while restarting VM on " +sc.getName() + "!" ) ;
647 //                                                                      }
648 //                                                              } catch( RemoteException e ) {
649 //                                                                      try {
650 //                                                                              System.err.println( "Problem while restarting VM on " + sc.getName() + "!" ) ;
651 //                                                                      } catch( RemoteException e1 ) {
652 //                                                                              System.err.println( "Problem while restarting a VM!" ) ;
653 //                                                                              e1.printStackTrace() ;
654 //                                                                      }
655 //                                                                      e.printStackTrace() ;
656 //                                                                      yield() ;
657 //                                                              }
658 //                                                      }
659 //                                              } ).start() ;
660                                         }
661                                 }
662                                 
663                                 try 
664                                 {
665                                         Thread.sleep( 2000 ) ;
666                                 } catch( InterruptedException e ) {
667                                         e.printStackTrace() ;
668                                         yield() ;
669                                 }
670                         }
671                 }
672         }
673
674         @Override
675         public Integer saveOk( String _ip, String _saveName ) 
676         {
677                 int i ;
678                 for( i = 0 ; i < computingClients.size() ; i ++ )
679                 {
680                         if( computingClients.get( i ).getClient().getIP().equalsIgnoreCase( _ip ) ) 
681                         {
682                                 computingClients.get( i ).setLastSave( _saveName ) ;
683                                 computingClients.get( i ).setSaveStatus( true ) ;
684                                 break ;
685                         }
686                 }
687                 
688                 
689                 boolean all_ok = true ;
690                 i = 0 ;
691                 
692                 while( all_ok && i < computingClients.size() )
693                 {
694                         all_ok = all_ok & computingClients.get( i ).getSaveStatus() ;
695                         i++ ;
696                 }
697                 
698                 if( all_ok )
699                 {
700                         for( i = 0 ; i < computingClients.size() ; i++ )
701                         {
702                                 try {
703                                         computingClients.get( i ).getClient().getStub().saveOk() ;
704                                 } catch (RemoteException e) {
705                                         System.err.println( "Unable to invoke the saveOk method on " + computingClients.get( i ).getClient().getName() ) ;
706                                         e.printStackTrace() ;
707                                 }
708                                 computingClients.get( i ).setSaveStatus( false ) ;
709                         }
710                         
711                         applications.get( ind ).setLastSaveDate( System.currentTimeMillis() ) ;
712                 }
713                 
714                 return 0 ;
715         }
716         
717         
718         public Integer changeSaveName( String _ip, String _saveName )
719         {
720                 if( _ip != null && _ip.length() > 0 && _saveName != null && _saveName.length() > 0 )
721                 {
722                         for( int i = 0 ; i < computingClients.size() ; i ++ )
723                         {
724                                 if( computingClients.get( i ).getClient().getIP().equalsIgnoreCase( _ip ) ) 
725                                 {
726                                         computingClients.get( i ).setLastSave( _saveName ) ;
727                                         System.out.println( "Save name successfully changed on " + computingClients.get( i ).getClient().getName() ) ;
728                                         return 0 ;
729                                 }
730                         }
731                         
732                         System.err.println( "Unable to found computing client with IP " + _ip + "!" ) ;
733                         return 1 ;
734                 }
735                 
736                 System.err.println( "Unable to change save name. IP or save name empty ! (IP: " + _ip + " ; save name: " + _saveName +")" ) ;
737                 
738                 return 1 ;
739         }
740
741         
742         @Override
743         public ArrayList<ServicesClient> startApplication( int _nb ) 
744         {
745                 int nb = clients.size() - computingClients.size() ;
746                 
747                 if( nb > _nb && ! running )
748                 {
749                         running = true ;
750                         
751                         final ArrayList<ServicesClient> ac = new ArrayList<ServicesClient>() ;
752                         final ArrayList<ComputingClient> tmp = new ArrayList<ComputingClient>() ;
753                         
754                         RunningApplication app = new RunningApplication( "Test" ) ;
755                         
756                         int i = 0 ;
757                         boolean ok ;
758                         
759                         while( i < clients.size() && ac.size() < _nb )
760                         {
761                                 ok = false ;
762                                 if( clients.get( i ).getStatus().equalsIgnoreCase( "connected" ) ) 
763                                 {
764                                         synchronized( startingClients )
765                                         {
766                                                 while( ac.size() + startingClients.getNb() >= _nb )
767                                                 {
768                                                         if( ac.size() == _nb ) break ;
769                                                         
770                                                         try {
771                                                                 startingClients.wait() ;
772                                                         } catch( InterruptedException e ) {
773                                                                 e.printStackTrace() ;
774                                                         }
775                                                 }
776                                                 
777                                                 if( ac.size() < _nb )
778                                                 {
779                                                         startingClients.inc() ;
780                                                         ok = true ;
781                                                 }
782                                         }
783                                         
784                                         if( ok )
785                                         {
786                                                 synchronized( limitThread )
787                                                 {
788                                                         while( limitThread.getNb() >= maxThread )
789                                                         {
790                                                                 try {
791                                                                         limitThread.wait() ;
792                                                                 } catch (InterruptedException e) {
793                                                                         e.printStackTrace();
794                                                                 }
795                                                         } 
796                                                         
797                                                         limitThread.inc() ;
798                                                 }
799                                                 
800                                                 final int indice = i ;
801                                                 new Thread( new Runnable()
802                                                 {       
803                                                         @Override
804                                                         public void run() 
805                                                         {
806                                                                 int res = 1 ;
807                                                                 try {
808                                                                         res = clients.get( indice ).getStub().startVM( 0 ) ;
809                                                                 } catch( RemoteException e ) {
810                                                                         e.printStackTrace() ;
811                                                                 }
812                                                                 
813                                                                 if( res == 0 )
814                                                                 {
815                                                                         ac.add( clients.get( indice ).getStub() ) ;
816                                                                         clients.get( indice ).setStatus( "running" ) ;
817                                                                         ComputingClient cl = new ComputingClient( clients.get( indice ) ) ;
818                                                                         clients.get( indice ).setComputingClient( cl ) ;
819                                                                         computingClients.add( cl ) ;
820                                                                         tmp.add( cl ) ;
821                                                                 } else {
822                                                                         System.err.println( "Problem while launching the VM on " 
823                                                                                         + clients.get( indice ).getName() + "!" ) ;
824                                                                 }
825                                                                 
826                                                                 synchronized( limitThread )
827                                                                 {
828                                                                         limitThread.dec() ;
829                                                                         limitThread.notifyAll() ;
830                                                                 }
831                                                                 
832                                                                 synchronized( startingClients )
833                                                                 {
834                                                                         startingClients.dec() ;
835                                                                         startingClients.notifyAll() ;
836                                                                 }
837                                                         }
838                                                 }).start() ;
839                                         }
840                                 }
841                                 
842                                 i++ ;
843                         }
844                         
845                         if( ac.size() == _nb )
846                         {
847                                 app.setComputingClients( tmp ) ;
848                                 app.setRunning( true ) ;
849                                 app.setStartTime( System.currentTimeMillis() ) ;
850                                 
851                                 int index, index2 ;
852                                 /* Choosing save neighbors */
853                                 for( i = 0 ; i < tmp.size() ; i++ )
854                                 {
855                                         if( i == tmp.size() - 1 )
856                                         {
857                                                 index = computingClients.indexOf( tmp.get( i ) ) ;
858                                                 index2 = computingClients.indexOf( tmp.get( 0 ) ) ;
859                                                 
860                                                 if( index == -1 || index2 == -1 )
861                                                 {
862                                                         System.err.println( "Problem in ComputingClients list!" ) ;
863                                                 } else {
864                                                         try {
865                                                                 computingClients.get( index ).setSaveNeighbor( new SaveNeighbor( computingClients.get( index2 ).getClient().getStub() )) ;
866                                                         } catch( RemoteException e ) {
867                                                                 System.err.println( "Unable to create the save neighbor!" ) ;
868                                                                 e.printStackTrace() ;
869                                                         }
870                                                 }
871                                         } else {
872                                                 index = computingClients.indexOf( tmp.get( i ) ) ;
873                                                 index2 = computingClients.indexOf( tmp.get( i + 1 ) ) ;
874                                                 
875                                                 if( index == -1 || index2 == -1 )
876                                                 {
877                                                         System.err.println( "Problem in ComputingClients list!" ) ;
878                                                 } else {
879                                                         try {
880                                                                 computingClients.get( index ).setSaveNeighbor( new SaveNeighbor( computingClients.get( index2 ).getClient().getStub() ) ) ;
881                                                         } catch( RemoteException e ) {
882                                                                 System.err.println( "Unable to create the save neighbor!" ) ;
883                                                                 e.printStackTrace() ;
884                                                         }
885                                                 }
886                                         }
887                                 }
888                         }
889                         
890                         applications.add( app ) ;
891                         
892                         ind = applications.indexOf( app ) ;
893                         
894                         return ac ;
895                 }
896                 
897                 return null ;
898         }
899         
900         
901         @Override
902         public void requestSave( String _ip )
903         {
904                 try {
905                         semaSave.acquire() ;
906                 } catch( InterruptedException e ) {
907                         System.err.println( "Unable to obtain the semaphore for semaSave!" ) ;
908                         e.printStackTrace() ;
909                 }
910                 
911                 final String ip = _ip ;
912                 
913                 new Thread( new Runnable() {
914                         
915                         @Override
916                         public void run() 
917                         {
918                                 treatRequestSave( ip ) ;
919                         }
920                 } ).start() ;
921         }
922
923         
924         public void treatRequestSave( String _ip )
925         {
926                 if( applications.size() > 0 && _ip != null && _ip.length() > 0 )
927                 {
928                         if( (System.currentTimeMillis() - applications.get( ind ).getLastSaveDate()) > save_interleave )
929                         {
930                                 // Mark it as a requester
931                                 for( int i = 0 ; i < applications.get( ind ).getComputingClients().size() ; i++ )
932                                 {
933                                         if( applications.get( ind ).getComputingClients().get( i ).getClient().getIP().equalsIgnoreCase( _ip ) )
934                                         {
935                                                 applications.get( ind ).getComputingClients().get( i ).setSaveRequest( true ) ;
936                                                                                                 
937                                                 break ;
938                                         }
939                                 }
940                                 
941                                 semaSave.release() ;
942                                 
943                                 // Is every body ok?
944                                 boolean ok = false ;
945                                 for( int i = 0 ; i < applications.get( ind ).getComputingClients().size() ; i++ )
946                                 {
947                                         if( i == 0 )
948                                         {
949                                                 ok = applications.get( ind ).getComputingClients().get( i ).getSaveRequest() ;
950                                         } else {
951                                                 ok = ok & applications.get( ind ).getComputingClients().get( i ).getSaveRequest() ;     
952                                         }
953                                         
954                                         if( ! ok )
955                                         {
956                                                 break ;
957                                         }
958                                 }
959                                 
960                                 if( ok )
961                                 {
962 //                                      try {
963 //                                              Thread.sleep( 5000 ) ;
964 //                                      } catch( InterruptedException e1 ) {
965 //                                              e1.printStackTrace() ;
966 //                                      }
967                                         
968                                         for( int i = 0 ; i < applications.get( ind ).getComputingClients().size() ; i++ )
969                                         {
970                                                 try {
971                                                         applications.get( ind ).getComputingClients().get( i ).getClient().getStub().responseSave( true ) ;
972                                                         applications.get( ind ).getComputingClients().get( i ).setSaveRequest( false ) ;                                                        
973                                                 } catch( RemoteException e ) {
974                                                         System.err.println( "Unable to send the save request response to " +
975                                                                         applications.get( ind ).getComputingClients().get( i ).getClient().getName() + "!" ) ;
976                                                         e.printStackTrace() ; 
977                                                 }
978                                         }
979                                         
980                                         applications.get( ind ).setLastSaveDate( System.currentTimeMillis() ) ;
981                                 }
982                                 
983                         } else {
984                                 semaSave.release() ;
985                                 
986                                 for( int i = 0 ; i < applications.get( ind ).getComputingClients().size() ; i++ )
987                                 {
988                                         if( applications.get( ind ).getComputingClients().get( i ).getClient().getIP().equalsIgnoreCase( _ip ) )
989                                         {
990                                                 try {
991                                                         applications.get( ind ).getComputingClients().get( i ).getClient().getStub().responseSave( false ) ;
992                                                 } catch( RemoteException e ) {
993                                                         System.err.println( "Unable to send the save request response to " +
994                                                                         applications.get( ind ).getComputingClients().get( i ).getClient().getName() + "!" ) ;
995                                                         e.printStackTrace() ; 
996                                                 }
997                                                 break ;
998                                         }
999                                 }
1000                         }
1001                 } else {
1002                         semaSave.release() ;
1003                         System.err.println( "!! Serious problem in treatRequestSave method!!" ) ;
1004                 }
1005         }       
1006         
1007         
1008         @Override
1009         public void restartOk( String _ip )
1010         {
1011                 if( applications.size() > 0 && _ip != null && _ip.length() > 0 )
1012                 {
1013                         System.out.println( "Client " + _ip + " has finished to restart ("+applications.get( ind ).getComputingClients().size()+") ... " ) ;
1014 //                      if( (System.currentTimeMillis() - applications.get( ind ).getLastSaveDate()) > save_interleave )
1015                         {
1016                                 // Has it already finished?
1017                                 for( int i = 0 ; i < applications.get( ind ).getComputingClients().size() ; i++ )
1018                                 {
1019                                         if( applications.get( ind ).getComputingClients().get( i ).getClient().getIP().equalsIgnoreCase( _ip ) )
1020                                         {
1021                                                 applications.get( ind ).getComputingClients().get( i ).setRestartOk( true ) ;
1022                                                                                                 
1023                                                 break ;
1024                                         }
1025                                 }
1026                                 
1027                                 // Is everybody ok?
1028                                 boolean ok = false ;
1029                                 for( int i = 0 ; i < applications.get( ind ).getComputingClients().size() ; i++ )
1030                                 {
1031                                         if( i == 0 )
1032                                         {
1033                                                 ok = applications.get( ind ).getComputingClients().get( i ).getRestartOk() ;
1034                                         } else {
1035                                                 ok = ok & applications.get( ind ).getComputingClients().get( i ).getRestartOk() ;       
1036                                         }
1037                                         
1038                                         if( ! ok )
1039                                         {
1040                                                 break ;
1041                                         }
1042                                 }
1043                                 
1044                                 if( ok )
1045                                 {
1046                                         applications.get( ind ).setLastSaveDate( System.currentTimeMillis() ) ;
1047                                         
1048                                         for( int i = 0 ; i < applications.get( ind ).getComputingClients().size() ; i++ )
1049                                         {
1050                                                 applications.get( ind ).getComputingClients().get( i ).setRestartOk( false ) ;
1051                                         }
1052                                 }
1053                                 
1054                         }
1055                 }
1056         }       
1057         
1058         
1059         @Override
1060         public void goApplication()
1061         {
1062                 synchronized( applications ) {
1063                         if( running && ! applications.get( ind ).getStartMark() )
1064                         {
1065                                 System.out.println( "Application is starting." ) ;
1066                                 applications.get( ind ).setStartMark() ;
1067                                 applications.get( ind ).setStartTime( System.currentTimeMillis() ) ;
1068                                 applications.get( ind ).setLastSaveDate( System.currentTimeMillis() ) ;
1069                         }
1070                 }
1071         }
1072         
1073
1074         @Override
1075         public void endApplication() 
1076         {
1077                 synchronized( applications )
1078                 {
1079                         if( running )
1080                         {
1081                                 applications.get( ind ).setEndTime( System.currentTimeMillis() ) ;
1082                                 applications.get( ind ).setRunning( false ) ;
1083                                 applications.get( ind ).clear() ;
1084                         
1085                                 Iterator<ComputingClient> it = computingClients.iterator() ;
1086                 
1087                                 while( it.hasNext() )
1088                                 {
1089                                         ComputingClient cl = it.next() ;
1090
1091                                         try {
1092                                                 cl.getClient().getStub().emergencyStop() ;
1093                                         } catch (RemoteException e) {
1094                                                 e.printStackTrace();
1095                                         }
1096                         
1097                                         cl.getClient().setStatus( "connected" ) ;
1098                                         cl.getClient().setComputingClient( null ) ;
1099                                         it.remove() ;
1100                                         cl = null ;
1101                                 }
1102                 
1103                         
1104                                 applications.get( ind ).clear() ;
1105                         
1106                                 running = false ;
1107                         
1108                                 System.out.println( "Application " + applications.get( ind ).getName() + " ends in " + 
1109                                         applications.get( ind ).getExecutionTime() + " seconds." ) ;
1110                         }
1111                 }
1112         }
1113
1114
1115
1116         @Override
1117         public String getAssociatedIP( String _ip )
1118         {
1119                 String ret = null ;
1120                 
1121                 for( int i = 0 ; i < vmIPs.size() ; i++ )
1122                 {
1123                         if( vmIPs.get( i ).getHostIP().equalsIgnoreCase( _ip ) )
1124                         {
1125                                 ret = vmIPs.get( i ).getVmIP() ;
1126                                 break ;
1127                         }
1128                 }
1129                 
1130                 return ret ;
1131         }
1132         
1133         
1134         public Integer deployVM( final String _name, final String _archive, final String _directory )
1135         {
1136                 int nb = 0, pb = 0 ;
1137                 
1138                 if( _name != null && _name.length() > 0 && _archive != null && _name.length() > 0 
1139                                 && _directory != null && _directory.length() > 0 )
1140                 {
1141                         System.out.println( "Deploying the VM " + _name + " (" + _archive + ") ... " ) ;
1142                         
1143                         File file = new File( working_directory + "/" + _archive ) ;
1144                         if( ! file.exists() )
1145                         {
1146                                 System.err.println( "There is no archive named " + _archive + " in my working directory!" ) ;
1147                                 file = null ;
1148                                 return 1 ;
1149                         } else if( file.isDirectory() ) {
1150                                 System.err.println( _archive + " is a directory!" ) ;
1151                                 file = null ;
1152                                 return 1 ;
1153                         }
1154                         
1155                         file = null ;
1156                         
1157                         // TODO do a better deployment !!
1158 //                      int ret ;
1159 //                      boolean error, ok, server ;
1160 //                      ArrayList<ConnectedClient> deployed = new ArrayList<ConnectedClient>() ;
1161                         
1162 //                      boolean server = true ;
1163 //                      boolean ok ;
1164                         
1165                         for( int i = 0 ; i < clients.size() ; i++ )
1166                         {
1167 //                              ret = 1 ;
1168                                 nb++ ;
1169 //                              error = false ;
1170 //                              ok = false ;
1171                                 if( clients.get( i ).getStatus().equalsIgnoreCase( "connected" ) )
1172                                 {                                       
1173                                         synchronized( limitThread )
1174                                         {
1175                                                 while( limitThread.getNb() >= maxThread )
1176                                                 {
1177                                                         try {
1178                                                                 limitThread.wait() ;
1179                                                         } catch (InterruptedException e) {
1180                                                                 e.printStackTrace();
1181                                                         }
1182                                                 } 
1183         
1184                                                 limitThread.inc() ;
1185                                         }
1186                                         
1187                                         final int indice = i ;
1188                                         new Thread( new Runnable() {
1189
1190                                                 @Override
1191                                                 public void run() {
1192                                                         int ret = -1 ;
1193                                                         boolean error = true  ;
1194                                                         
1195                                                         try {
1196                                                                 ret = clients.get( indice ).getStub().deployVM( _name, _archive, _directory ) ;
1197                                                         } catch( RemoteException e ) {
1198                                                                 System.err.println( "Unable to deploy the VM on " + clients.get( indice ).getName() + "!" ) ;
1199                                                                 e.printStackTrace() ;
1200                                                         }
1201                                         
1202                                                         // The client does not have the archive, we have to send it.
1203                                                         if( ret == 2 )
1204                                                         {
1205                                                                 // Attention au multi-envois !!!
1206                                                                 System.out.print( "Sending VM archive to " + clients.get( indice ).getName() + " ... " ) ;
1207                                         
1208                                                                 String wd = "" ;
1209                                                                 String snIP = "" ;
1210                                                                 error = false ;
1211                                         
1212                                                                 try {
1213                                                                         wd = clients.get( indice ).getStub().getWorkingDirectory() ;
1214                                                                         snIP = clients.get( indice ).getStub().getIPHost() ;
1215                                                                 } catch (RemoteException e2) {
1216                                                                         System.err.println( "Unable to retrieve information on " + clients.get( indice ).getName() + "!" ) ;
1217                                                                         e2.printStackTrace() ;
1218                                                                         error = true ;
1219                                                                 }
1220                                         
1221                                                                 String[] command = new String[]{ "/usr/bin/scp", working_directory + "/" + _archive,
1222                                                                                 snIP + ":" + wd } ;
1223                                 
1224                                                                 if( ! error )
1225                                                                 {
1226                                                                         try {
1227                                                                                 Process proc = Runtime.getRuntime().exec( command ) ;
1228                                                                                 proc.waitFor() ;
1229                         
1230                                                                                 if( proc.exitValue() == 0 )
1231                                                                                 {
1232                                                                                         System.out.println( "Initial VM archive successfully sent." ) ;
1233                                                                                 } else {
1234                                                                                         System.err.println( "Initial VM archive not sent!" ) ;
1235                                                                                         System.err.println( "Error: " + proc.exitValue() ) ;
1236                                                                                         BufferedReader b = new BufferedReader( new InputStreamReader( proc.getErrorStream() ) ) ;
1237                                                         
1238                                                                                         String l ;
1239                                                                                         try {
1240                                                                                                 while( (l = b.readLine()) != null )
1241                                                                                                 {
1242                                                                                                         System.err.println( l ) ;
1243                                                                                                 }
1244                                                                                         } catch( IOException e ) {
1245                                                                                                 e.printStackTrace() ;
1246                                                                                         }
1247                                                         
1248                                                                                         error = true ;
1249                                                                                 }
1250                                                                         } catch( IOException e ) {
1251                                                                                 System.err.println( "Error during initial VM archive send command: " ) ;
1252                                                                                 e.printStackTrace() ;
1253                                                                                 error = true ;
1254                                                                         } catch( InterruptedException e ) {
1255                                                                                 e.printStackTrace() ;
1256                                                                                 error = true ;
1257                                                                         }
1258                                                                 }
1259                                 
1260                                                                 
1261                                                                 if( ! error )
1262                                                                 {                               
1263                                                                         // Second try ...
1264                                                                         ret = 1 ;
1265                                                                         try {
1266                                                                                 ret = clients.get( indice ).getStub().deployVM( _name, _archive, _directory ) ;
1267                                                                         } catch( RemoteException e ) {
1268                                                                                 System.err.println( "Unable to deploy the VM on " + clients.get( indice ).getName() + "!" ) ;
1269                                                                                 e.printStackTrace() ;
1270                                                                         }
1271                                                                 }
1272                                                         }
1273                                 
1274                                                         if( ret == 0 )
1275                                                         {
1276                                                                 System.out.println( "Initial VM archive successfully deployed on " + clients.get( indice ).getName() + "." ) ;
1277                                                                 
1278                                                                 synchronized( deployingClients )
1279                                                                 {
1280                                                                         deployingClients.inc() ;
1281                                                                 }
1282                                                         }
1283                                                         
1284                                                         synchronized( limitThread )
1285                                                         {
1286                                                                 limitThread.dec() ;
1287                                                                 limitThread.notifyAll() ;
1288                                                         }
1289                                                 }
1290                                         }).start() ;            
1291                                 }
1292                         }
1293                 }
1294                 
1295                 synchronized( limitThread )
1296                 {
1297                         while( limitThread.getNb() > 0  )
1298                         {
1299                                 try {
1300                                         limitThread.wait() ;
1301                                 } catch( InterruptedException e ) {
1302                                         e.printStackTrace() ;
1303                                 }
1304                         }
1305                 }
1306                 
1307                 if( nb - deployingClients.getNb() > 0 )
1308                 {
1309                         if( pb == 1 )
1310                                 System.err.println( "** " + pb + " machine is not deployed!" ) ;
1311                         if( pb > 1 )
1312                                 System.err.println( "** " + pb + " machines are not deployed!" ) ;
1313                 }
1314                 
1315                 return nb ;
1316         }
1317         
1318         
1319         public String getWorkingDirectory()
1320         {
1321                 return working_directory ;
1322         }
1323         
1324         
1325         private class OperatingClients
1326         {
1327                 private int nb ;
1328                 
1329                 OperatingClients() { nb = 0 ; }
1330                 
1331                 protected void inc() { nb++ ; }
1332                 
1333                 protected void dec() { nb-- ; }
1334                 
1335                 protected int getNb() { return nb ; }
1336         }
1337         
1338         
1339         private class LimitThread
1340         {
1341                 private int nb ;
1342                 
1343                 LimitThread() { nb = 0 ; }
1344                 
1345                 protected void inc() { nb++ ; }
1346                 
1347                 protected void dec() { nb-- ; }
1348                 
1349                 protected int getNb() { return nb ; }
1350         }
1351         
1352         
1353         private class RestartVM extends Thread
1354         {
1355                 private ConnectedClient cc ;
1356                 
1357                 protected RestartVM( ConnectedClient _cc )
1358                 {
1359                         cc = _cc ;
1360                 }
1361                         
1362                 public void run()
1363                 {
1364                         boolean error = false ;
1365                         if( cc != null )
1366                         {
1367                                 try {
1368                                         if( cc.getStub().restartVMAfterCrash() != 0 )
1369                                         {
1370                                                 System.err.println( "Problem while restarting VM on " + cc.getName() + "!" ) ;
1371                                                 error = true ;
1372                                         }
1373                                 } catch( RemoteException e ) {
1374                                         e.printStackTrace() ;
1375                                         error = true ;
1376                                         yield() ;
1377                                 }
1378                         } else {
1379                                 System.err.println( "The client to restart is null!" ) ;
1380                         }
1381                         
1382                         if( error )
1383                         {
1384                                 cc.setFail( true ) ;
1385                                 
1386                                 try {
1387                                         System.out.print( "Trying to stop the client ... " ) ;
1388                                         cc.getStub().stop() ;
1389                                         System.out.println( "successful client stop." );
1390                                 } catch( RemoteException e ) {
1391                                         System.out.println( "unsuccessful client stop!" ) ;
1392                                         e.printStackTrace() ;
1393                                 }
1394                         }
1395                 }
1396         }
1397         
1398 }
1399
1400 /** La programmation est un art, respectons ceux qui la pratiquent !! **/
1401