From de28e13f31d33f1741cd941ba8103684979c226f Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=C3=A9bastien=20Miqu=C3=A9e?= Date: Fri, 7 May 2010 10:55:54 +0200 Subject: [PATCH] New development version. - Correct interaction with the Mapping library. - Correction of some bugs. - Cleaning (partially) the code. - /!\ Some code is used for tests with fault tolerance ! This code is fully dependent on my account on Grid'5000 ! To be cleaned /!\ --- src/jaceP2P/JaceDaemon.java | 140 ++++-- src/jaceP2P/JaceInterface.java | 4 +- src/jaceP2P/JaceP2P.java | 30 +- src/jaceP2P/JaceServer.java | 64 ++- src/jaceP2P/JaceSpawner.java | 447 +++++++++++++----- src/jaceP2P/JaceSpawnerServer.java | 24 +- src/jaceP2P/JaceSuperNode.java | 7 +- src/jaceP2P/JaceSuperNodeInterface.java | 11 +- src/jaceP2P/JaceSuperNodeServer.java | 396 ++++++++++------ src/jaceP2P/ListeTask.java | 13 +- src/jaceP2P/LocalHost.java | 3 + src/jaceP2P/Register.java | 52 +- src/jaceP2P/ScanThread.java | 6 +- src/jaceP2P/SuperNodeListe.java | 64 ++- src/jaceP2P/Task.java | 10 +- src/jaceP2P/TaskId.java | 10 +- .../UpdateRegisterConcernedThread.java | 247 ++++++---- src/jaceP2P/UpdateRegisterThread.java | 25 +- 18 files changed, 1064 insertions(+), 489 deletions(-) diff --git a/src/jaceP2P/JaceDaemon.java b/src/jaceP2P/JaceDaemon.java index 21a8c17..f894793 100644 --- a/src/jaceP2P/JaceDaemon.java +++ b/src/jaceP2P/JaceDaemon.java @@ -3,30 +3,36 @@ package jaceP2P; import java.net.ServerSocket; import java.net.Socket; import java.rmi.Naming; +import java.rmi.RemoteException; +import java.rmi.registry.LocateRegistry; +import java.rmi.registry.Registry; +import java.rmi.server.UnicastRemoteObject; +import and.Mapping.GNode; import and.Mapping.Utils; public class JaceDaemon { // attributes public static JaceDaemon Instance = null; private String snode_IP = null; - private int snode_port = 1098; - private int daemonPort = 1098; + private int snode_port = 1099; + private int daemonPort = 1099; private int heartTime; // HeartBeat frequency private String protocol; private boolean running = false; - public JaceDaemon(String superNode, int port, String comProtocol) { - if (!superNode.equals("-d")) { + public JaceDaemon( String superNode, int port, String comProtocol ) { + if( !superNode.equals( "-d" ) ) + { // snode_IP = LocalHost.Instance().resolve(superNode); // get IP of // the super node - snode_IP = superNode; + snode_IP = superNode ; } - daemonPort = port; - protocol = comProtocol; - running = true; - Instance = this; + daemonPort = port ; + protocol = comProtocol ; + running = true ; + Instance = this ; // initialize(); } @@ -66,20 +72,14 @@ public class JaceDaemon { * **/ public void initialize() { - LocalHost.Instance().setPort(daemonPort); - exportObject(); // Iinstanciate the JaceServer localy - reconnectSuperNode(); // Connect to one of the SuperNodes - if (protocol.equals("socket")) { - listenForRequests(); + LocalHost.Instance().setPort( daemonPort ) ; + exportObject() ; // Instanciate the JaceServer localy + reconnectSuperNode() ; // Connect to one of the SuperNodes + + if( protocol.equals( "socket" ) ) + { + listenForRequests() ; } - - /* - * System.out.println("sleep de 5 secondes avt tuer Daemon"); try { - * Thread.sleep(5000); } catch(Exception e1) {} - * - * - * sortir(); - */ } /** @@ -112,20 +112,54 @@ public class JaceDaemon { * **/ private void exportObject() { - // if( protocol.equals( "rmi" ) ) { - JaceInterface ref = null; - JaceServer myServer = null; - - //System.out.println("Name of local machine is : " - // + LocalHost.Instance().getName()); - //System.out.println("IP of local machine is : " - // + LocalHost.Instance().getIP()); + JaceInterface ref = null ; + JaceServer myServer = null ; + Registry reg ; + try { + + while( true ) + { + reg = LocateRegistry.getRegistry( daemonPort ) ; + + String tab[] = reg.list() ; + + System.out.println( "There is an existing RMI Registry on port "+daemonPort+" with "+tab.length+" entries!" ) ; + for( int i = 0 ; i < tab.length ; i++ ) + { + try { + if( UnicastRemoteObject.unexportObject( Naming.lookup(tab[i]), true ) ) + { + System.out.println( "Register successfuly deleted!" ) ; + } else { + System.err.println( "Register undeleted !!!" ) ; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + if( UnicastRemoteObject.unexportObject( myServer, true ) ) + { + System.out.println( "Register successfuly deleted!" ) ; + } else { + System.err.println( "Register undeleted !!!" ) ; + } + } + } catch( RemoteException e ) { + } + + + try { + if (System.getSecurityManager() == null) { + System.setSecurityManager(new SecurityManager()); + } + // launch the JaceServer myServer = new JaceServer( /* this */); - java.rmi.registry.LocateRegistry.createRegistry(daemonPort); - java.rmi.registry.LocateRegistry.getRegistry(daemonPort).rebind( + LocateRegistry.createRegistry(daemonPort); + LocateRegistry.getRegistry(daemonPort).rebind( "JaceServer", myServer); ref = (JaceInterface) Naming.lookup("rmi://" + LocalHost.Instance().getIP() + ":" + daemonPort @@ -139,7 +173,6 @@ public class JaceDaemon { } LocalHost.Instance().setStub(ref); - // } } /** @@ -160,7 +193,8 @@ public class JaceDaemon { } } - + + /** * **/ @@ -189,11 +223,11 @@ public class JaceDaemon { } catch (Exception e) { //System.out // .println("Snode not launched, try another one (1/2s)"); - try { - Thread.sleep(500); - } catch (Exception e1) { - // nothing - } +// try { +// Thread.sleep(500); +// } catch (Exception e1) { +// // nothing +// } } } @@ -254,10 +288,26 @@ public class JaceDaemon { // LocalHost.Instance().getIP(), // LocalHost.Instance().getName(), // daemonPort ) ; + + GNode me = Utils.createGNode() ; + + if( me == null ) + { + System.err.println( "Problem during the creation of my GNode !!" ) ; + System.exit( 1 ) ; + } else { + if( me.getIP().equals( "" ) || me.getIP() == null + || me.getIP().isEmpty() + || ! me.getIP().equals( LocalHost.Instance().getIP())) + { + System.err.println( "Problem with my IP address in my GNode !!" ) ; + System.exit( 2 ) ; + } + } snodeStub.workerRegistering(LocalHost.Instance().getStub(), LocalHost.Instance().getIP(), LocalHost.Instance() - .getName(), daemonPort, Utils.createGNode()); + .getName(), daemonPort, me ) ; // Launching the heart beat thread HeartBeatThread.Instance().setHeartTime(heartTime); @@ -317,13 +367,13 @@ public class JaceDaemon { LocalHost.Instance().setStartedThreads(false); System.out.println("Reinitialization of the Daemon"); - System.out.println("I kill the application if any exists"); + System.out.println( "I kill the application if any exists" ) ; // ScanThread.Instance().setScanning( false ) ; // System.out.println( "Set ScanThread off" ) ; // Sender.Instance().kill() ; try { - Thread.sleep(2500); + Thread.sleep( 2500 ) ; } catch (Exception e) { // nothing } @@ -339,11 +389,11 @@ public class JaceDaemon { Register.Instance().purge(); } catch (Exception e) { - System.err.println("Crashed killing the application : " + e); + System.err.println( "Crashed killing the application : " + e ) ; } - System.out.println("Daemon reinitialized"); - // Runtime.getRuntime().gc() ; + System.out.println( "Daemon reinitialized" ) ; + System.gc() ; reconnectSuperNode(); } diff --git a/src/jaceP2P/JaceInterface.java b/src/jaceP2P/JaceInterface.java index f49b598..e6694df 100644 --- a/src/jaceP2P/JaceInterface.java +++ b/src/jaceP2P/JaceInterface.java @@ -73,7 +73,9 @@ public interface JaceInterface extends Remote { public void setSpawner(JaceSpawnerInterface spawnerStub) throws RemoteException; - public void updateRegister(Node oldNode, Node node) throws RemoteException; + public int updateRegister(Node oldNode, Node node, int rank) throws RemoteException; public void getBackupForNewNode(int rank) throws RemoteException; + + public void suicide2(String string)throws RemoteException ; } diff --git a/src/jaceP2P/JaceP2P.java b/src/jaceP2P/JaceP2P.java index b089874..3322d0a 100644 --- a/src/jaceP2P/JaceP2P.java +++ b/src/jaceP2P/JaceP2P.java @@ -29,8 +29,8 @@ public class JaceP2P { if (entity.equals("-Daemon")) { System.out .println("\n*************** LAUNCHING ---- DAEMON *****"); - JaceDaemon daemon = new JaceDaemon(superNodeName, port, - comProtocol); + JaceDaemon daemon = new JaceDaemon( superNodeName, port, + comProtocol ) ; daemon.initialize(); } else if (entity.equals("-Spawner")) { if (args.length > 9) { @@ -40,28 +40,34 @@ public class JaceP2P { int nbDaemonPerThread = -1; int nbSavingNodes = -1; int algoMapping = -1; + int test = -1 ; double paramAlgo = 0.5 ; + int nbFault = 0 ; + int faultTime = 0 ; try { nbDaemonPerSpawner = new Integer(args[4]).intValue(); nbDaemonPerThread = new Integer(args[5]).intValue(); nbSavingNodes = new Integer(args[6]).intValue(); - algoMapping = new Integer(args[7]).intValue(); - paramAlgo = new Double(args[8]).doubleValue(); + test = new Integer(args[7]).intValue(); + nbFault = new Integer(args[8]).intValue(); + faultTime = new Integer(args[9]).intValue(); + algoMapping = new Integer(args[10]).intValue(); + paramAlgo = new Double(args[11]).doubleValue(); } catch (NumberFormatException e) { System.err.println("The number of Daemons per spawner and the number of daemons per thread must be integers: " + e); System.exit(0); } //System.out.println("=====> " + algoMapping); - param = new String[args.length - 9]; - for (int i = 9; i < args.length; i++) { - param[i - 9] = args[i]; + param = new String[args.length - 12]; + for (int i = 12; i < args.length; i++) { + param[i - 12] = args[i]; System.out.println("=> " + args[i]); } JaceSpawner spawner = new JaceSpawner(superNodeName, port, comProtocol, param, nbDaemonPerSpawner, - nbDaemonPerThread, nbSavingNodes, algoMapping, paramAlgo); + nbDaemonPerThread, nbSavingNodes, algoMapping, paramAlgo, test, nbFault, faultTime); spawner.initialize(); } @@ -100,19 +106,19 @@ public class JaceP2P { System.out .println("-Daemon :