package jaceP2P;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
import java.rmi.Naming;
+import java.rmi.RemoteException;
+import java.rmi.UnmarshalException;
import java.util.Calendar;
import java.util.GregorianCalendar;
+import java.util.Random;
import java.util.Vector;
+import and.Mapping.Algo;
+import and.Mapping.Utils;
+
public class JaceSpawner {
private Class<?> c;
private Loader load;
private Task tache = null;
public static JaceSpawner Instance;
private static String superNode_IP = null;
- private int superNode_port = 1098;
+ private int superNode_port = 1099;
private static int spawnerPort = 1099;
- private static JaceSuperNodeInterface centralServer = null;
+ protected static JaceSuperNodeInterface centralServer = null;
private JaceSpawnerInterface spawnerRef = null;
private int nbTasks;
private String appliName;
private int algo;
private double paramAlgo ;
private String idAlgo ;
+
+ // ** Test ** //
+ private int test;
+ private double dh ;
+ protected int nbFault ;
+ protected int faultTime ;
public JaceSpawner(String superNode, int port, String comProtocol,
String[] args, int nbDaemonPerSpawner, int nbDaemonPerThread,
- int nbSavingNodes, int _algo, double _paramAlgo) {
+ int nbSavingNodes, int _algo, double _paramAlgo, int _test, int _nbF, int _fT) {
// superNode_IP = LocalHost.Instance().resolve(superNode);
algo = _algo;
paramAlgo = _paramAlgo ;
+
+ test = _test ;
+ nbFault = _nbF ;
+ faultTime = _fT ;
+
+
superNode_IP = superNode;
spawnerPort = port;
protocol = comProtocol;
}
idAlgo = _idAlgo ;
- // System.out.println("xxxxxxxxxxxxxxx reg size="+reg.getSize()+" xxxxxxxxxxxxxx");
+
this.appliName = appliName;
this.nbTasks = nbTasks;
RunningApplication.Instance().setRunning(true);
RunningApplication.Instance().setNumberOfDisconnections(nbdc);
RunningApplication.Instance().setNumberOfSpawnerDisconnections(nbsdc);
- // System.out.println("+++++++++++++++++++++++++");
+
Instance = this;
// if(tag==0)
broadcastRegister(1);
ScanThreadSpawner.Instance().start();
broadcastScanning();
-// System.out.println("apres broadcastScanning");
+
new StartScanning().start();
} else {
System.err.println("Cannot find myself in the spawnersList !");
public void initialize() {
// if(protocol.equals("rmi")){
// launch the JaceSpawnerServer
+ if (System.getSecurityManager() == null) {
+ System.setSecurityManager(new SecurityManager());
+ }
+
+// signal = false ;
+
exportObject();
connectSuperNode();
createAppli();
createSpawnerNetwork();
+ // ** Tests ** //
+ new FaultMake().run() ;
// }
}
}
public synchronized void signalDeadNode(JaceInterface host, int rankOfDead) {
- try {
+
+
TaskId myTaskId = null;
int nb = 0;
int nbC = 0;
nbC = RunningApplication.Instance().getNumberOfCouilles();
System.out.println("At time = " + time + "s, NbDisconnection = "
+ nb + ", NbProblem = " + nbC);
+
+ if( host == null )
+ System.err.println( "SIGNAL DE NODE NULL" ) ;
// !!!!!!!!!!!!!!actualiser le ListeTask ds le Register
- myTaskId = Register.Instance().getListeOfTasks()
+ try {
+ myTaskId = Register.Instance().getListeOfTasks()
.getTaskIdOfHostStub(host);
+ } catch( Exception e ) {}
+
if (myTaskId == null) {
- Register.Instance.getListeOfTasks().viewAll();
myTaskId = Register.Instance().getListeOfTasks()
.getTaskIdOfRank(rankOfDead);
+ if( myTaskId == null )
+ {
+ System.err.println("Houston we have a serious problem!!");
+ return ;
+ }
+
JaceInterface deadStub = myTaskId.getHostStub();
- deadStub.suicide("Not doing a good work");
+
+ if( deadStub != null )
+ {
+ try{
+ deadStub.suicide2("Not doing a good work");
+ }catch(Exception e){}
+ } else {
+ System.err.println( "Dead node stub unavailable!" );
+ }
}
- myTaskId.setHostIP(null);
- myTaskId.setHostName(null);
- Node noeud = Register.Instance().getNodeOfStub(
- myTaskId.getHostStub());
- myTaskId.setHostStub(null);
+
+ Node noeud = Register.Instance().getNodeOfName( myTaskId.getHostName() ) ;
+
int rankDeaD = myTaskId.getRank();
- String nomNoeud = noeud.getName();
- // Register.Instance().removeNodeAt(i);
- // Register.Instance().removeNode(host.getIP());
- // System.out.println("fait le remove : taille = " +
- // Register.Instance().getSize());
+ String nomNoeud = "" ;
+
+ if( noeud != null )
+ {
+ nomNoeud = noeud.getName();
+ }
- boolean b = Register.Instance().removeNodeOfName(noeud.getName());
+ boolean b = false ;
+
+ if( ! nomNoeud.equals( "" ) )
+ {
+ b = Register.Instance().removeNodeOfName( nomNoeud ) ;
+ } else {
+ System.err.println( "Dead node name unknown!!" ) ;
+ }
+
+ if( ! b )
+ {
+ b = Register.Instance().removeNode( noeud ) ;
+ }
- if (b == true) {
+ if( b == true )
+ {
System.out.println("Removing Node of rank "
+ rankDeaD + " : size = "
+ Register.Instance().getSize());
System.out.println("At time=" + cal.get(Calendar.MINUTE) + ":"
+ cal.get(Calendar.SECOND));
- // retrouver SI POSSIBLE un autre libre pr remplacer celui la pr
- // cette tache
-
/**** Sébastien Miquée **/
- //Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud);
- Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud, noeud);
+ Node tmpNode = null ;
+ int retry = 0, retryMax = 4 ;
+
+ while( tmpNode == null )
+ {
+ tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud, noeud);
+
+ if( tmpNode == null )
+ {
+ try{
+ Thread.sleep( 1000 ) ;
+ } catch( Exception e ) {}
+
+ retry++ ;
+
+ if( retry > retryMax )
+ {
+ System.err.println( "Unable to replace the dead node "+nomNoeud ) ;
+ return ;
+ }
+ }
+ }
+
try {
- // broadcastRegister(0);
updateConcernedNodes(rankDeaD, noeud, tmpNode);
Thread.sleep(500);
- System.out.println("Set scanning on %%%%%%");
- tmpNode.getStub().setScanning(true);
+ System.out.println("Set scanning on "+tmpNode.getName());
+ tmpNode.getStub().setScanning( true ) ;
} catch (Exception e) {
- System.err.println("Unable to setScannig on for the new node: "
- + e);
+ try {
+ Thread.sleep( 2000 ) ;
+
+ tmpNode.getStub().setScanning( true ) ;
+ } catch (InterruptedException e1) {
+ } catch (RemoteException e2) {
+ System.err.println( "Unable to setScanning on for the new node: " + e2 ) ;
+ }
+ System.err.println("Unable to setScanning on for the new node: "
+ + e) ;
}
- // Register.Instance().getListeOfTasks().viewAll();
for (int z = 0; z < spawnersList.size(); z++)
+ {
if (!((JaceSpawnerInterface) spawnersList.get(z))
.equals(Register.Instance().getSpawnerStub()))
+ {
try {
((JaceSpawnerInterface) spawnersList.get(z))
.replaceDeamonBy(noeud, tmpNode, rankDeaD);
.println("Unable to broadcast the modifications to all the spawners: "
+ e);
}
- } catch (Exception ee) {
- System.err.println("Error in signalDeadNode() :" + ee);
- }
+ }
+ }
}
// Checks for nodes flagged as alive in the SuperNode's Register.Instance()
// in order to requisition them
/*** Sébastien Miquée ***/
-
- //private synchronized Node foundToReplaceThisNode(int theRank, String nom) {
- private synchronized Node foundToReplaceThisNode(int theRank, String nom, Node _n) {
- // int i = 0;
+ private synchronized Node foundToReplaceThisNode(int theRank, String nom, Node _n)
+ {
boolean found = false ;
Node node = null ;
+ int nbNull = 0 ;
while( found == false ) {
try {
-
- //node = centralServer.getNewNode(LocalHost.Instance().getIP());
- node = centralServer.getNewNode( idAlgo, _n);
+ node = null ;
- if( node != null )
+ node = centralServer.getNewNode( idAlgo, theRank ) ;
+
+ if( node != null && ! node.getIP().isEmpty() && node.getIP() != null
+ && ! node.getIP().equals( "" ) )
{
found = true ;
} else {
- Thread.sleep( 1000 ) ;
- System.out.println("Pas de bon retour !");
+ System.err.println("Returned node is null!");
+ nbNull++ ;
+ try {
+ Thread.sleep( 2000 ) ;
+ } catch( Exception e ) {}
}
} catch (Exception e) {
// trouver un autre superNode et lui demander le noeud a lui
connectSuperNode();
}
+
+ if( Register.Instance().existNode( node ) != -1 )
+ {
+ found = false ;
+ }
}
- if (node != null) {
- System.out.println("Using Node " + node.getName() + " ("
- + node.getIP() + ") in order to replace " + nom
- + " size before add: " + Register.Instance().getSize()
- + "\n\n");
+ if( node != null )
+ {
+ System.out.println( "Using Node " + node.getName() + " in order to replace " + nom + "\n" ) ;
+
node.setAliveFlag(true);
node.setAliveTime();
// TODO verif pourkoi superNode me le redonne
// alors qu'il fait deja du calcul
// int is = Register.Instance().existNode(node.getIP());
- int is = Register.Instance().existNode(node);
- if (is != -1) {
+ int is = Register.Instance().existNode( node ) ;
+
+ if( is != -1 )
+ {
System.out.println("The Node is already in the register ! I don't add it.");
System.out.println("Node " + node.getName() + " not added !") ;
node = null;
myTaskId.setHostName(node.getName());
myTaskId.setHostStub(node.getStub());
- // Register.Instance().getListeOfTasks().viewAll();
int neighborRank;
if (theRank == 0)
+ {
neighborRank = Register.Instance().getSize() - 1;
- else
+ } else {
neighborRank = theRank - 1;
+ }
+
TaskId neighborTask2 = Register.Instance().getListeOfTasks()
- .getTaskIdOfRank(neighborRank);
- try {
- JaceInterface jaceStub = neighborTask2.getHostStub();
- jaceStub.updateHeart(node.getStub());
- } catch (Exception e) {
- System.err.println("Next node unreachable ! " + e);
- // node = null;
+ .getTaskIdOfRank( neighborRank ) ;
+
+ int loop = 0, MAX_LOOP = 1 ;
+ boolean ok = false ;
+ JaceInterface jaceStub = null ;
+
+ while( !ok && loop < MAX_LOOP )
+ {
+ try {
+ jaceStub = neighborTask2.getHostStub();
+ jaceStub.updateHeart(node.getStub());
+
+ ok = true ;
+ } catch( Exception e ) {
+ loop++ ;
+
+ if( loop < MAX_LOOP )
+ {
+ try {
+ Thread.sleep( loop * 2000 ) ;
+ } catch (InterruptedException e1) {}
+ }
+
+ if( loop == MAX_LOOP )
+ {
+ System.err.println( "Next node unreachable! " + e ) ;
+ }
+ }
+ }
+
+ if( loop == MAX_LOOP )
+ {
+
+ try {
+ node.getStub().suicide2( "Not reachable!" ) ;
+ } catch (RemoteException e1) {
+ System.err.println( "Unable to suicide the node!\n"+e1 );
+ signalDeadNode( null, neighborTask2.getRank() ) ;
+ }
}
}
if (centralServer == null) {
System.err.println("Central Server not localized !");
}
- node = centralServer.getNewNode( idAlgo, null ) ;
+ node = centralServer.getNewNode( idAlgo, -2 ) ;
RunningApplication.Instance()
.incrementNumberOfSpawnerDisconnections();
//found = true;
} catch (Exception e) {
// trouver un autre superNode et lui demander le noeud a lui
System.err.println("Super Node not localized !\n " + e);
-// System.out.println("pas localise le super node " + e);
-// System.out.println("pas localise le super node " + e);
-// System.out.println("pas localise le super node " + e);
-// System.out.println("pas localise le super node " + e);
-// System.out.println("pas localise le super node " + e);
-// System.out.println("pas localise le super node " + e);
-// System.out.println("pas localise le super node " + e);
System.err.println("My IP : " + LocalHost.Instance().getIP());
if (centralServer == null) {
System.err.println("CentralServer is NULL !");
+ LocalHost.Instance().resolve(node.getName())
+ ") to replace a dead spawner\n\n");
try {
- // Register.Instance().viewAll();
- // Register.Instance().getListeOfTasks().viewAll();
spawnerStub = node.getStub().transformIntoSpawner(
params,
appliName,
, idAlgo);
spawnersList.setElementAt(spawnerStub, index);
-// spawnerStub.setIdAlgo( idAlgo ) ;
-
new StartProcessThread(index).start();
- // spawnerStub.startProcess( spawnersList);
+
} catch (Exception e) {
System.err.println("Unable to reach the new spawner: " + e);
}
.getNumberOfSpawnerDisconnections();
System.out.println("Application finished successfully !");
-// System.out.println("Application finished successfully !!!!!!");
-// System.out.println("Application finished successfully !!!!!!");
-// System.out.println("Application finished successfully !!!!!!");
-// System.out.println("Application finished successfully !!!!!!");
-// System.out.println("Application finished successfully !!!!!!");
-// System.out
-// .println("Application finished successfully !!!!!!\n");
+
System.out.println("TOTAL TIME in s : " + (finalTime / 1000));
System.out.println("nb of desconnections: " + nbe);
System.out.println("nb of spawners desconnections: " + nbsdc);
+
+ // ** Tests ** //
+ String path = "/home/lyon/smiquee/resultats/execTime_CG_"+algo+"_"+test ;
+ PrintWriter ecrivain = null ;
+ ecrivain = new PrintWriter( new OutputStreamWriter( new FileOutputStream( path ), "UTF8" ) ) ;
+
+ ecrivain.println( "TOTAL TIME in s : " + (finalTime / 1000));
+ ecrivain.println( "nb of desconnections: " + nbe);
+ ecrivain.println( "nb of spawners desconnections: " + nbsdc);
+ ecrivain.println( "DH = "+dh ) ;
+
+ ecrivain.flush() ;
+ ecrivain.close() ;
+
+
if (JaceDaemon.Instance().isRunning()) {
JaceDaemon.Instance().reconnectSuperNode();
// purger l'appli
RunningApplication.Instance().purge();
- // System.exit(1);
}
/** Suprresion of the mapping algorithm on the SuperNode **/
boolean connected = false;
if (!(superNode_IP == null)) {
try {
+ centralServer = null ;
+ System.gc() ;
+
System.out.println("Trying to invoke super node "
+ superNode_IP);
centralServer = (JaceSuperNodeInterface) Naming.lookup("rmi://"
Node noeud = null;
boolean recieved = false;
+ idAlgo = LocalHost.Instance().getIP() + ":" + LocalHost.Instance().getPort() ;
+
System.out.println("Trying to get a Register on the SuperNode");
int nbExtraSpawners = 0;
if (nbTasks > nbOfDaemonsPerSpawner) {
}
while (!recieved) {
try {
- registerSpawner = centralServer.getRegisterSpawner(LocalHost
- .Instance().getIP(), nbTasks, (Task) tache, nbTasks
+ registerSpawner = centralServer.getRegisterSpawner(idAlgo,
+ nbTasks, (Task) tache, nbTasks
+ nbExtraSpawners, algo, paramAlgo);
recieved = true;
} catch (Exception e) {
}
}
- idAlgo = LocalHost.Instance().getIP() + ":" + LocalHost.Instance().getPort() ;
+ // ** Tests ** //
+ try {
+ Algo al = centralServer.getAlgo( idAlgo ) ;
+
+ dh = al.getGrid().getHeterogenityDegre() ;
+ System.out.println( "### DH = " + dh ) ;
+
+ Utils.writeGraph( al.getGraph(), "/home/lyon/smiquee/resultats/",
+ "graph_CG_"+algo+"_"+test+".xml" ) ;
+ Utils.writeGrid( al.getGrid(), "/home/lyon/smiquee/resultats/",
+ "grid_CG_"+algo+"_"+test+".xml") ;
+ Utils.writeMapping( al.getMapping(), "/home/lyon/smiquee/resultats/",
+ "mapping_CG_"+algo+"_"+test+".xml" ) ;
+ } catch (RemoteException e1) {
+ System.err.println( "Problème avec Algo dans Spawner !" ) ;
+ e1.printStackTrace();
+ }
if (registerSpawner.getSize() != (nbTasks + nbExtraSpawners)) {
System.err.println("I did not recieve enough nodes from superNode!!!! \n killing application !!!!");
for (int i = 0; i < registerSpawner.getSize(); i++) {
try {
- registerSpawner.getNodeAt(i).getStub().reconnectSuperNode();
+ registerSpawner.getNodeAt(i).getStub().suicide( "Not enough nodes for the application" ) ;
+
} catch (Exception e) {
System.err.println("The reserved node was unable to reconnect to the super node");
}
spawnersList = new Vector<Object>();
for (int i = 0; i < nbExtraSpawners && i < registerSpawner.getSize(); i++) {
spawnersList.add(registerSpawner.getNodeAt(0));
-// * nbOfDaemonsPerSpawner));
+
registerSpawner.removeNodeOfName(registerSpawner.getNodeAt(0).getName());
-// * nbOfDaemonsPerSpawner));
+
}
registerSpawner.setNbOfTasks(nbTasks);
}
-// /**
-// * Set the identifier of the mapping algorithm used.
-// * @param _s The mapping identifier
-// *
-// * @author Sébastien Miquée
-// */
-// public void setIdAlgo( String _s ) throws RemoteException
-// {
-// System.err.println("############# SET ID ALGO ################# "+_s);
-// idAlgo = _s ;
-// }
-
public class TransformThread extends Thread {
int i;
Node n;
System.err.println("The Super Node is maybe dead: " + e1) ;
for (int z = 0; z < Register.Instance().getSize(); z++) {
try {
- Register.Instance().getNodeAt(z).getStub()
- .reconnectSuperNode();
+ Register.Instance().getNodeAt(z).getStub().suicide2("Problem") ;
+
} catch (Exception ez) {
System.err.println("The reserved node was unable to reconnect to the super node: \n"
+ ez);
}
}
}
+
+
+ public class FaultMake extends Thread {
+
+ public FaultMake(){}
+
+ public void run()
+ {
+ ListeTask t = null ;
+ Random r = null ;
+ int next, old = -1 ;
+
+ System.out.println( "Starting fault tolerance !" ) ;
+
+ while( ! RunningApplication.Instance().isRunning() )
+ {
+ /** Waiting some time the beginning of the computation **/
+ try {
+ sleep( 1000 ) ;
+ } catch (InterruptedException e) {
+ }
+ }
+
+ while( RunningApplication.Instance().isRunning() )
+ {
+ /** Waiting some time ... **/
+ try {
+ sleep( faultTime * 1000 ) ;
+ } catch (InterruptedException e) {
+ }
+
+ /** ... and kill some daemons ! **/
+ t = Register.Instance().getListeOfTasks();
+ r = new Random() ;
+
+ for( int i = 0 ; i < nbFault ; i++ )
+ {
+ next = r.nextInt( t.getSize() ) ;
+
+ while( next == old )
+ {
+ next = r.nextInt( t.getSize() ) ;
+ }
+ old = next ;
+
+ Node noeud = null;
+ try {
+ TaskId recev = null;
+ System.out.println("Making fault on Task" + next);
+
+ recev = t.getTaskIdOfRank( next );
+
+ JaceInterface stub = recev.getHostStub();
+ System.out.println("name = " + recev.getHostName());
+ noeud = Register.Instance().getNodeOfStub( stub ) ;
+
+ stub.suicide2( "Test fault tolerance" ) ;
+
+ } catch (UnmarshalException ue ){
+ } catch (Exception e) {
+ try {
+ System.err.println("Error in FaultMake on node: "
+ + noeud.getName() + ". " + e);
+ } catch (Exception e2) {
+ System.err.println("(Fault)Error in error:" + e2);
+ }
+ }
+
+ try {
+ sleep( 500 ) ;
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+ }
+
+
public class StartProcessThread extends Thread {
int i;
// Register.Instance().getNode(workerIP).getWorkerStub().reconnectSuperNode();
// stub.reconnectSuperNode();
- stub.suicide("fin d'appli");
+ stub.suicide("End of application");
} catch (Exception e) {
- System.err.println("can't kill node " + name);
+ System.err.println("Cannot kill node " + name);
}
yield();
}
}
- public Register getRegister(int rank) {
+    /**
+     * Builds a register for the task of rank {@code rank}: a new
+     * {@code Register} carrying the application name, parameters, spawner stub
+     * and number of tasks of {@code Register.Instance()}, but only the tasks
+     * returned by {@code getDependencies} for that rank and the nodes
+     * registered under their host names.
+     *
+     * @param rank rank of the task whose dependencies are collected
+     * @return the newly built register
+     */
+    public synchronized Register getRegister( int rank ) {
ListeTask listOfTasks = Register.Instance().getListeOfTasks();
- Vector<Integer> dependencies = getDependencies(rank, listOfTasks.getSize());
+    // Wildcard instead of a raw Vector: no unchecked operation is left in the
+    // method, so no @SuppressWarnings is needed.
+    Vector<?> dependencies = getDependencies(rank, listOfTasks.getSize());
Register g = new Register();
ListeTask newListOfTasks = new ListeTask();
+
g.setAppliName(Register.Instance().getAppliName());
g.setParams(Register.Instance().getParams());
g.setSpawnerStub(Register.Instance().getSpawnerStub());
g.setNbOfTasks(Register.Instance().getNbOfTasks());
// g.setVersion(reg.getVersion());
+
for (int j = 0; j < dependencies.size(); j++) {
TaskId id = listOfTasks.getTaskIdOfRank(((Integer) dependencies
.elementAt(j)).intValue());
+        if( id == null )
+        {
+            // No task is registered for this rank: skip it rather than fail
+            // on id.getHostStub() below.
+            System.err.println( "getRegister: no task of rank " + dependencies.elementAt( j ) ) ;
+            continue ;
+        }
newListOfTasks.addTask(id);
if (id.getHostStub() != null) {
Node noeud = Register.Instance()
- .getNodeOfStub(id.getHostStub());
- g.addNode(noeud);
+        .getNodeOfName( id.getHostName() ) ;// (id.getHostStub());
+        // The node is looked up by the task's host name; report when the
+        // register has no node under that name.
+        if( noeud != null )
+            g.addNode( noeud ) ;
+        else
+            System.err.println( "PAS BON DU TOUT" ) ;
}
}
g.setListeOfTasks(newListOfTasks);
return g;
}
+ @SuppressWarnings("unchecked")
private void updateConcernedNodes(int rank, Node oldNode, Node node) {
ListeTask listOfTasks = Register.Instance().getListeOfTasks();
- Vector<?> dependencies = getDependencies(rank, listOfTasks.getSize());
- System.out.println("la liste des voisins concernes de : " + rank);
+ Vector dependencies = getDependencies(rank, listOfTasks.getSize());
+ System.out.println("List of concerned neighbors of task " + rank+": ");
for (int z = 0; z < dependencies.size(); z++)
System.out.print(((Integer) dependencies.elementAt(z)).intValue()
+ " ");
System.out.println();
// Register.Instance().setVersion(registerVersion);
// registerVersion++;
- Register.Instance()
- .setSpawnerStub(Register.Instance().getSpawnerStub());
+// Register.Instance()
+// .setSpawnerStub(Register.Instance().getSpawnerStub());
int s;
if ((dependencies.size() % nbOfDeamonsPerThread) == 0)
s = dependencies.size() / nbOfDeamonsPerThread;