if( ! lastSaveOk )
{
- String arch1 = "", arch2 = "" ;
- File file = new File( working_directory + "/" + machine.getName() + "_new_" + machine.getComputationId() + ".tar" ) ;
+// String arch1 = "", arch2 = "" ;
+// File file = new File( working_directory + "/" + machine.getName() + "_new_" + machine.getComputationId() + ".tar" ) ;
+// if( file.exists() )
+// {
+// arch1 = working_directory + "/" + machine.getName() + "_new_" + machine.getComputationId() + ".tar" ;
+// }
+//
+// file = null ;
+//
+ String arch = "" ;
+ File file = new File( working_directory + "/" + machine.getName() + "_new_" + machine.getComputationId() + ".tar.gz" ) ;
if( file.exists() )
{
- arch1 = working_directory + "/" + machine.getName() + "_new_" + machine.getComputationId() + ".tar" ;
+ arch = working_directory + "/" + machine.getName() + "_new_" + machine.getComputationId() + ".tar.gz" ;
}
file = null ;
- file = new File( working_directory + "/" + machine.getName() + "_new_" + machine.getComputationId() + ".tar.gz" ) ;
- if( file.exists() )
- {
- arch2 = working_directory + "/" + machine.getName() + "_new_" + machine.getComputationId() + ".tar.gz" ;
- }
-
- file = null ;
-
- if( arch1.length() > 0 || arch2.length() > 0 )
+ if( arch.length() > 0 )
{
System.out.println( "Deletion of last nok archive ... " ) ;
- command = new String[]{ "/bin/rm", "-rf",
- arch1, arch2 } ;
+ command = new String[]{ "/bin/rm", "-rf", arch } ;
try {
procSave = Runtime.getRuntime().exec( command ) ;
System.out.print( "Creation of the archive ... " ) ;
/** Archive creation **/
- command = new String[]{ "/bin/tar", "-cf",
- machine.getName() + "_new_" + machine.getComputationId() + ".tar",
- machine.getDirectory(), "-C", working_directory } ;
+ command = new String[]{ "/bin/tar", "-cz", "-C", working_directory,
+ "-f", working_directory + "/" + machine.getName() + "_new_" + machine.getComputationId() + ".tar.gz",
+ machine.getDirectory() } ;
if( emergencyStop )
{
error = true ;
}
- /** Compression of the archive **/
- if( ! error )
- {
- System.out.print( "Compression of the archive ... " ) ;
- command = new String[]{ "/bin/gzip",
- working_directory + "/" + machine.getName()
- + "_new_" + machine.getComputationId() + ".tar" } ;
-
- if( emergencyStop )
- {
- return 1 ;
- }
-
- try {
- procSave = Runtime.getRuntime().exec( command ) ;
- procSave.waitFor() ;
-
- if( procSave.exitValue() == 0 )
- {
- System.out.println( "Archive successfully compressed." ) ;
- } else {
- System.err.println( "Archive not compressed!" ) ;
- printProcessError( procSave ) ;
-
- error = true ;
- }
- } catch( IOException e ) {
- System.err.println( "Error during archive compression command: " ) ;
- e.printStackTrace() ;
- error = true ;
- } catch( InterruptedException e ) {
- e.printStackTrace() ;
- error = true ;
- }
- }
+// /** Compression of the archive **/
+// if( ! error )
+// {
+// System.out.print( "Compression of the archive ... " ) ;
+// command = new String[]{ "/bin/gzip",
+// working_directory + "/" + machine.getName()
+// + "_new_" + machine.getComputationId() + ".tar" } ;
+//
+// if( emergencyStop )
+// {
+// return 1 ;
+// }
+//
+// try {
+// procSave = Runtime.getRuntime().exec( command ) ;
+// procSave.waitFor() ;
+//
+// if( procSave.exitValue() == 0 )
+// {
+// System.out.println( "Archive successfully compressed." ) ;
+// } else {
+// System.err.println( "Archive not compressed!" ) ;
+// printProcessError( procSave ) ;
+//
+// error = true ;
+// }
+// } catch( IOException e ) {
+// System.err.println( "Error during archive compression command: " ) ;
+// e.printStackTrace() ;
+// error = true ;
+// } catch( InterruptedException e ) {
+// e.printStackTrace() ;
+// error = true ;
+// }
+// }
long fin = System.currentTimeMillis() ;
// date_last_save = 0 ;
isRestartedSave = false ;
- lastSaveOk = true ;
+ lastSaveOk = false ;
/** Connection to server **/
try {
private class PingServer extends Thread
{
- private boolean run ;
+ protected boolean go ;
PingServer()
{
- run = true ;
+ go = true ;
}
- protected void stopPing() { run = false ; }
+ protected void stopPing() { go = false ; }
@Override
public void run()
{
- while( run )
+ while( go )
{
try {
LocalHost.Instance().getServerStub().ping( LocalHost.Instance().getIP() ) ;
}
try {
- Thread.sleep( 2000 ) ;
+ sleep( 2000 ) ;
} catch( InterruptedException e ) {
e.printStackTrace() ;
}
private class DialogVMServer extends Thread
{
- private boolean run ;
+ protected boolean go ;
private Socket socket ;
- private ArrayList<DialogVM> dialogs = new ArrayList<DialogVM>() ;
+// private ArrayList<DialogVM> dialogs = new ArrayList<DialogVM>() ;
DialogVMServer()
{
- run = true ;
+ go = true ;
}
protected void stopDialogVMServer()
{
- run = false ;
+ go = false ;
if( serverSocket != null )
{
try {
serverSocket.close() ;
- for( int i = 0 ; i < dialogs.size() ; i++ )
- {
- dialogs.get( i ).stopDialogVM() ;
- }
+// for( int i = 0 ; i < dialogs.size() ; i++ )
+// {
+// dialogs.get( i ).stopDialogVM() ;
+// }
} catch( IOException e ) {
e.printStackTrace() ;
System.err.println( "Unable to launch the SocketServer on port " + dialog_port + "!" ) ;
e.printStackTrace() ;
- run = false ;
+ go = false ;
}
- while( run )
+ while( go )
{
try {
socket = serverSocket.accept() ;
- dialogs.add( new DialogVM( socket ) ) ;
- dialogs.get( dialogs.size() - 1 ).start() ;
+ new DialogVM( socket ).start() ;
+// dialogs.add( new DialogVM( socket ) ) ;
+// dialogs.get( dialogs.size() - 1 ).start() ;
} catch( IOException e ) {
System.err.println( "Problem with the accept function!" ) ;
e.printStackTrace() ;
private class DialogVM extends Thread
{
- private boolean run ;
+ protected boolean go ;
private Socket socket ;
private BufferedReader reader ;
private String line ;
- DialogVM( Socket _socket ) { run = true ; socket = _socket ; }
+ DialogVM( Socket _socket ) { go = true ; socket = _socket ; }
protected void stopDialogVM()
{
- run = false ;
+ go = false ;
try {
reader.close() ; reader = null ;
stopDialogVM() ;
}
- while( run )
+// while( go )
{
try {
line = null ;
}
/** VM is starting -- retrieving informations **/
- if( run && line != null && line.equalsIgnoreCase( "infos" ) )
+ if( go && line != null && line.equalsIgnoreCase( "infos" ) )
{
/* Receiving name */
machine.setName( reader.readLine() ) ;
reader.close() ; reader = null ;
socket.close() ; socket = null ;
- run = false ;
+ go = false ;
}
/** It's time to do a save **/
- if( run && line != null && line.equalsIgnoreCase( "save" ) )
+ if( go && line != null && line.equalsIgnoreCase( "save" ) )
{
try {
machine.setComputationId( Integer.parseInt( reader.readLine() ) ) ;
reader.close() ; reader = null ;
socket.close() ; socket = null ;
- run = false ;
+ go = false ;
saveRequest.setStatus( false ) ;
/** Computation is done, we can shutdown the VM **/
- if( run && line != null && line.equalsIgnoreCase( "quit" ) )
+ if( go && line != null && line.equalsIgnoreCase( "quit" ) )
{
try {
Thread.sleep( 5000 ) ;
reader.close() ; reader = null ;
socket.close() ; socket = null ;
- run = false ;
+ go = false ;
stopVM() ;
}
} catch( IOException e ) {
+ go = false ;
e.printStackTrace() ;
}
}
private String working_directory ;
private long save_interleave ;
private Semaphore semaSave ;
+ private OperatingClients startingClients ;
+ private OperatingClients deployingClients ;
+ private LimitThread limitThread ;
+ private int maxThread;
protected Server() throws RemoteException
{
if( _stub != null )
{
- String ip = "" ;
- try {
- ip = _stub.getIPHost() ;
- } catch (RemoteException e) {
- e.printStackTrace() ;
- return 1 ;
- }
-
- boolean exists = false ;
- int i ;
-
- for( i = 0 ; i < clients.size() ; i++ )
+ synchronized( clients )
{
- if( ip.equals( clients.get( i ).getIP() ) )
+ String ip = "" ;
+ try {
+ ip = _stub.getIPHost() ;
+ } catch (RemoteException e) {
+ e.printStackTrace() ;
+ return 1 ;
+ }
+
+ boolean exists = false ;
+ int i ;
+
+ for( i = 0 ; i < clients.size() ; i++ )
{
- exists = true ;
- System.out.println( "Client already connected!" ) ;
- break ;
+ if( ip.equals( clients.get( i ).getIP() ) )
+ {
+ exists = true ;
+ System.out.println( "Client already connected!" ) ;
+ break ;
+ }
}
- }
-
- if( exists )
- {
- System.out.println( "The client stub will be replaced." ) ;
- clients.get( i ).setStub( _stub ) ;
- System.out.println( "(reconnection of " + clients.get( i ).getName() + ")" ) ;
- return 2 ;
- } else {
- System.out.println( "New connection!" ) ;
- clients.add( new ConnectedClient( _stub ) ) ;
- System.out.println( "(connection of " + clients.get( clients.size() - 1 ).getName() + ")" ) ;
- generateVmIP( ip ) ;
-
- if( clients.size() == 0 )
+
+ if( exists )
{
- System.out.println( "There is no client connected." ) ;
- } else if( clients.size() == 1 ) {
- System.out.println( "There is one client connected." ) ;
+ System.out.println( "The client stub will be replaced." ) ;
+ clients.get( i ).setStub( _stub ) ;
+ System.out.println( "(reconnection of " + clients.get( i ).getName() + ")" ) ;
+ return 2 ;
} else {
- System.out.println( "There are " + clients.size() + " clients connected." ) ;
- }
+ System.out.println( "New connection!" ) ;
+ clients.add( new ConnectedClient( _stub ) ) ;
+ System.out.println( "(connection of " + clients.get( clients.size() - 1 ).getName() + ")" ) ;
+ generateVmIP( ip ) ;
- return 0 ;
+ if( clients.size() == 0 )
+ {
+ System.out.println( "There is no client connected." ) ;
+ } else if( clients.size() == 1 ) {
+ System.out.println( "There is one client connected." ) ;
+ } else {
+ System.out.println( "There are " + clients.size() + " clients connected." ) ;
+ }
+
+ return 0 ;
+ }
}
}
for( int i = 0 ; i < clients.size() ; i++ )
{
if( _ip.equals( clients.get( i ).getIP() ) )
- {
- clients.get( i ).setStatus( _status ) ;
- System.out.println( "Client " + clients.get( i ).getName() + " changed its status to: " + _status ) ;
+ {
+ if( _status.equalsIgnoreCase( "stopped" ) )
+ {
+ System.out.println( "Client " + clients.get( i ).getName() + " is stopped." ) ;
+ clients.get( i ).setStatus( "connected" ) ;
+ ComputingClient cc = clients.get( i ).getComputingClient() ;
+ computingClients.remove( cc ) ;
+ clients.get( i ).setComputingClient( null ) ;
+ } else {
+ clients.get( i ).setStatus( _status ) ;
+ System.out.println( "Client " + clients.get( i ).getName() + " changed its status to: " + _status ) ;
+ }
+
break ;
}
}
applications = new ArrayList<RunningApplication>() ;
monitor = null ;
+ startingClients = new OperatingClients() ;
+ deployingClients = new OperatingClients() ;
+ limitThread = new LimitThread() ;
+ maxThread = 20 ;
+
working_directory = "/localhome/vmware" ;
save_interleave = 30 * 60 * 1000 ;
System.out.println( "Maybe it will come back later :)" ) ;
}
- it.remove() ;
+ synchronized( clients )
+ {
+ it.remove() ;
+ }
nb_disconnections++ ;
change = true ;
}
if( change )
{
- if( clients.size() == 0 )
+ synchronized( clients )
{
- System.out.println( "There is no client connected." ) ;
- } else if( clients.size() == 1 ) {
- System.out.println( "There is one client connected." ) ;
- } else {
- System.out.println( "There are " + clients.size() + " clients connected." ) ;
+ if( clients.size() == 0 )
+ {
+ System.out.println( "There is no client connected." ) ;
+ } else if( clients.size() == 1 ) {
+ System.out.println( "There is one client connected." ) ;
+ } else {
+ System.out.println( "There are " + clients.size() + " clients connected." ) ;
+ }
}
}
if( computingClients.get( i ).getClient().getIP().equalsIgnoreCase( _ip ) )
{
computingClients.get( i ).setLastSave( _saveName ) ;
- System.out.println( "Save name successfully change for " + _ip ) ;
+ System.out.println( "Save name successfully changed for " + _ip ) ;
return 0 ;
}
}
if( nb > _nb )
{
- ArrayList<ServicesClient> ac = new ArrayList<ServicesClient>() ;
- ArrayList<ComputingClient> tmp = new ArrayList<ComputingClient>() ;
+ final ArrayList<ServicesClient> ac = new ArrayList<ServicesClient>() ;
+ final ArrayList<ComputingClient> tmp = new ArrayList<ComputingClient>() ;
RunningApplication app = new RunningApplication( "Test" ) ;
int i = 0 ;
+ boolean ok ;
while( i < clients.size() && ac.size() < _nb )
{
- if( clients.get(i).getStatus().equalsIgnoreCase( "connected" ) )
+ ok = false ;
+ if( clients.get( i ).getStatus().equalsIgnoreCase( "connected" ) )
{
- int res = 1 ;
- try {
- res = clients.get( i ).getStub().startVM( 0 ) ;
- } catch( RemoteException e ) {
- e.printStackTrace();
+ synchronized( startingClients )
+ {
+ while( ac.size() + startingClients.getNb() >= _nb )
+ {
+ if( ac.size() == _nb ) break ;
+
+ try {
+ startingClients.wait() ;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ if( ac.size() < _nb )
+ {
+ startingClients.inc() ;
+ ok = true ;
+ }
}
- if( res == 0 )
+ if( ok )
{
- ac.add( clients.get( i ).getStub() ) ;
- clients.get( i ).setStatus( "running" ) ;
- ComputingClient cl = new ComputingClient( clients.get( i ) ) ;
- clients.get( i ).setComputingClient( cl ) ;
- computingClients.add( cl ) ;
- tmp.add( cl ) ;
- } else {
- System.err.println( "Problem while launching the VM on "
- + clients.get(i).getName() + "!" ) ;
+ synchronized( limitThread )
+ {
+ while( limitThread.getNb() >= maxThread )
+ {
+ try {
+ limitThread.wait() ;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ limitThread.inc() ;
+ }
+
+ final int ind = i ;
+ new Thread( new Runnable(){
+
+ @Override
+ public void run()
+ {
+ int res = 1 ;
+ try {
+ res = clients.get( ind ).getStub().startVM( 0 ) ;
+ } catch( RemoteException e ) {
+ e.printStackTrace();
+ }
+
+ if( res == 0 )
+ {
+ ac.add( clients.get( ind ).getStub() ) ;
+ clients.get( ind ).setStatus( "running" ) ;
+ ComputingClient cl = new ComputingClient( clients.get( ind ) ) ;
+ clients.get( ind ).setComputingClient( cl ) ;
+ computingClients.add( cl ) ;
+ tmp.add( cl ) ;
+ } else {
+ System.err.println( "Problem while launching the VM on "
+ + clients.get(ind).getName() + "!" ) ;
+ }
+
+ synchronized( limitThread )
+ {
+ limitThread.dec() ;
+ limitThread.notifyAll() ;
+ }
+
+ synchronized( startingClients )
+ {
+ startingClients.dec() ;
+ startingClients.notifyAll() ;
+ }
+ }
+ }).start() ;
}
}
}
- public Integer deployVM( String _name, String _archive, String _directory )
+ public Integer deployVM( final String _name, final String _archive, final String _directory )
{
- int pb = -1 ;
- int nb = 0 ;
+ int nb = 0, pb = 0 ;
if( _name != null && _name.length() > 0 && _archive != null && _name.length() > 0
&& _directory != null && _directory.length() > 0 )
file = null ;
// TODO do a better deployment !!
- int ret ;
- boolean error ;
+// int ret ;
+// boolean error, ok, server ;
+// ArrayList<ConnectedClient> deployed = new ArrayList<ConnectedClient>() ;
- pb = 0 ;
+ boolean server = true ;
+ boolean ok ;
for( int i = 0 ; i < clients.size() ; i++ )
{
- ret = 1 ;
- error = false ;
+// ret = 1 ;
+ nb++ ;
+// error = false ;
+// ok = false ;
if( clients.get( i ).getStatus().equalsIgnoreCase( "connected" ) )
- {
- try {
- ret = clients.get( i ).getStub().deployVM( _name, _archive, _directory ) ;
- } catch( RemoteException e ) {
- System.err.println( "Unable to deploy the VM on " + clients.get( i ).getName() + "!" ) ;
- e.printStackTrace() ;
+ {
+ synchronized( limitThread )
+ {
+ while( limitThread.getNb() >= maxThread )
+ {
+ try {
+ limitThread.wait() ;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ limitThread.inc() ;
}
- // The client does not have the archive, we have to send it.
- if( ret == 2 )
- {
- System.out.print( "Sending VM archive to " + clients.get( i ).getName() + " ... " ) ;
+ final int ind = i ;
+ new Thread( new Runnable() {
+
+ @Override
+ public void run() {
+ int ret = -1 ;
+ boolean error = true ;
+
+ try {
+ ret = clients.get( ind ).getStub().deployVM( _name, _archive, _directory ) ;
+ } catch( RemoteException e ) {
+ System.err.println( "Unable to deploy the VM on " + clients.get( ind ).getName() + "!" ) ;
+ e.printStackTrace() ;
+ }
- String wd = "" ;
- String snIP = "" ;
- error = false ;
+ // The client does not have the archive, we have to send it.
+ if( ret == 2 )
+ {
+ // Attention au multi-envois !!!
+ System.out.print( "Sending VM archive to " + clients.get( ind ).getName() + " ... " ) ;
- try {
- wd = clients.get( i ).getStub().getWorkingDirectory() ;
- snIP = clients.get( i ).getStub().getIPHost() ;
- } catch (RemoteException e2) {
- System.err.println( "Unable to retrieve information on " + clients.get( i ).getName() + "!" ) ;
- e2.printStackTrace() ;
- error = true ;
- pb++ ;
- }
+ String wd = "" ;
+ String snIP = "" ;
+ error = false ;
- String[] command = new String[]{ "/usr/bin/scp", working_directory + "/" + _archive,
- snIP + ":" + wd } ;
+ try {
+ wd = clients.get( ind ).getStub().getWorkingDirectory() ;
+ snIP = clients.get( ind ).getStub().getIPHost() ;
+ } catch (RemoteException e2) {
+ System.err.println( "Unable to retrieve information on " + clients.get( ind ).getName() + "!" ) ;
+ e2.printStackTrace() ;
+ error = true ;
+ }
+
+ String[] command = new String[]{ "/usr/bin/scp", working_directory + "/" + _archive,
+ snIP + ":" + wd } ;
- if( ! error )
- {
- try {
- Process proc = Runtime.getRuntime().exec( command ) ;
- proc.waitFor() ;
-
- if( proc.exitValue() == 0 )
+ if( ! error )
{
- System.out.println( "Initial VM archive successfully sent." ) ;
- } else {
- System.err.println( "Initial VM archive not sent!" ) ;
- System.err.println( "Error: " + proc.exitValue() ) ;
- BufferedReader b = new BufferedReader( new InputStreamReader( proc.getErrorStream() ) ) ;
-
- String l ;
try {
- while( (l = b.readLine()) != null )
+ Process proc = Runtime.getRuntime().exec( command ) ;
+ proc.waitFor() ;
+
+ if( proc.exitValue() == 0 )
{
- System.err.println( l ) ;
+ System.out.println( "Initial VM archive successfully sent." ) ;
+ } else {
+ System.err.println( "Initial VM archive not sent!" ) ;
+ System.err.println( "Error: " + proc.exitValue() ) ;
+ BufferedReader b = new BufferedReader( new InputStreamReader( proc.getErrorStream() ) ) ;
+
+ String l ;
+ try {
+ while( (l = b.readLine()) != null )
+ {
+ System.err.println( l ) ;
+ }
+ } catch( IOException e ) {
+ e.printStackTrace() ;
+ }
+
+ error = true ;
}
} catch( IOException e ) {
+ System.err.println( "Error during initial VM archive send command: " ) ;
+ e.printStackTrace() ;
+ error = true ;
+ } catch( InterruptedException e ) {
e.printStackTrace() ;
+ error = true ;
}
-
- error = true ;
- pb++ ;
}
- } catch( IOException e ) {
- System.err.println( "Error during initial VM archive send command: " ) ;
- e.printStackTrace() ;
- error = true ;
- pb++ ;
- } catch( InterruptedException e ) {
- e.printStackTrace() ;
- error = true ;
- pb++ ;
- }
- }
-
- if( error )
- {
- continue ;
- }
+
+ if( ! error )
+ {
+ // Second try ...
+ ret = 1 ;
+ try {
+ ret = clients.get( ind ).getStub().deployVM( _name, _archive, _directory ) ;
+ } catch( RemoteException e ) {
+ System.err.println( "Unable to deploy the VM on " + clients.get( ind ).getName() + "!" ) ;
+ e.printStackTrace() ;
+ }
+ }
+ }
- // Second try ...
- ret = 1 ;
- try {
- ret = clients.get( i ).getStub().deployVM( _name, _archive, _directory ) ;
- } catch( RemoteException e ) {
- System.err.println( "Unable to deploy the VM on " + clients.get( i ).getName() + "!" ) ;
- e.printStackTrace() ;
- pb++ ;
+ if( ret == 0 )
+ {
+ System.out.println( "Initial VM archive successfully deployed on " + clients.get( ind ).getName() + "." ) ;
+
+ synchronized( deployingClients )
+ {
+ deployingClients.inc() ;
+ }
+ }
+
+ synchronized( limitThread )
+ {
+ limitThread.dec() ;
+ limitThread.notifyAll() ;
+ }
}
- }
-
- if( ret == 0 )
- {
- System.out.println( "Initial VM archive successfully deployed on " + clients.get( i ).getName() + "." ) ;
- nb++ ;
- }
+ }).start() ;
+ }
+ }
+ }
+
+ synchronized( limitThread )
+ {
+ while( limitThread.getNb() > 0 )
+ {
+ try {
+ limitThread.wait() ;
+ } catch( InterruptedException e ) {
+ e.printStackTrace() ;
}
}
}
- if( pb > 0 )
+ if( nb - deployingClients.getNb() > 0 )
{
if( pb == 1 )
System.err.println( "** " + pb + " machine is not deployed!" ) ;
return working_directory ;
}
+
+ private class OperatingClients
+ {
+ private int nb ;
+
+ OperatingClients() { nb = 0 ; }
+
+ protected void inc() { nb++ ; }
+
+ protected void dec() { nb-- ; }
+
+ protected int getNb() { return nb ; }
+ }
+
+
+ private class LimitThread
+ {
+ private int nb ;
+
+ LimitThread() { nb = 0 ; }
+
+ protected void inc() { nb++ ; }
+
+ protected void dec() { nb-- ; }
+
+ protected int getNb() { return nb ; }
+ }
+
}
/** La programmation est un art, respectons ceux qui la pratiquent !! **/