X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/jaceP2P.git/blobdiff_plain/75b8c0f2d17fa325685084522752884ef794febf..HEAD:/src/jaceP2P/JaceSpawner.java diff --git a/src/jaceP2P/JaceSpawner.java b/src/jaceP2P/JaceSpawner.java index a6b1244..4c1672f 100644 --- a/src/jaceP2P/JaceSpawner.java +++ b/src/jaceP2P/JaceSpawner.java @@ -1,9 +1,19 @@ package jaceP2P; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; import java.rmi.Naming; +import java.rmi.RemoteException; +import java.rmi.UnmarshalException; +import java.util.ArrayList; import java.util.Calendar; import java.util.GregorianCalendar; -import java.util.Vector; +import java.util.Random; +import java.util.concurrent.Semaphore; + +import and.Mapping.Algo; +import and.Mapping.Utils; public class JaceSpawner { private Class c; @@ -12,8 +22,8 @@ public class JaceSpawner { public static JaceSpawner Instance; private static String superNode_IP = null; private int superNode_port = 1098; - private static int spawnerPort = 1099; - private static JaceSuperNodeInterface centralServer = null; + private static int spawnerPort = 1098; + protected static JaceSuperNodeInterface centralServer = null; private JaceSpawnerInterface spawnerRef = null; private int nbTasks; private String appliName; @@ -31,21 +41,37 @@ public class JaceSpawner { private int z = 0; private static int nbOfDaemonsPerSpawner; private static int nbOfDeamonsPerThread; - private Vector spawnersList; + private ArrayList spawnersList; private int rank; private int nbSavingNodes; // Variables for Mapping private int algo; private double paramAlgo ; + private String idAlgo ; + + private Semaphore sema ; + + // ** Test ** // + private int test; + private double dh ; + protected int nbFault ; + protected int faultTime ; public JaceSpawner(String superNode, int port, String comProtocol, String[] args, int nbDaemonPerSpawner, int nbDaemonPerThread, - int nbSavingNodes, int _algo, double _paramAlgo) { + int nbSavingNodes, int _algo, double _paramAlgo, int _test, int _nbF, int _fT) { // superNode_IP = LocalHost.Instance().resolve(superNode); algo = _algo; paramAlgo = _paramAlgo ; + + test = _test ; + nbFault = _nbF ; + faultTime = _fT ; + + sema = new Semaphore( 1, true ) ; + superNode_IP = superNode; spawnerPort = port; protocol = comProtocol; @@ -96,7 +122,7 @@ public class JaceSpawner { public JaceSpawner(String[] params, String appliName, Register reg, int nbTasks, JaceSuperNodeInterface snodeStub, int rank, int heartTime, int tag, int nbdc, int nbsdc, - int nbDaemonPerSpawner, int nbDaemonPerThread) { + int nbDaemonPerSpawner, int nbDaemonPerThread, String _idAlgo) { try { nbOfDaemonsPerSpawner = nbDaemonPerSpawner; nbOfDeamonsPerThread = nbDaemonPerThread; @@ -111,7 +137,11 @@ public class JaceSpawner { } catch (Exception e) { System.err.println("Error in copying the parameters: " + e ) ; } - // System.out.println("xxxxxxxxxxxxxxx reg size="+reg.getSize()+" xxxxxxxxxxxxxx"); + + idAlgo = _idAlgo ; + + sema = new Semaphore( 1, true ) ; + this.appliName = appliName; this.nbTasks = nbTasks; @@ -142,7 +172,7 @@ public class JaceSpawner { RunningApplication.Instance().setRunning(true); RunningApplication.Instance().setNumberOfDisconnections(nbdc); RunningApplication.Instance().setNumberOfSpawnerDisconnections(nbsdc); - // System.out.println("+++++++++++++++++++++++++"); + Instance = this; // if(tag==0) broadcastRegister(1); @@ -179,7 +209,7 @@ public class JaceSpawner { return nbOfDaemonsPerSpawner; } - public void startProcess(Vector spawnersList) { + public void startProcess(ArrayList spawnersList) { this.spawnersList = spawnersList; int is = spawnersList.indexOf((Object) Register.Instance() @@ -213,7 +243,7 @@ public class JaceSpawner { ScanThreadSpawner.Instance().start(); broadcastScanning(); -// System.out.println("apres broadcastScanning"); + new StartScanning().start(); } else { System.err.println("Cannot find myself in the spawnersList !"); @@ -228,6 +258,12 @@ public class JaceSpawner { public void initialize() { // if(protocol.equals("rmi")){ // launch the JaceSpawnerServer + if (System.getSecurityManager() == null) { + System.setSecurityManager(new SecurityManager()); + } + +// signal = false ; + exportObject(); connectSuperNode(); @@ -239,6 +275,8 @@ public class JaceSpawner { createAppli(); createSpawnerNetwork(); + // ** Tests ** // + new FaultMake().run() ; // } } @@ -262,96 +300,259 @@ public class JaceSpawner { } // /System.out.println("is running = false"); if (!JaceDaemon.Instance().isRunning()) - System.exit(0); + System.exit( 1 ) ; } - public synchronized void signalDeadNode(JaceInterface host, int rankOfDead) { + public synchronized void signalDeadNode(JaceInterface host, int rankOfDead ) + { + + try { + sema.acquire() ; + } catch (InterruptedException e3) { + System.err.println( "Problem while acquiring the semaphore in signalDeadNode!" ) ; + e3.printStackTrace() ; + } + + + TaskId myTaskId = null; + int nb = 0; + int nbC = 0; + long time = 0; + RunningApplication.Instance().incrementNumberOfDisconnections(); + + time = RunningApplication.Instance().getChrono().getValue() / 1000; + nb = RunningApplication.Instance().getNumberOfDisconnections(); + nbC = RunningApplication.Instance().getNumberOfProblems(); + System.out.println( "At time = " + time + "s, NbDisconnection = " + + nb + ", NbProblem = " + nbC ) ; + + if( host == null ) + System.err.println( "Signal of a node null!" ) ; + + // !!!!!!!!!!!!!!actualiser le ListeTask ds le Register try { - TaskId myTaskId = null; - int nb = 0; - int nbC = 0; - long time = 0; - RunningApplication.Instance().incrementNumberOfDisconnections(); - - time = RunningApplication.Instance().getChrono().getValue() / 1000; - nb = RunningApplication.Instance().getNumberOfDisconnections(); - nbC = RunningApplication.Instance().getNumberOfCouilles(); - System.out.println("At time = " + time + "s, NbDisconnection = " - + nb + ", NbProblem = " + nbC); - - // !!!!!!!!!!!!!!actualiser le ListeTask ds le Register myTaskId = Register.Instance().getListeOfTasks() - .getTaskIdOfHostStub(host); - if (myTaskId == null) { - Register.Instance.getListeOfTasks().viewAll(); - myTaskId = Register.Instance().getListeOfTasks() - .getTaskIdOfRank(rankOfDead); - JaceInterface deadStub = myTaskId.getHostStub(); - deadStub.suicide("Not doing a good work"); + .getTaskIdOfHostStub( host ) ; + } catch( Exception e ) {} + + if (myTaskId == null) + { + myTaskId = Register.Instance().getListeOfTasks() + .getTaskIdOfRank( rankOfDead ) ; + if( myTaskId == null ) + { + System.err.println( "Houston we have a serious problem!!" ) ; + sema.release() ; + return ; } - myTaskId.setHostIP(null); - myTaskId.setHostName(null); - Node noeud = Register.Instance().getNodeOfStub( - myTaskId.getHostStub()); - myTaskId.setHostStub(null); - int rankDeaD = myTaskId.getRank(); - - String nomNoeud = noeud.getName(); - // Register.Instance().removeNodeAt(i); - // Register.Instance().removeNode(host.getIP()); - // System.out.println("fait le remove : taille = " + - // Register.Instance().getSize()); - - boolean b = Register.Instance().removeNodeOfName(noeud.getName()); - - if (b == true) { - System.out.println("Removing Node of rank " - + rankDeaD + " : size = " - + Register.Instance().getSize()); + + JaceInterface deadStub = myTaskId.getHostStub() ; + + if( deadStub != null ) + { + try{ + deadStub.suicide2( "Not doing a good work" ) ; + }catch(Exception e){} } else { - System.err - .println("Cannot remove the Node, it doesn't exist anymore: size = " - + Register.Instance().getSize()); + System.err.println( "Dead node stub unavailable!" ) ; } + } + + Node noeud = Register.Instance().getNodeOfName( myTaskId.getHostName() ) ; + + int rankDeaD = myTaskId.getRank() ; + + String nomNoeud = "" ; + + if( noeud != null ) + { + nomNoeud = noeud.getName() ; + } - Calendar cal = new GregorianCalendar(); - System.out.println("At time=" + cal.get(Calendar.MINUTE) + ":" - + cal.get(Calendar.SECOND)); + boolean b = false ; + + if( ! nomNoeud.equals( "" ) ) + { + b = Register.Instance().removeNodeOfName( nomNoeud ) ; + } else { + System.err.println( "Dead node name unknown!!" ) ; + } + + if( ! b ) + { + b = Register.Instance().removeNode( noeud ) ; + } - // retrouver SI POSSIBLE un autre libre pr remplacer celui la pr - // cette tache + if( b == true ) + { + System.out.println( "Removing Node of rank " + + rankDeaD + " : size = " + + Register.Instance().getSize() ) ; + } else { + System.err.println( "Cannot remove the Node, it doesn't exist anymore: size = " + + Register.Instance().getSize() ) ; + } + + Calendar cal = new GregorianCalendar() ; + System.out.println( "At time=" + cal.get(Calendar.MINUTE) + ":" + + cal.get(Calendar.SECOND) ) ; + + /**** Sébastien Miquée **/ + Node tmpNode = null ; + boolean ok = false ; + int retry = 0, retryMax = 4 ; + + while( tmpNode == null && ! ok ) + { + ok = true ; + + while( tmpNode == null ) + { + tmpNode = foundToReplaceThisNode( rankDeaD, nomNoeud, noeud ) ; + + if( tmpNode == null ) + { + System.err.println( "I didn't receive a new Node! (" + retry + ")" ) ; + + try{ + Thread.sleep( 1000 ) ; + } catch( Exception e ) {} + + retry++ ; + + if( retry > retryMax ) + { + System.err.println( "Unable to replace the dead node " + nomNoeud ) ; + sema.release() ; + return ; + } + } + } + + System.out.println( "Using Node " + tmpNode.getName() + " in order to replace " + nomNoeud + "\n" ) ; + + tmpNode.setAliveFlag( true ) ; + tmpNode.setAliveTime() ; + tmpNode.setAppliName( RunningApplication.Instance().getName() ) ; + + // Replacing the node in the Register + int is = Register.Instance().existNode( tmpNode ) ; + + if( is != -1 ) + { + System.out.println( "The Node is already in the register! I don't add it." ) ; + System.out.println( "Node " + tmpNode.getName() + " not added!" ) ; + tmpNode = null ; + } else { + Register.Instance().addNode( tmpNode ) ; + + myTaskId = Register.Instance().getListeOfTasks().getTaskIdOfRank( rankDeaD ) ; + myTaskId.setHostIP( tmpNode.getIP() ) ; + myTaskId.setHostName( tmpNode.getName() ) ; + myTaskId.setHostStub( tmpNode.getStub() ) ; + + int neighborRank ; + + if( rankDeaD == 0 ) + { + neighborRank = Register.Instance().getSize() - 1 ; + } else { + neighborRank = rankDeaD - 1 ; + } + + TaskId neighborTask2 = Register.Instance().getListeOfTasks() + .getTaskIdOfRank( neighborRank ) ; + + int loop = 0, MAX_LOOP = 1 ; + boolean ook = false ; + JaceInterface jaceStub = null ; + + while( !ook && loop < MAX_LOOP ) + { + try { + jaceStub = neighborTask2.getHostStub() ; + jaceStub.updateHeart( tmpNode.getStub() ) ; + + ook = true ; + } catch( Exception e ) { + loop++ ; + + if( loop < MAX_LOOP ) + { + try { + Thread.sleep( loop * 2000 ) ; + } catch( InterruptedException e1 ) {} + } + + if( loop == MAX_LOOP ) + { + System.err.println( "Next node unreachable! " + e ) ; + } + } + } + + if( loop == MAX_LOOP && ! ok ) + { + + try { + jaceStub.suicide2( "Not reachable!" ) ; + } catch (RemoteException e1) { + System.err.println( "Unable to suicide the node!\n"+e1 ) ; + } + + try { + int pos = Register.Instance().existNodeOfStub( jaceStub ) ; + + if( pos != -1 ) + { + centralServer.delGNodeFromList( Register.Instance().getNodeAt( pos ), 0, LocalHost.Instance().getIP() ) ; + } else { + System.err.println( "The ping neighbor of " + tmpNode.getName() + " does not respond. It is probably dead ..." ) ; + } + } catch( RemoteException e ) { + System.err.println( "Unable to signal dead node to SuperNode!" ) ; + e.printStackTrace() ; + } + } + } - /**** Sébastien Miquée **/ - //Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud); - Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud, noeud); try { - // broadcastRegister(0); - updateConcernedNodes(rankDeaD, noeud, tmpNode); + updateConcernedNodes( rankDeaD, noeud, tmpNode ) ; - Thread.sleep(500); - System.out.println("Set scanning on %%%%%%"); - tmpNode.getStub().setScanning(true); - } catch (Exception e) { - System.err.println("Unable to setScannig on for the new node: " - + e); + Thread.sleep( 500 ) ; + tmpNode.getStub().setScanning( true ) ; + System.out.println( "Set scanning on " + tmpNode.getName() ) ; + } catch( Exception e ) { + try { + Thread.sleep( 2000 ) ; + + tmpNode.getStub().setScanning( true ) ; + } catch( InterruptedException e1 ) { + } catch( RemoteException e2 ) { + System.err.println( "Unable to setScanning on for the new node: " + e2 ) ; + ok = false ; + } } - // Register.Instance().getListeOfTasks().viewAll(); - for (int z = 0; z < spawnersList.size(); z++) - if (!((JaceSpawnerInterface) spawnersList.get(z)) - .equals(Register.Instance().getSpawnerStub())) + for( int z = 0 ; z < spawnersList.size() ; z++ ) + { + if( !( (JaceSpawnerInterface) spawnersList.get( z ) ) + .equals(Register.Instance().getSpawnerStub() ) ) + { try { ((JaceSpawnerInterface) spawnersList.get(z)) - .replaceDeamonBy(noeud, tmpNode, rankDeaD); + .replaceDeamonBy(noeud, tmpNode, rankDeaD); } catch (Exception e) { - System.err - .println("Unable to broadcast the modifications to all the spawners: " - + e); + System.err.println( "Unable to broadcast the modifications to all the spawners: " + + e ) ; + ok = false ; } - } catch (Exception ee) { - System.err.println("Error in signalDeadNode() :" + ee); + } + } } + + sema.release() ; } // verifie si les noeud notes vivant ds le Register.Instance() du SuperNode @@ -390,108 +591,171 @@ public class JaceSpawner { // pr les requisitionner /*** Sébastien Miquée ***/ - - //private synchronized Node foundToReplaceThisNode(int theRank, String nom) { - private synchronized Node foundToReplaceThisNode(int theRank, String nom, Node _n) { - // int i = 0; - boolean found = false; - Node node = null; + private synchronized Node foundToReplaceThisNode(int theRank, String nom, Node _n) + { + boolean found = false ; + Node node = null ; + int nbNull = 0 ; - while (found == false) { + while( found == false ) { try { - - //node = centralServer.getNewNode(LocalHost.Instance().getIP()); - node = centralServer.getNewNode(LocalHost.Instance().getIP(), _n); + node = null ; - if( node != null ) + if( centralServer == null ) { - found = true ; + centralServer = LocalHost.Instance().getSuperNodeStub() ; + + if( centralServer == null ) + { + System.err.println( "Connection lost to SuperNode!" ) ; + System.err.println( "Reconnecting..." ) ; + connectSuperNode() ; + } + } + + node = centralServer.getNewNode( idAlgo, theRank ) ; + + if( node != null && ! node.getIP().isEmpty() && node.getIP() != null + && ! node.getIP().equals( "" ) ) + { + if( Register.Instance().existNode( node ) != -1 ) + { + found = false ; + } else { + found = true ; + } } else { - Thread.sleep( 1000 ) ; - System.out.println("Pas de bon retour !"); + System.err.println( "Returned node is null!" ) ; + nbNull++ ; + try { + Thread.sleep( 2000 ) ; + } catch( Exception e ) {} } - } catch (Exception e) { + } catch (RemoteException e) { // trouver un autre superNode et lui demander le noeud a lui - - System.err.println("Cannot localize SuperNode ! " + e); - - connectSuperNode(); + System.err.println( "Cannot localize SuperNode ! " + e ) ; + connectSuperNode() ; } } + return node ; - if (node != null) { - System.out.println("Using Node " + node.getName() + " (" - + node.getIP() + ") in order to replace " + nom - + " size before add: " + Register.Instance().getSize() - + "\n\n"); - node.setAliveFlag(true); - node.setAliveTime(); - - // rajouter le noeud ds le Register - node.setAppliName(RunningApplication.Instance().getName()); - - // lui envoyer mon stub pr qu'il commence a me pinguer des - // maintenant - // TODO a mettre ds un thread ???? - - /* - * TaskId - * neighborTask=Register.Instance().getListeOfTasks().getTaskIdOfRank - * ((theRank+1)%Register.Instance().getListeOfTasks().getSize()); - * try{ node.getStub().updateHeart(neighborTask.getHostStub()); } - * catch(Exception e) { - * System.out.println("nvx noeud deja plu dispo2"); //node = null; } - */ - // TODO verif pourkoi superNode me le redonne - // alors qu'il fait deja du calcul - // int is = Register.Instance().existNode(node.getIP()); - int is = Register.Instance().existNode(node); - if (is != -1) { - System.out.println("The Node is already in the register ! I don't add it."); - System.out.println("Node " + node.getName() + " not added !") ; - node = null; - } else { - Register.Instance().addNode(node); - - // !!!!!!!!!!!!!!actualiser le ListeTask - TaskId myTaskId = Register.Instance().getListeOfTasks() - .getTaskIdOfRank(theRank); - myTaskId.setHostIP(node.getIP()); - myTaskId.setHostName(node.getName()); - myTaskId.setHostStub(node.getStub()); - - // Register.Instance().getListeOfTasks().viewAll(); - int neighborRank; - if (theRank == 0) - neighborRank = Register.Instance().getSize() - 1; - else - neighborRank = theRank - 1; - TaskId neighborTask2 = Register.Instance().getListeOfTasks() - .getTaskIdOfRank(neighborRank); - try { - JaceInterface jaceStub = neighborTask2.getHostStub(); - jaceStub.updateHeart(node.getStub()); - } catch (Exception e) { - System.err.println("Next node unreachable ! " + e); - // node = null; - } - - } - - } else { - System.out.println("I didn't receive a new Node !"); - } - return node; +// if( node != null ) +// { +// System.out.println( "Using Node " + node.getName() + " in order to replace " + nom + "\n" ) ; +// +// node.setAliveFlag(true); +// node.setAliveTime(); +// +// // rajouter le noeud ds le Register +// node.setAppliName(RunningApplication.Instance().getName()); +// +// // lui envoyer mon stub pr qu'il commence a me pinguer des +// // maintenant +// // TODO a mettre ds un thread ???? +// +// /* +// * TaskId +// * neighborTask=Register.Instance().getListeOfTasks().getTaskIdOfRank +// * ((theRank+1)%Register.Instance().getListeOfTasks().getSize()); +// * try{ node.getStub().updateHeart(neighborTask.getHostStub()); } +// * catch(Exception e) { +// * System.out.println("nvx noeud deja plu dispo2"); //node = null; } +// */ +// // TODO verif pourkoi superNode me le redonne +// // alors qu'il fait deja du calcul +// // int is = Register.Instance().existNode(node.getIP()); +// int is = Register.Instance().existNode( node ) ; +// +// if( is != -1 ) +// { +// System.out.println("The Node is already in the register ! I don't add it."); +// System.out.println("Node " + node.getName() + " not added !") ; +// node = null; +// } else { +// Register.Instance().addNode(node); +// +// // !!!!!!!!!!!!!!actualiser le ListeTask +// TaskId myTaskId = Register.Instance().getListeOfTasks() +// .getTaskIdOfRank(theRank); +// myTaskId.setHostIP(node.getIP()); +// myTaskId.setHostName(node.getName()); +// myTaskId.setHostStub(node.getStub()); +// +// int neighborRank; +// if (theRank == 0) +// { +// neighborRank = Register.Instance().getSize() - 1; +// } else { +// neighborRank = theRank - 1; +// } +// +// TaskId neighborTask2 = Register.Instance().getListeOfTasks() +// .getTaskIdOfRank( neighborRank ) ; +// +// int loop = 0, MAX_LOOP = 1 ; +// boolean ok = false ; +// JaceInterface jaceStub = null ; +// +// while( !ok && loop < MAX_LOOP ) +// { +// try { +// jaceStub = neighborTask2.getHostStub(); +// jaceStub.updateHeart(node.getStub()); +// +// ok = true ; +// } catch( Exception e ) { +// loop++ ; +// +// if( loop < MAX_LOOP ) +// { +// try { +// Thread.sleep( loop * 2000 ) ; +// } catch (InterruptedException e1) {} +// } +// +// if( loop == MAX_LOOP ) +// { +// System.err.println( "Next node unreachable! " + e ) ; +// } +// } +// } +// +// if( loop == MAX_LOOP ) +// { +// +// try { +// node.getStub().suicide2( "Not reachable!" ) ; +// } catch (RemoteException e1) { +// System.err.println( "Unable to suicide the node!\n"+e1 ) ; +// return null ; +// //signalDeadNode( null, neighborTask2.getRank(), 1 ) ; +// } +// +// try { +// centralServer.delGNodeFromList( node, 0, LocalHost.Instance().getIP() ) ; +// } catch( RemoteException e ) { +// System.err.println( "Unable to signal dead node to SuperNode!" ) ; +// e.printStackTrace() ; +// } +// } +// +// } +// +// } else { +// System.out.println( "I didn't receive a new Node !" ) ; +// } + +// return node ; } public void replaceBy(JaceSpawnerInterface oldStub, JaceSpawnerInterface stub) { - int index = spawnersList.indexOf((Object) oldStub); + int index = spawnersList.indexOf((Object) oldStub) ; if (index != -1) - spawnersList.setElementAt(stub, index); + spawnersList.set(index, stub ) ; else - System.err.println("Spawner's stub not foud in spawnersList !"); + System.err.println( "Spawner's stub not foud in spawnersList !" ) ; } public void getNewSpawner(JaceSpawnerInterface previousSpawner) { @@ -508,20 +772,13 @@ public class JaceSpawner { if (centralServer == null) { System.err.println("Central Server not localized !"); } - node = centralServer.getNewNode( LocalHost.Instance().getIP(), null ) ; + node = centralServer.getNewNode( idAlgo, -2 ) ; RunningApplication.Instance() .incrementNumberOfSpawnerDisconnections(); //found = true; } catch (Exception e) { // trouver un autre superNode et lui demander le noeud a lui System.err.println("Super Node not localized !\n " + e); -// System.out.println("pas localise le super node " + e); -// System.out.println("pas localise le super node " + e); -// System.out.println("pas localise le super node " + e); -// System.out.println("pas localise le super node " + e); -// System.out.println("pas localise le super node " + e); -// System.out.println("pas localise le super node " + e); -// System.out.println("pas localise le super node " + e); System.err.println("My IP : " + LocalHost.Instance().getIP()); if (centralServer == null) { System.err.println("CentralServer is NULL !"); @@ -537,8 +794,6 @@ public class JaceSpawner { + LocalHost.Instance().resolve(node.getName()) + ") to replace a dead spawner\n\n"); try { - // Register.Instance().viewAll(); - // Register.Instance().getListeOfTasks().viewAll(); spawnerStub = node.getStub().transformIntoSpawner( params, appliName, @@ -552,10 +807,13 @@ public class JaceSpawner { .getNumberOfDisconnections(), RunningApplication.Instance() .getNumberOfSpawnerDisconnections(), - nbOfDaemonsPerSpawner, nbOfDeamonsPerThread); - spawnersList.setElementAt(spawnerStub, index); + nbOfDaemonsPerSpawner, nbOfDeamonsPerThread + , idAlgo); + spawnersList.set( index, spawnerStub ) ; + + new StartProcessThread(index).start(); - // spawnerStub.startProcess( spawnersList); + } catch (Exception e) { System.err.println("Unable to reach the new spawner: " + e); } @@ -641,6 +899,7 @@ public class JaceSpawner { ListeTask t = Register.Instance().getListeOfTasks(); ScanThreadSpawner.Instance().kill(); HeartBeatSpawner.Instance().kill(); + for (int i = 0; i < s + 1; i++) { new KillThread(i, debut, nbOfDaemonsPerSpawner, @@ -657,17 +916,27 @@ public class JaceSpawner { int nbsdc = RunningApplication.Instance() .getNumberOfSpawnerDisconnections(); + System.out.println("Application finished successfully !"); -// System.out.println("Application finished successfully !!!!!!"); -// System.out.println("Application finished successfully !!!!!!"); -// System.out.println("Application finished successfully !!!!!!"); -// System.out.println("Application finished successfully !!!!!!"); -// System.out.println("Application finished successfully !!!!!!"); -// System.out -// .println("Application finished successfully !!!!!!\n"); + System.out.println("TOTAL TIME in s : " + (finalTime / 1000)); - System.out.println("nb of desconnections: " + nbe); - System.out.println("nb of spawners desconnections: " + nbsdc); + System.out.println("Number of node disconnections: " + nbe); + System.out.println("Number of spawner disconnections: " + nbsdc); + + // ** Tests ** // + String path = "/home/lyon/smiquee/resultats/execTime_"+algo+"_"+nbFault+"_"+test ; + PrintWriter ecrivain = null ; + ecrivain = new PrintWriter( new OutputStreamWriter( new FileOutputStream( path ), "UTF8" ) ) ; + + ecrivain.println( "TOTAL TIME in s : " + (finalTime / 1000)); + ecrivain.println( "Number of node disconnections: " + nbe); + ecrivain.println( "Number of spawner disconnections: " + nbsdc); + ecrivain.println( "DH = "+dh ) ; + + ecrivain.flush() ; + ecrivain.close() ; + + if (JaceDaemon.Instance().isRunning()) { JaceDaemon.Instance().reconnectSuperNode(); @@ -677,12 +946,13 @@ public class JaceSpawner { // purger l'appli RunningApplication.Instance().purge(); - // System.exit(1); } + + /** Suprresion of the mapping algorithm on the SuperNode **/ + centralServer.removeAlgo( idAlgo, 0 ) ; } - } catch (Exception e) { - System.out - .println("w aiiiiiiiiiiiiiirrrr" + e + " index=" + index); + } catch( Exception e ) { + System.err.println( "Error the application nodes scan!\n " + e ) ; z = index; } /* @@ -695,7 +965,7 @@ public class JaceSpawner { * //si mon appli a besoin d'un noeud //sinon, on fait rien if ((nbTasks * - Register.Instance().getSize()) > 0) { cptReplaced = 0; * - * //TODO demander des paquet de nodes, pas qu'un //on scanne toutes les + * //TO DO demander des paquet de nodes, pas qu'un //on scanne toutes les * taches de cette appli for (int ind = 0; ind < tskList.getSize(); * ind++) { //si 1 tache a pas de noeud, on trouve 1 remplacant * @@ -724,7 +994,7 @@ public class JaceSpawner { // Node node = null; // // while (found == false) { // try { -// // TODO : trouver l'erreur !!! +// // TO DO : trouver l'erreur !!! // // msg d'erreur : // // "pas localise le super node java.lang.NullPointerException" // if (centralServer == null) { @@ -763,7 +1033,7 @@ public class JaceSpawner { // // // lui envoyer mon stub pr qu'il commence a me pinguer des // // maintenant -// // TODO a mettre ds un thread ???? +// // TO DO a mettre ds un thread ???? // try { // TaskId neighborTask = Register.Instance().getListeOfTasks() // .getTaskIdOfRank( @@ -775,7 +1045,7 @@ public class JaceSpawner { // // // int is = Register.Instance().existNode(node.getIP()); // int is = Register.Instance().existNode(node); -// // TODO verif pourkoi superNode me le redonne +// // TO DO verif pourkoi superNode me le redonne // // alors qu'il fait deja du calcul // if (is != -1) { // System.out.println("j'ajoute pas le noeud, il y est deja"); @@ -831,7 +1101,7 @@ public class JaceSpawner { .println("JaceP2P_Error in JaceSpawner.exportObject() when creating the local JaceSpawnerServer " + e); // System.err.println("exit ds JaceSpawner.exportObject"); - System.exit(0); + System.exit( 1 ) ; } } @@ -841,6 +1111,9 @@ public class JaceSpawner { boolean connected = false; if (!(superNode_IP == null)) { try { + centralServer = null ; + System.gc() ; + System.out.println("Trying to invoke super node " + superNode_IP); centralServer = (JaceSuperNodeInterface) Naming.lookup("rmi://" @@ -915,47 +1188,75 @@ public class JaceSpawner { // completed with the required number of Daemons // or gets NULL public synchronized void getRegisterOnSuperNode() { - Register registerSpawner = null; - Node noeud = null; - boolean recieved = false; + Register registerSpawner = null ; + Node noeud = null ; + boolean recieved = false ; - System.out.println("Trying to get a Register on the SuperNode"); - int nbExtraSpawners = 0; + idAlgo = LocalHost.Instance().getIP() + ":" + LocalHost.Instance().getPort() ; + + System.out.println( "Trying to get a Register on the SuperNode" ) ; + int nbExtraSpawners = 0 ; if (nbTasks > nbOfDaemonsPerSpawner) { - nbExtraSpawners = (nbTasks - 1) / nbOfDaemonsPerSpawner; + nbExtraSpawners = (nbTasks - 1) / nbOfDaemonsPerSpawner ; } - while (!recieved) { + + /* Test */ + if( algo == 3 ) + { + paramAlgo = nbFault ; + } + + while( !recieved ) + { try { - registerSpawner = centralServer.getRegisterSpawner(LocalHost - .Instance().getIP(), nbTasks, (Task) tache, nbTasks + registerSpawner = centralServer.getRegisterSpawner(idAlgo, + nbTasks, (Task) tache, nbTasks + nbExtraSpawners, algo, paramAlgo); recieved = true; } catch (Exception e) { - System.err - .println("Unable to recieve a register from superNode " - + e); - connectSuperNode(); + System.err.println( "Unable to recieve a register from superNode: " + e ) ; + connectSuperNode() ; } } + + // ** Tests ** // + try { + Algo al = centralServer.getAlgo( idAlgo ) ; + + dh = al.getGrid().getHeterogenityDegre() ; + System.out.println( "### DH = " + dh ) ; + + Utils.writeGraph( al.getGraph(), "/home/lyon/smiquee/resultats/", + "graph_"+algo+"_"+nbFault+"_"+test+".xml" ) ; + Utils.writeGrid( al.getGrid(), "/home/lyon/smiquee/resultats/", + "grid_"+algo+"_"+nbFault+"_"+test+".xml") ; + Utils.writeMapping( al.getMapping(), "/home/lyon/smiquee/resultats/", + "mapping_"+algo+"_"+nbFault+"_"+test+".xml" ) ; + } catch( RemoteException e1 ) { + System.err.println( "Unable to retrieve Algo information in Spawner!" ) ; + e1.printStackTrace() ; + } + if (registerSpawner.getSize() != (nbTasks + nbExtraSpawners)) { System.err.println("I did not recieve enough nodes from superNode!!!! \n killing application !!!!"); for (int i = 0; i < registerSpawner.getSize(); i++) { try { - registerSpawner.getNodeAt(i).getStub().reconnectSuperNode(); + registerSpawner.getNodeAt(i).getStub().suicide( "Not enough nodes for the application" ) ; + } catch (Exception e) { System.err.println("The reserved node was unable to reconnect to the super node"); } } - System.exit(0); + System.exit( 1 ) ; } - spawnersList = new Vector(); + spawnersList = new ArrayList(); for (int i = 0; i < nbExtraSpawners && i < registerSpawner.getSize(); i++) { spawnersList.add(registerSpawner.getNodeAt(0)); -// * nbOfDaemonsPerSpawner)); + registerSpawner.removeNodeOfName(registerSpawner.getNodeAt(0).getName()); -// * nbOfDaemonsPerSpawner)); + } registerSpawner.setNbOfTasks(nbTasks); @@ -967,28 +1268,31 @@ public class JaceSpawner { * nbTasks); } catch(Exception e1) {} */ - if (registerSpawner != null) { - System.out.println("I received the register"); + if( registerSpawner.getSize() > 0 ) + { + System.out.println( "I received the register" ) ; // registerSpawner.setVersion(registerVersion); // registerVersion++; - Register.Instance().replaceBy(registerSpawner); - System.out.println("It contains " + Register.Instance().getSize() - + " Nodes" + " " + nbExtraSpawners + " ExtraSpawners"); + Register.Instance().replaceBy( registerSpawner ) ; + System.out.println( "It contains " + Register.Instance().getSize() + + " Nodes" + " " + nbExtraSpawners + " ExtraSpawners" ) ; // set each Node aliveTime value to the Spawner current time - for (int i = 0; i < Register.Instance().getSize(); i++) { - noeud = Register.Instance().getNodeAt(i); - noeud.setAliveFlag(true); - noeud.setAliveTime(); + for (int i = 0; i < Register.Instance().getSize(); i++) + { + noeud = Register.Instance().getNodeAt( i ) ; + noeud.setAliveFlag( true ) ; + noeud.setAliveTime() ; } } else { - System.err.println("\n---------------WARNING--------------"); - System.err.println("No Daemon available on the SuperNode dispo, try later, please"); - System.exit(0); + System.err.println( "\n---------------WARNING--------------" ) ; + System.err.println( "No Daemon available on the SuperNode, try later, please" ) ; + System.exit( 0 ) ; } } - + + public class TransformThread extends Thread { int i; Node n; @@ -1003,10 +1307,10 @@ public class JaceSpawner { try { System.out.println("Trying to transform the spawner (" + n.getName() + ") of rank " + i); - spawnersList.setElementAt(n.getStub().transformIntoSpawner( + spawnersList.set( i, n.getStub().transformIntoSpawner( params, appliName, Register.Instance(), nbTasks, centralServer, i, heartTime, 0, 0, 0, - nbOfDaemonsPerSpawner, nbOfDeamonsPerThread), i); + nbOfDaemonsPerSpawner, nbOfDeamonsPerThread, idAlgo) ) ; } catch (Exception e) { System.err.println("Error while contacting newly acquired spawner (" + n.getName() + "): " + e); @@ -1018,8 +1322,8 @@ public class JaceSpawner { System.err.println("The Super Node is maybe dead: " + e1) ; for (int z = 0; z < Register.Instance().getSize(); z++) { try { - Register.Instance().getNodeAt(z).getStub() - .reconnectSuperNode(); + Register.Instance().getNodeAt(z).getStub().suicide2("Problem") ; + } catch (Exception ez) { System.err.println("The reserved node was unable to reconnect to the super node: \n" + ez); @@ -1030,6 +1334,95 @@ public class JaceSpawner { } } } + + + public class FaultMake extends Thread { + + public FaultMake(){} + + public void run() + { + ListeTask t = null ; + Random r = null ; + int next, old = -1 ; + + System.out.println( "Starting faults simulation!" ) ; + + while( ! RunningApplication.Instance().isRunning() ) + { + /** Waiting some time the beginning of the computation **/ + try { + sleep( 1000 ) ; + } catch (InterruptedException e) { + } + } + + while( RunningApplication.Instance().isRunning() ) + { + /** Stop too long computation **/ + if( (RunningApplication.Instance().getChrono().getValue() / 1000) > 700 ) + { + try { + Register.Instance().getSpawnerStub().setFinished( true ) ; + } catch (RemoteException e) { + System.err.println( "Unable to stop the too long computation!" ) ; + e.printStackTrace() ; + } + } + + /** Waiting some time ... **/ + try { + sleep( faultTime * 1000 ) ; + } catch( InterruptedException e ) { + } + + /** ... and kill some daemons ! **/ + t = Register.Instance().getListeOfTasks() ; + r = new Random() ; + + for( int i = 0 ; i < nbFault ; i++ ) + { + next = r.nextInt( t.getSize() ) ; + + while( next == old ) + { + next = r.nextInt( t.getSize() ) ; + } + old = next ; + + Node noeud = null ; + try { + TaskId recev = null ; + System.out.println( "Making fault on Task" + next ) ; + + recev = t.getTaskIdOfRank( next ) ; + + JaceInterface stub = recev.getHostStub() ; + System.out.println( "name = " + recev.getHostName() ) ; + noeud = Register.Instance().getNodeOfStub( stub ) ; + + stub.suicide2( "Test fault tolerance" ) ; + + } catch( UnmarshalException ue ) { + } catch( Exception e) { + try { + System.err.println( "Error in FaultMake on node: " + + noeud.getName() + ". " + e ) ; + } catch (Exception e2) { + System.err.println( "(Fault) Error in error:" + e2 ) ; + } + } + + try { + sleep( 500 ) ; + } catch( InterruptedException e ) { + } + } + } + } + } + + public class StartProcessThread extends Thread { int i; @@ -1048,7 +1441,7 @@ public class JaceSpawner { */ // System.out.println("start process on spawner of rank "+i); JaceSpawnerInterface spawnerStub = (JaceSpawnerInterface) spawnersList.get(i); - spawnerStub.startProcess(spawnersList); + spawnerStub.startProcess( spawnersList ) ; } catch (Exception e) { e.printStackTrace(System.out); System.err.println("Unable to start the process on the spawner of rank " @@ -1061,7 +1454,7 @@ public class JaceSpawner { Node n; int i; for (i = 0; i < spawnersList.size(); i++) { - n = (Node) spawnersList.elementAt(i); + n = (Node) spawnersList.get( i ) ; // Register.Instance().getListeOfTasks().viewAll(); // spawnersList.setElementAt(n.getStub().transformIntoSpawner( @@ -1082,13 +1475,24 @@ public class JaceSpawner { for (int j = 0; j < spawnersList.size(); j++) { System.out.println("waiting till transform of spawner " + j + " is finished"); - while ((spawnersList.elementAt(j) instanceof Node)) + while ((spawnersList.get(j) instanceof Node)) + { try { Thread.sleep(20); } catch (Exception e) { } + } } + +// for (int k = 0; k < spawnersList.size(); k++) +// { +// try { +// ((JaceSpawnerInterface) spawnersList.get( k )).setIdAlgo( idAlgo ) ; +// } catch (Exception e) { +// System.err.println("Unable to propagate the mapping algorithm identifier:" + e) ; +// } +// } System.out.println("End Transformation of all spawners. Beginning the computing processes"); @@ -1155,7 +1559,7 @@ public class JaceSpawner { for (int j = count; j < nbTasks; j++) { tsk.addTask(new TaskId(appliName, j, null)); } - System.out.println("in Register, misses " + System.out.println("In Register, misses " + (nbTasks - Register.Instance().getSize()) + " nodes"); } @@ -1260,9 +1664,9 @@ public class JaceSpawner { // Register.Instance().getNode(workerIP).getWorkerStub().reconnectSuperNode(); // stub.reconnectSuperNode(); - stub.suicide("fin d'appli"); + stub.suicide("End of application"); } catch (Exception e) { - System.err.println("can't kill node " + name); + System.err.println("Cannot kill node " + name); } yield(); } @@ -1323,6 +1727,7 @@ public class JaceSpawner { } } + @SuppressWarnings("unchecked") private synchronized void broadcastScanning() { Register reg = Register.Instance(); while (broadcasting == true) @@ -1331,7 +1736,7 @@ public class JaceSpawner { } catch (Exception e) { } // Register.Instance().viewAll(); - Vector nodes = (Vector) Register.Instance().getListOfNodes().clone(); + ArrayList nodes = (ArrayList) Register.Instance().getListOfNodes().clone(); int x = reg.getListeOfTasks().getSize() / nbOfDaemonsPerSpawner; int s; if (rank == x) @@ -1353,43 +1758,50 @@ public class JaceSpawner { } - public Register getRegister(int rank) { + public synchronized Register getRegister( int rank ) { - ListeTask listOfTasks = Register.Instance().getListeOfTasks(); - Vector dependencies = getDependencies(rank, listOfTasks.getSize()); - Register g = new Register(); - ListeTask newListOfTasks = new ListeTask(); - g.setAppliName(Register.Instance().getAppliName()); - g.setParams(Register.Instance().getParams()); - g.setSpawnerStub(Register.Instance().getSpawnerStub()); - g.setNbOfTasks(Register.Instance().getNbOfTasks()); + ListeTask listOfTasks = Register.Instance().getListeOfTasks() ; + ArrayList dependencies = getDependencies( rank, listOfTasks.getSize() ) ; + Register g = new Register() ; + ListeTask newListOfTasks = new ListeTask() ; + + g.setAppliName( Register.Instance().getAppliName() ) ; + g.setParams( Register.Instance().getParams() ) ; + g.setSpawnerStub( Register.Instance().getSpawnerStub() ) ; + g.setNbOfTasks( Register.Instance().getNbOfTasks() ) ; // g.setVersion(reg.getVersion()); - for (int j = 0; j < dependencies.size(); j++) { - TaskId id = listOfTasks.getTaskIdOfRank(((Integer) dependencies - .elementAt(j)).intValue()); - newListOfTasks.addTask(id); - if (id.getHostStub() != null) { - Node noeud = Register.Instance() - .getNodeOfStub(id.getHostStub()); - g.addNode(noeud); + + for( int j = 0 ; j < dependencies.size() ; j++ ) + { + TaskId id = listOfTasks.getTaskIdOfRank( ( (Integer) dependencies + .get(j) ).intValue() ) ; + newListOfTasks.addTask( id ) ; + if( id.getHostStub() != null ) { + Node noeud = Register.Instance().getNodeOfName( id.getHostName() ) ;// (id.getHostStub()); + if( noeud != null ) + { + g.addNode( noeud ) ; + } else { + System.err.println( "PAS BON DU TOUT" ) ; + } } } - g.setListeOfTasks(newListOfTasks); - return g; + g.setListeOfTasks(newListOfTasks) ; + return g ; } private void updateConcernedNodes(int rank, Node oldNode, Node node) { ListeTask listOfTasks = Register.Instance().getListeOfTasks(); - Vector dependencies = getDependencies(rank, listOfTasks.getSize()); - System.out.println("la liste des voisins concernes de : " + rank); + ArrayList dependencies = getDependencies(rank, listOfTasks.getSize()); + System.out.println("List of concerned neighbors of task " + rank+": "); for (int z = 0; z < dependencies.size(); z++) - System.out.print(((Integer) dependencies.elementAt(z)).intValue() + System.out.print(((Integer) dependencies.get(z)).intValue() + " "); System.out.println(); // Register.Instance().setVersion(registerVersion); // registerVersion++; - Register.Instance() - .setSpawnerStub(Register.Instance().getSpawnerStub()); +// Register.Instance() +// .setSpawnerStub(Register.Instance().getSpawnerStub()); int s; if ((dependencies.size() % nbOfDeamonsPerThread) == 0) s = dependencies.size() / nbOfDeamonsPerThread; @@ -1403,19 +1815,22 @@ public class JaceSpawner { } } - private Vector getDependencies(int id, int jaceSize) { + private ArrayList getDependencies( int id, int jaceSize ) + { // get computing dependencies - Vector neighbors = new Vector(); - int[] dep = tache.getDependencies(id); - for (int z = 0; z < taille(dep); z++) - neighbors.add(dep[z]); + ArrayList neighbors = new ArrayList() ; + int[] dep = tache.getDependencies( id ) ; + for( int z = 0 ; z < taille(dep) ; z++ ) + { + neighbors.add( dep[z] ) ; + } // System.out.println("la liste des voisins de calcul de: "+id+" concerne"); // for(int z=0;z nodes; + ArrayList nodes; int nbOfDeamonsPerThread, nbOfDeamonsPerSpawner; - StartScanThread(int i, Vector nodes, int debut) { + StartScanThread(int i, ArrayList nodes, int debut) { this.i = i; this.nodes = nodes; this.debut = debut; @@ -1495,7 +1910,7 @@ class StartScanThread extends Thread { && index < debut + nbOfDeamonsPerSpawner && index < nodes.size(); index++) { - Node node = (Node) nodes.elementAt(index); + Node node = (Node) nodes.get(index); JaceInterface stub = node.getStub(); String name = node.getName(); try { @@ -1504,7 +1919,7 @@ class StartScanThread extends Thread { // System.out.println("modify scanning to "+name); } catch (Exception e) { - System.out.println("unable to modify scanning to " + name + ":" + System.err.println("Unable to modify scanning to " + name + ":" + e); } }