3 import java.io.FileOutputStream;
4 import java.io.OutputStreamWriter;
5 import java.io.PrintWriter;
6 import java.rmi.Naming;
7 import java.rmi.RemoteException;
8 import java.rmi.UnmarshalException;
9 import java.util.ArrayList;
10 import java.util.Calendar;
11 import java.util.GregorianCalendar;
12 import java.util.Random;
13 import java.util.concurrent.Semaphore;
15 import and.Mapping.Algo;
16 import and.Mapping.Utils;
/**
 * Spawner of a JaceP2P application. It loads the application Task class by
 * name, obtains a Register of daemon nodes from a SuperNode, monitors the
 * daemons and its neighbouring spawners, and replaces the ones reported dead.
 */
18 public class JaceSpawner {
// Instance of the application class, loaded by name in the constructors.
21 private Task tache = null;
// Shared spawner instance, exposed as a mutable public static field.
// NOTE(review): presumably what Instance() returns (its body is not visible).
22 public static JaceSpawner Instance;
// Address of the SuperNode contacted first; connectSuperNode() may overwrite
// it with entries taken from SuperNodeListe.
23 private static String superNode_IP = null;
// RMI port of the SuperNode (also overwritten by connectSuperNode()).
24 private int superNode_port = 1098;
// Port of the local RMI registry created in exportObject().
25 private static int spawnerPort = 1098;
// Remote stub of the SuperNode.
26 protected static JaceSuperNodeInterface centralServer = null;
// Stub of this spawner's own JaceSpawnerServer, looked up in exportObject().
27 private JaceSpawnerInterface spawnerRef = null;
// Name of the application class to load.
29 private String appliName;
// Extra arguments handed to the application Task (stays null when there are none).
30 private String[] params = null;
// Communication protocol given to the first constructor; stored but never read.
31 @SuppressWarnings("unused")
32 private String protocol;
33 // private int registerVersion=0;
// Number of missed heartbeats after which a peer is considered dead
// (timeBeforeKill = NB_HEART_DECONNECT * heartTime in connectSuperNode()).
34 final int NB_HEART_DECONNECT = 3;
35 private int heartTime; // heartbeat period in milliseconds (used as Thread.sleep(heartTime))
36 @SuppressWarnings("unused")
37 private int timeBeforeKill; // wait 3 non-responses of heartBeat before
38 // considering the Daemon as dead
39 private boolean broadcasting = false;
// Number of daemons handled per spawner and per thread. Both are static but
// assigned by each constructor, so every JaceSpawner in the JVM shares them.
40 @SuppressWarnings("unused")
42 private static int nbOfDaemonsPerSpawner;
43 private static int nbOfDeamonsPerThread;
// Spawners of the application: filled with Node objects in
// getRegisterOnSuperNode(); TransformThread replaces entries with
// JaceSpawnerInterface stubs, which the rest of the class casts them to.
44 private ArrayList<Object> spawnersList;
// Number of backup neighbours configured on the Register (setNumBackupNeighbors).
46 private int nbSavingNodes;
48 // Variables for Mapping
// Parameter of the mapping algorithm sent to the SuperNode.
50 private double paramAlgo ;
// Identifier of the mapping algorithm on the SuperNode; getRegisterOnSuperNode()
// sets it to "IP:port" of the local host.
51 private String idAlgo ;
// Fair single-permit semaphore used by signalDeadNode().
53 private Semaphore sema ;
// nbFault is used in result file names and as paramAlgo; faultTime's use is
// not visible here. Presumably fault-injection settings for FaultMake — confirm.
58 protected int nbFault ;
59 protected int faultTime ;
61 public JaceSpawner(String superNode, int port, String comProtocol,
62 String[] args, int nbDaemonPerSpawner, int nbDaemonPerThread,
63 int nbSavingNodes, int _algo, double _paramAlgo, int _test, int _nbF, int _fT) {
64 // superNode_IP = LocalHost.Instance().resolve(superNode);
66 paramAlgo = _paramAlgo ;
73 sema = new Semaphore( 1, true ) ;
75 superNode_IP = superNode;
77 protocol = comProtocol;
78 nbOfDaemonsPerSpawner = nbDaemonPerSpawner;
79 nbOfDeamonsPerThread = nbDaemonPerThread;
80 this.nbSavingNodes = nbSavingNodes;
83 // if less than 2 params (nb of tasks and name of the appli), error
84 System.err.println( "Parameters error !" ) ;
88 nbTasks = new Integer(args[0]).intValue(); // nb of tasks
91 } catch (Exception e) {
92 System.err.println("Number format exception :" + e ) ;
95 appliName = args[1]; // name of the class to launch
96 if (args.length > 2) { // get the eventual param of the appli
97 params = new String[args.length - 2];
98 for (int i = 0; i < params.length; i++) {
99 params[i] = args[2 + i];
103 c = load.load(appliName);
105 tache = ((Task) c.newInstance());
106 tache.setParam(params);
107 tache.setJaceSize(nbTasks);
109 // ****************//
111 } catch (Exception e) {
112 System.err.println( "Unable to instantiate the class " + e ) ;
/**
 * Creates a spawner that takes over part of an already running application.
 * The parameter list matches the arguments TransformThread passes to
 * transformIntoSpawner(...), which presumably ends up invoking this constructor.
 *
 * Copies the application parameters, installs the SuperNode stub and the
 * given Register, reloads the application Task class, initialises the
 * RunningApplication state (chrono, name, task count, disconnection counters)
 * and calls broadcastRegister(1).
 *
 * @param params             application arguments (copied into this.params)
 * @param appliName          name of the application class to load
 * @param reg                Register that replaces the local Register
 * @param nbTasks            number of application tasks
 * @param snodeStub          stub of the SuperNode
 * @param rank               rank of this spawner (its use is not visible here)
 * @param heartTime          heartbeat period
 * @param tag                (its use is not visible here)
 * @param nbdc               node disconnections counted so far
 * @param nbsdc              spawner disconnections counted so far
 * @param nbDaemonPerSpawner daemons handled per spawner (static, shared)
 * @param nbDaemonPerThread  daemons handled per thread (static, shared)
 * @param _idAlgo            mapping algorithm identifier (assignment not visible here)
 */
122 public JaceSpawner(String[] params, String appliName, Register reg,
123 int nbTasks, JaceSuperNodeInterface snodeStub, int rank,
124 int heartTime, int tag, int nbdc, int nbsdc,
125 int nbDaemonPerSpawner, int nbDaemonPerThread, String _idAlgo) {
127 nbOfDaemonsPerSpawner = nbDaemonPerSpawner;
128 nbOfDeamonsPerThread = nbDaemonPerThread;
// Element-by-element copy of the caller's parameter array.
129 if (params.length != 0) {
130 this.params = new String[params.length];
131 for (int i = 0; i < params.length; i++)
132 this.params[i] = params[i];
135 System.err.println( "There is no parameter !" ) ;
137 } catch (Exception e) {
138 System.err.println("Error in copying the parameters: " + e ) ;
// Fair single-permit semaphore that serialises signalDeadNode().
143 sema = new Semaphore( 1, true ) ;
145 this.appliName = appliName;
147 this.nbTasks = nbTasks;
148 this.heartTime = heartTime;
149 LocalHost.Instance().setSuperNodeStub(snodeStub);
150 centralServer = snodeStub;
// Adopt the given Register and record this spawner's stub in it.
// NOTE(review): spawnerRef is only assigned in exportObject(); confirm it has
// been called (on lines not visible) before this point, otherwise a null stub
// is registered.
152 Register.Instance().replaceBy(reg);
153 Register.Instance().setSpawnerStub(this.spawnerRef);
154 Register.Instance().getListeOfTasks().viewAll();
// Instantiate the application Task class by name and hand it its parameters.
158 c = load.load(appliName);
160 tache = ((Task) c.newInstance());
161 tache.setParam(params);
162 tache.setJaceSize(nbTasks);
163 // ****************//
165 } catch (Exception e) {
166 System.err.println("Unable to instantiate the class " + e);
// Initialise the RunningApplication state from the values handed over.
168 RunningApplication.Instance().getChrono().start();
170 RunningApplication.Instance().setName(appliName);
171 RunningApplication.Instance().setNbTasks(nbTasks);
172 RunningApplication.Instance().setRunning(true);
173 RunningApplication.Instance().setNumberOfDisconnections(nbdc);
174 RunningApplication.Instance().setNumberOfSpawnerDisconnections(nbsdc);
178 broadcastRegister(1);
181 * x=Register.Instance().getListeOfTasks().getSize()/nbOfDaemonsPerSpawner
182 * ; int s; if(rank==x)
183 * s=(reg.getListeOfTasks().getSize()%nbOfDaemonsPerSpawner
184 * )/nbOfDeamonsPerThread; else
185 * s=nbOfDaemonsPerSpawner/nbOfDeamonsPerThread;
187 * int debut=nbOfDaemonsPerSpawnerrank;
190 * for(int i=0;i<s+1;i++){
192 * new BroadcastSpawner(i,
193 * debut,nbOfDaemonsPerSpawner,nbOfDeamonsPerThread).start(); }
197 System.out.println("########################");
// Returns the shared JaceSpawner. Its body is not visible in this excerpt;
// presumably it returns the static Instance field.
200 public synchronized static JaceSpawner Instance() {
// Returns the number of daemons handled per thread (static, shared by all spawners).
204 public int getNbOfDeamonsPerThread() {
205 return nbOfDeamonsPerThread;
// Returns the number of daemons handled per spawner (static, shared by all spawners).
208 public int getNbOfDeamonsPerSpawner() {
209 return nbOfDaemonsPerSpawner;
/**
 * Stores the list of spawners and wires this spawner into the spawner ring.
 * It locates its own entry in spawnersList and sends heartbeats to the next
 * spawner. The wrap-around assignment used when it is last in the list is on
 * a line not visible here. It monitors the previous spawner (the last one
 * when it is first), then starts a StartScanning thread. It prints an error
 * if it cannot find itself in the list.
 *
 * @param spawnersList stubs of all spawners of the application
 */
212 public void startProcess(ArrayList<Object> spawnersList) {
213 this.spawnersList = spawnersList;
215 int is = spawnersList.indexOf((Object) Register.Instance()
220 if (is == spawnersList.size() - 1)
223 nextNeighbour = is + 1;
225 * while((spawnersList.elementAt(nextNeighbour) instanceof Node))
227 * System.out.println("waiting till transform of spawner "+nextNeighbour
228 * +" is finished, for setServer"); Thread.sleep(20);
229 * }catch(Exception e1){}
231 HeartBeatSpawner.Instance().setServer(
232 (JaceSpawnerInterface) spawnersList.get(nextNeighbour));
233 HeartBeatSpawner.Instance().setHeartTime(heartTime);
234 HeartBeatSpawner.Instance().start();
// Ring predecessor: monitored by the ScanThreadSpawner.
235 int previousNeighbour;
237 previousNeighbour = spawnersList.size() - 1;
239 previousNeighbour = is - 1;
240 ScanThreadSpawner.Instance().setHeartTime(heartTime);
241 ScanThreadSpawner.Instance().setServer(
242 (JaceSpawnerInterface) spawnersList.get(previousNeighbour));
243 ScanThreadSpawner.Instance().start();
247 new StartScanning().start();
249 System.err.println("Cannot find myself in the spawnersList !");
// Setter for the broadcasting flag (its body is not visible in this excerpt).
254 public void setBroadcasting(boolean bool) {
/**
 * Starts the spawner. It installs a SecurityManager when none is set, gets
 * a Register of daemons from the SuperNode, builds the spawner network and
 * runs FaultMake. Some intermediate lines are not visible here.
 *
 * NOTE(review): System.setSecurityManager is deprecated for removal since
 * Java 17 (JEP 411). On Java 18+ it throws UnsupportedOperationException
 * unless the JVM is started with -Djava.security.manager=allow.
 * NOTE(review): FaultMake().run() executes on the calling thread. If FaultMake
 * is a Thread meant to run concurrently, start() is needed — confirm.
 */
258 public void initialize() {
259 // if(protocol.equals("rmi")){
260 // launch the JaceSpawnerServer
261 if (System.getSecurityManager() == null) {
262 System.setSecurityManager(new SecurityManager());
271 // get a Register on the Super Node
272 // completed with the required number of Daemons
273 getRegisterOnSuperNode();
276 createSpawnerNetwork();
279 new FaultMake().run() ;
/**
 * Monitoring loop of the spawner. It prints the start time in seconds, then
 * loops while RunningApplication reports the application as running. Each
 * iteration sleeps heartTime milliseconds. Part of the loop body is on lines
 * not visible here. After the loop it checks whether the local JaceDaemon is
 * still running.
 */
283 public void startScanning() {
285 long time = RunningApplication.Instance().getChrono().getValue() / 1000;
286 System.out.println("Start scanning at time: " + time + "s");
287 // start the timer that drives the heartbeats
288 while (RunningApplication.Instance().isRunning() == true) {
289 // step 1: every "heartTime" milliseconds, check whether the
290 // registered nodes are still alive
291 // res = scanConnectedHosts();
293 // step 2 (keep it or not?): check whether the application is
294 // waiting for a node, in order to assign it a new one
297 Thread.sleep(heartTime);
298 } catch (Exception e) {
301 // /System.out.println("is running = false");
302 if (!JaceDaemon.Instance().isRunning())
/**
 * Handles the report that the daemon {@code host} (task rank
 * {@code rankOfDead}) is dead, in these steps:
 * - updates and prints the disconnection statistics;
 * - finds the TaskId of the dead host, by stub first and then by rank;
 * - asks the dead host to suicide2() and removes it from the Register;
 * - obtains a replacement node through foundToReplaceThisNode(), retrying;
 * - points the TaskId at the replacement and makes the ring predecessor
 *   ping it (updateHeart);
 * - calls updateConcernedNodes, enables scanning on the new node;
 * - broadcasts the replacement to the other spawners.
 *
 * The method is synchronized and the catch below handles a failure to
 * acquire {@code sema}. The acquire itself is on a line not visible here.
 * NOTE(review): the matching sema.release() is not visible in this excerpt.
 * Confirm it runs on every exit path (a finally block would guarantee it).
 *
 * @param host       stub of the dead daemon (may be null)
 * @param rankOfDead rank of the dead task, used when the stub lookup fails
 */
306 public synchronized void signalDeadNode(JaceInterface host, int rankOfDead )
// NOTE(review): the interrupt is swallowed here; calling
// Thread.currentThread().interrupt() would preserve the interrupt status.
311 } catch (InterruptedException e3) {
312 System.err.println( "Problem while acquiring the semaphore in signalDeadNode!" ) ;
313 e3.printStackTrace() ;
317 TaskId myTaskId = null;
// Update and print the disconnection statistics.
321 RunningApplication.Instance().incrementNumberOfDisconnections();
323 time = RunningApplication.Instance().getChrono().getValue() / 1000;
324 nb = RunningApplication.Instance().getNumberOfDisconnections();
325 nbC = RunningApplication.Instance().getNumberOfProblems();
326 System.out.println( "At time = " + time + "s, NbDisconnection = "
327 + nb + ", NbProblem = " + nbC ) ;
330 System.err.println( "Signal of a node null!" ) ;
332 // !!!!!!!!!!!!!! update the ListeTask in the Register
334 myTaskId = Register.Instance().getListeOfTasks()
335 .getTaskIdOfHostStub( host ) ;
336 } catch( Exception e ) {}
// Fall back to a lookup by rank when the stub lookup found nothing or threw.
338 if (myTaskId == null)
340 myTaskId = Register.Instance().getListeOfTasks()
341 .getTaskIdOfRank( rankOfDead ) ;
342 if( myTaskId == null )
344 System.err.println( "Houston we have a serious problem!!" ) ;
// Ask the dead host to terminate itself; any failure is ignored.
349 JaceInterface deadStub = myTaskId.getHostStub() ;
351 if( deadStub != null )
354 deadStub.suicide2( "Not doing a good work" ) ;
355 }catch(Exception e){}
357 System.err.println( "Dead node stub unavailable!" ) ;
// Remove the dead node from the Register: by name when it is known,
// otherwise by Node object.
361 Node noeud = Register.Instance().getNodeOfName( myTaskId.getHostName() ) ;
363 int rankDeaD = myTaskId.getRank() ;
365 String nomNoeud = "" ;
369 nomNoeud = noeud.getName() ;
374 if( ! nomNoeud.equals( "" ) )
376 b = Register.Instance().removeNodeOfName( nomNoeud ) ;
378 System.err.println( "Dead node name unknown!!" ) ;
383 b = Register.Instance().removeNode( noeud ) ;
388 System.out.println( "Removing Node of rank "
389 + rankDeaD + " : size = "
390 + Register.Instance().getSize() ) ;
392 System.err.println( "Cannot remove the Node, it doesn't exist anymore: size = "
393 + Register.Instance().getSize() ) ;
// Print the wall-clock minute:second of the event.
396 Calendar cal = new GregorianCalendar() ;
397 System.out.println( "At time=" + cal.get(Calendar.MINUTE) + ":"
398 + cal.get(Calendar.SECOND) ) ;
400 /**** Sébastien Miquée **/
// Obtain a replacement node, pausing 1 s after each failed request. The
// "Unable to replace" message is printed once retry exceeds retryMax. The
// retry increment and the declaration of `ok` are on lines not visible here.
401 Node tmpNode = null ;
403 int retry = 0, retryMax = 4 ;
405 while( tmpNode == null && ! ok )
409 while( tmpNode == null )
411 tmpNode = foundToReplaceThisNode( rankDeaD, nomNoeud, noeud ) ;
413 if( tmpNode == null )
415 System.err.println( "I didn't receive a new Node! (" + retry + ")" ) ;
418 Thread.sleep( 1000 ) ;
419 } catch( Exception e ) {}
423 if( retry > retryMax )
425 System.err.println( "Unable to replace the dead node " + nomNoeud ) ;
432 System.out.println( "Using Node " + tmpNode.getName() + " in order to replace " + nomNoeud + "\n" ) ;
434 tmpNode.setAliveFlag( true ) ;
435 tmpNode.setAliveTime() ;
436 tmpNode.setAppliName( RunningApplication.Instance().getName() ) ;
438 // Replacing the node in the Register
439 int is = Register.Instance().existNode( tmpNode ) ;
443 System.out.println( "The Node is already in the register! I don't add it." ) ;
444 System.out.println( "Node " + tmpNode.getName() + " not added!" ) ;
447 Register.Instance().addNode( tmpNode ) ;
// Point the dead task's TaskId at the replacement host.
449 myTaskId = Register.Instance().getListeOfTasks().getTaskIdOfRank( rankDeaD ) ;
450 myTaskId.setHostIP( tmpNode.getIP() ) ;
451 myTaskId.setHostName( tmpNode.getName() ) ;
452 myTaskId.setHostStub( tmpNode.getStub() ) ;
// Ring predecessor of the dead rank. It is the last index when the condition
// (not visible, presumably rankDeaD == 0) holds, else rankDeaD - 1.
458 neighborRank = Register.Instance().getSize() - 1 ;
460 neighborRank = rankDeaD - 1 ;
463 TaskId neighborTask2 = Register.Instance().getListeOfTasks()
464 .getTaskIdOfRank( neighborRank ) ;
// Ask the predecessor to send heartbeats to the new node. With MAX_LOOP = 1
// this is a single attempt.
466 int loop = 0, MAX_LOOP = 1 ;
467 boolean ook = false ;
468 JaceInterface jaceStub = null ;
470 while( !ook && loop < MAX_LOOP )
473 jaceStub = neighborTask2.getHostStub() ;
474 jaceStub.updateHeart( tmpNode.getStub() ) ;
477 } catch( Exception e ) {
480 if( loop < MAX_LOOP )
483 Thread.sleep( loop * 2000 ) ;
484 } catch( InterruptedException e1 ) {}
487 if( loop == MAX_LOOP )
489 System.err.println( "Next node unreachable! " + e ) ;
// Unreachable predecessor: ask it to suicide2() and report it to the
// SuperNode through delGNodeFromList.
// NOTE(review): this tests `ok`, not the `ook` flag of the loop above —
// confirm that is intended.
494 if( loop == MAX_LOOP && ! ok )
498 jaceStub.suicide2( "Not reachable!" ) ;
499 } catch (RemoteException e1) {
500 System.err.println( "Unable to suicide the node!\n"+e1 ) ;
504 int pos = Register.Instance().existNodeOfStub( jaceStub ) ;
508 centralServer.delGNodeFromList( Register.Instance().getNodeAt( pos ), 0, LocalHost.Instance().getIP() ) ;
510 System.err.println( "The ping neighbor of " + tmpNode.getName() + " does not respond. It is probably dead ..." ) ;
512 } catch( RemoteException e ) {
513 System.err.println( "Unable to signal dead node to SuperNode!" ) ;
514 e.printStackTrace() ;
// Propagate the replacement, then enable scanning on the new node. On failure
// it waits 2 s and tries once more.
520 updateConcernedNodes( rankDeaD, noeud, tmpNode ) ;
522 Thread.sleep( 500 ) ;
523 tmpNode.getStub().setScanning( true ) ;
524 System.out.println( "Set scanning on " + tmpNode.getName() ) ;
525 } catch( Exception e ) {
527 Thread.sleep( 2000 ) ;
529 tmpNode.getStub().setScanning( true ) ;
530 } catch( InterruptedException e1 ) {
531 } catch( RemoteException e2 ) {
532 System.err.println( "Unable to setScanning on for the new node: " + e2 ) ;
// Broadcast the replacement to every spawner except this one.
537 for( int z = 0 ; z < spawnersList.size() ; z++ )
539 if( !( (JaceSpawnerInterface) spawnersList.get( z ) )
540 .equals(Register.Instance().getSpawnerStub() ) )
543 ((JaceSpawnerInterface) spawnersList.get(z))
544 .replaceDeamonBy(noeud, tmpNode, rankDeaD);
546 } catch (Exception e) {
547 System.err.println( "Unable to broadcast the modifications to all the spawners: "
558 // Checks whether the nodes marked alive in the SuperNode's Register.Instance() are still alive;
560 // returns 0 on error, 1 otherwise
562 * private synchronized int scanConnectedHosts() { long time = 0; Node host;
563 * Node tmpNode; long workerTime; long currentTime; int rank; int restempo;
564 * int nb = 0; int nbC = 0; boolean changed = false; int index=0; try{
565 * JaceSpawnerInterface spawnerStub=Register.Instance().getSpawnerStub();
566 * if(spawnerStub.getFinished()==true){
567 * System.out.println("nbre de taches="+Register.Instance().getSize());
568 * ListeTask t=Register.Instance().getListeOfTasks();
569 * for(index=z;index<t.getSize();index++){ TaskId recev = null;
570 * System.out.println("deleting Task************"+index);
572 * recev = t.get(index); JaceInterface stub=recev.getHostStub();
573 * spawnerStub.killApplication(stub); }
577 * } }catch(Exception e){
578 * System.out.println("w aiiiiiiiiiiiiiirrrr"+e+" index="+index); z=index;
581 * if (Register.Instance().getSize() == 0) {
582 * System.out.println("aucun noeuds a scanner");
583 * RunningApplication.Instance().purge(); System.exit(0);
593 /*** Sébastien Miquée ***/
/**
 * Asks the SuperNode for a node to requisition as the replacement of the dead
 * task of rank {@code theRank}.
 *
 * <p>Loops while no suitable node has been found. If {@code centralServer} is
 * null, it falls back to the SuperNode stub stored in {@code LocalHost}. If
 * that is null too, it reports the lost connection. A node is requested with
 * {@code getNewNode(idAlgo, theRank)}. Only a non-null node with a non-null,
 * non-empty IP is considered. Otherwise it prints "Returned node is null!"
 * and sleeps 2 s before the next attempt. A {@code RemoteException} from the
 * SuperNode is reported. Handling of a node already in the Register is on
 * lines not visible here.
 *
 * @param theRank rank of the task whose host died
 * @param nom     name of the dead node (not read in the visible lines)
 * @param _n      the dead node itself (not read in the visible lines)
 * @return the replacement node (the return statement is outside the visible lines)
 */
594 private synchronized Node foundToReplaceThisNode(int theRank, String nom, Node _n)
596 boolean found = false ;
600 while( found == false ) {
604 if( centralServer == null )
606 centralServer = LocalHost.Instance().getSuperNodeStub() ;
608 if( centralServer == null )
610 System.err.println( "Connection lost to SuperNode!" ) ;
611 System.err.println( "Reconnecting..." ) ;
616 node = centralServer.getNewNode( idAlgo, theRank ) ;
// Check the IP for null before dereferencing it. The former order called
// isEmpty() first and threw a NullPointerException whenever the SuperNode
// returned a node without an IP. isEmpty() also covers the former equals("") test.
618 if( node != null && node.getIP() != null && ! node.getIP().isEmpty() )
621 if( Register.Instance().existNode( node ) != -1 )
628 System.err.println( "Returned node is null!" ) ;
631 Thread.sleep( 2000 ) ;
632 } catch( Exception e ) {}
634 } catch (RemoteException e) {
635 // TODO: find another SuperNode and ask it for a node
636 System.err.println( "Cannot localize SuperNode ! " + e ) ;
/**
 * Replaces {@code oldStub} by {@code stub} in spawnersList. It prints an
 * error when oldStub is not in the list. The index test is on a line not
 * visible here.
 *
 * @param oldStub stub of the spawner being replaced
 * @param stub    stub of the new spawner
 */
752 public void replaceBy(JaceSpawnerInterface oldStub,
753 JaceSpawnerInterface stub) {
754 int index = spawnersList.indexOf((Object) oldStub) ;
756 spawnersList.set(index, stub ) ;
758 System.err.println( "Spawner's stub not foud in spawnersList !" ) ;
/**
 * Replaces the dead spawner {@code previousSpawner} with a fresh one.
 *
 * <p>It requests a node from the SuperNode with getNewNode(idAlgo, -2) and
 * counts a spawner disconnection. It transforms the node into a spawner with
 * transformIntoSpawner, stores the new stub at the dead spawner's index in
 * spawnersList and starts a StartProcessThread for that index. It then asks
 * every other spawner (neither this one nor the new one) to replaceBy the
 * stub and points this spawner's ScanThreadSpawner at the new stub. Finally
 * it asks the spawner preceding the new one in the ring to send it
 * heartbeats (updateHeart). It prints "Node is null !" when no node was
 * obtained.
 *
 * NOTE(review): -2 is passed in place of a task rank; its meaning for the
 * SuperNode is not visible here.
 *
 * @param previousSpawner stub of the spawner to replace
 */
761 public void getNewSpawner(JaceSpawnerInterface previousSpawner) {
762 //boolean found = false;
765 JaceSpawnerInterface spawnerStub = null;
767 // while (found == false) {
769 // TODO : find the error !!!
771 // ("super node not located: java.lang.NullPointerException")
// NOTE(review): unless the line not visible after the message reconnects or
// returns, getNewNode is still called on the null centralServer. The
// resulting NullPointerException lands in the catch below.
772 if (centralServer == null) {
773 System.err.println("Central Server not localized !");
775 node = centralServer.getNewNode( idAlgo, -2 ) ;
776 RunningApplication.Instance()
777 .incrementNumberOfSpawnerDisconnections();
779 } catch (Exception e) {
780 // find another superNode and ask it for a node
781 System.err.println("Super Node not localized !\n " + e);
782 System.err.println("My IP : " + LocalHost.Instance().getIP());
783 if (centralServer == null) {
784 System.err.println("CentralServer is NULL !");
// Slot of the dead spawner in spawnersList; the new spawner takes it over.
790 index = spawnersList.indexOf((Object) previousSpawner);
792 System.out.println("Using Node " + node.getName()
794 + LocalHost.Instance().resolve(node.getName())
795 + ") to replace a dead spawner\n\n");
797 spawnerStub = node.getStub().transformIntoSpawner(
806 RunningApplication.Instance()
807 .getNumberOfDisconnections(),
808 RunningApplication.Instance()
809 .getNumberOfSpawnerDisconnections(),
810 nbOfDaemonsPerSpawner, nbOfDeamonsPerThread
812 spawnersList.set( index, spawnerStub ) ;
815 new StartProcessThread(index).start();
817 } catch (Exception e) {
818 System.err.println("Unable to reach the new spawner: " + e);
// Tell the other spawners (neither this one nor the new one) about the replacement.
820 for (int j = 0; j < spawnersList.size(); j++)
822 if (!((JaceSpawnerInterface) spawnersList.get(j))
823 .equals(Register.Instance().getSpawnerStub())
824 && !((JaceSpawnerInterface) spawnersList.get(j))
825 .equals(spawnerStub)) {
827 .println("Trying to broadcast to spawner of rank "
830 ((JaceSpawnerInterface) spawnersList.get(j))
831 .replaceBy(previousSpawner, spawnerStub);
833 } catch (Exception e) {
835 .println("Unable to broadcast to spawner of rank: "
836 + j + ". Error:" + e);
838 ScanThreadSpawner.Instance().setServer(spawnerStub);
// Ring predecessor of the new spawner: the last index or index - 1 (the
// selecting condition is on a line not visible here).
842 previous = spawnersList.size() - 1;
844 previous = index - 1;
846 ((JaceSpawnerInterface) spawnersList.get(previous))
847 .updateHeart(spawnerStub);
848 } catch (Exception e) {
850 .println("Unable to change the server of the heartbeatThread for the previous node of rank "
851 + previous + ". error:" + e);
855 System.err.println("Node is null !");
/**
 * Calls setFinished(bool) on every spawner in spawnersList. No self-exclusion
 * is visible, so this spawner is included. Failures are caught and reported
 * with "Unable to propagate the end of the application".
 *
 * @param bool the finished flag to propagate
 */
860 public void broadcastFinished(boolean bool) {
861 for (int i = 0; i < spawnersList.size(); i++)
863 ((JaceSpawnerInterface) spawnersList.get(i)).setFinished(bool);
864 } catch (Exception e) {
866 .println("Unable to propagate the end of the application :"
/**
 * End-of-application handling. When this spawner's stub reports finished,
 * it works out the task range of this spawner and stops its ring monitoring
 * threads. It launches KillThreads over that range, prints the run
 * statistics and writes them to a result file. It then reconnects the local
 * daemon to the SuperNode if it is still running, purges the application
 * state and asks the SuperNode to remove the mapping algorithm registered
 * under idAlgo. Any exception is reported.
 */
871 private synchronized void scanAppliNodes() {
874 //ListeTask tskList = null;
878 JaceSpawnerInterface spawnerStub = Register.Instance()
880 if (spawnerStub.getFinished() == true) {
881 System.out.println("Number of tasks ="
882 + Register.Instance().getSize());
// x: index of the last spawner group. s: number of KillThreads minus one, taken
// from one of the two formulas below. `rank` and the test choosing between
// the formulas are on lines not visible here.
884 int x = Register.Instance().getListeOfTasks().getSize()
885 / nbOfDaemonsPerSpawner;
888 s = (Register.Instance().getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
889 / nbOfDeamonsPerThread;
891 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
// First task index handled by this spawner.
893 int debut = nbOfDaemonsPerSpawner * rank;
895 // for(int i=debut;i<nbOfDaemonsPerSpawner*(rank+1) &&
896 // i<reg.getSize();i++)
897 // System.out.println(((Node)nodes.elementAt(i)).getName());
// Stop the ring monitoring threads, then kill this spawner's tasks with s + 1 KillThreads.
899 ListeTask t = Register.Instance().getListeOfTasks();
900 ScanThreadSpawner.Instance().kill();
901 HeartBeatSpawner.Instance().kill();
903 for (int i = 0; i < s + 1; i++) {
905 new KillThread(i, debut, nbOfDaemonsPerSpawner,
906 nbOfDeamonsPerThread, t).start();
911 long finalTime = RunningApplication.Instance().getChrono()
914 int nbe = RunningApplication.Instance()
915 .getNumberOfDisconnections();
917 int nbsdc = RunningApplication.Instance()
918 .getNumberOfSpawnerDisconnections();
920 System.out.println("Application finished successfully !");
922 System.out.println("TOTAL TIME in s : " + (finalTime / 1000));
923 System.out.println("Number of node disconnections: " + nbe);
924 System.out.println("Number of spawner disconnections: " + nbsdc);
// Write the statistics to a result file.
// NOTE(review): hard-coded, user-specific output path. `algo`, `test` and `dh`
// are fields declared outside the visible lines. No close() of ecrivain is
// visible — confirm the writer is closed (try-with-resources would guarantee
// it). StandardCharsets.UTF_8 is the typed alternative to the "UTF8" name.
927 String path = "/home/lyon/smiquee/resultats/execTime_"+algo+"_"+nbFault+"_"+test ;
928 PrintWriter ecrivain = null ;
929 ecrivain = new PrintWriter( new OutputStreamWriter( new FileOutputStream( path ), "UTF8" ) ) ;
931 ecrivain.println( "TOTAL TIME in s : " + (finalTime / 1000));
932 ecrivain.println( "Number of node disconnections: " + nbe);
933 ecrivain.println( "Number of spawner disconnections: " + nbsdc);
934 ecrivain.println( "DH = "+dh ) ;
// Reconnect the local daemon to the SuperNode if it still runs, and purge the application state.
940 if (JaceDaemon.Instance().isRunning()) {
941 JaceDaemon.Instance().reconnectSuperNode();
943 RunningApplication.Instance().purge();
948 RunningApplication.Instance().purge();
951 /** Removal of the mapping algorithm on the SuperNode **/
952 centralServer.removeAlgo( idAlgo, 0 ) ;
954 } catch( Exception e ) {
955 System.err.println( "Error the application nodes scan!\n " + e ) ;
959 * if (Register.Instance().getSize() == 0) {
960 * System.out.println("aucun noeuds a scanner");
961 * RunningApplication.Instance().purge(); System.exit(0); return 0;
963 * } else{ tskList = Register.Instance().getListeOfTasks();
965 * //si mon appli a besoin d'un noeud //sinon, on fait rien if ((nbTasks
966 * - Register.Instance().getSize()) > 0) { cptReplaced = 0;
968 * //TO DO demander des paquet de nodes, pas qu'un //on scanne toutes les
969 * taches de cette appli for (int ind = 0; ind < tskList.getSize();
970 * ind++) { //si 1 tache a pas de noeud, on trouve 1 remplacant
972 * //if (tskList.get(ind).getHostIP() == null) { if
973 * (tskList.get(ind).getHostStub() == null) { rank =
974 * tskList.get(ind).getRank(); node = foundToReplaceThisNodeTMP(rank);
975 * if (node != null) { cptReplaced++; }
979 * //qd fini de scanner taches, envoyer Register //si remplacement de
980 * noeud (c a d si Register modifier) if (cptReplaced != 0) {
981 * broadcastRegister(0); } try { Thread.currentThread().yield(); } catch
984 * }// fin if(appli.getNeededNodes() > 0) {
985 * //System.out.println("SCAN APPLI : taille : " +
986 * Register.Instance().getSize()); return 1; }
990 // @SuppressWarnings("unused")
991 // private synchronized Node foundToReplaceThisNodeTMP(int theRank) {
993 // boolean found = false;
995 // // while (found == false) {
997 // // TO DO : trouver l'erreur !!!
999 // // "pas localise le super node java.lang.NullPointerException"
1000 // if (centralServer == null) {
1001 // System.out.println("centralServer est NUUUUUUUUULL");
1003 // node = centralServer.getNewNode(LocalHost.Instance().getIP());
1006 // } catch (Exception e) {
1007 // // trouver un autre superNode et lui demander le noeud a lui
1008 // System.out.println("TMP pas localise le super node " + e);
1009 // System.out.println("TMP pas localise le super node " + e);
1010 // System.out.println("TMP pas localise le super node " + e);
1011 // System.out.println("TMP pas localise le super node " + e);
1012 // System.out.println("TMP pas localise le super node " + e);
1013 // System.out.println("TMP pas localise le super node " + e);
1014 // System.out.println("TMP pas localise le super node " + e);
1015 // System.out.println("mon IP : " + LocalHost.Instance().getIP());
1016 // if (centralServer == null) {
1017 // System.out.println("centralServer : NULL");
1019 // connectSuperNode();
1022 // if (node != null) {
1023 // System.out.println("COOOOOOOOOOOOOOOOOOOOOOL : requisition de "
1024 // + node.getName() + " taille avt add: "
1025 // + Register.Instance().getSize() + "\n\n");
1026 // node.setAliveFlag(true);
1027 // node.setAliveTime();
1029 // // rajouter le noeud ds le Register
1030 // System.out.println("ds Register, manque "
1031 // + (nbTasks - Register.Instance().getSize()));
1032 // node.setAppliName(RunningApplication.Instance().getName());
1034 // // lui envoyer mon stub pr qu'il commence a me pinguer des
1036 // // TO DO a mettre ds un thread ????
1038 // TaskId neighborTask = Register.Instance().getListeOfTasks()
1039 // .getTaskIdOfRank(
1041 // % Register.Instance().getListeOfTasks()
1043 // node.getStub().updateHeart(neighborTask.getHostStub());
1044 // // node.getStub().updateHeart(this.spawnerRef);
1046 // // int is = Register.Instance().existNode(node.getIP());
1047 // int is = Register.Instance().existNode(node);
1048 // // TO DO verif pourkoi superNode me le redonne
1049 // // alors qu'il fait deja du calcul
1051 // System.out.println("j'ajoute pas le noeud, il y est deja");
1052 // System.out.println("PAS AJOUTEE TMP " + node.getName());
1053 // System.out.println("PAS AJOUTEE TMP " + node.getName());
1054 // System.out.println("PAS AJOUTEE TMP " + node.getName());
1055 // System.out.println("PAS AJOUTEE TMP " + node.getName());
1056 // System.out.println("PAS AJOUTEE TMP " + node.getName());
1059 // Register.Instance().addNode(node);
1061 // // !!!!!!!!!!!!!!actualiser le ListeTask
1062 // TaskId myTaskId = Register.Instance().getListeOfTasks()
1063 // .getTaskIdOfRank(theRank);
1064 // myTaskId.setHostIP(node.getIP());
1065 // myTaskId.setHostName(node.getName());
1066 // myTaskId.setHostStub(node.getStub());
1067 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostIP(node.getIP());
1068 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostName(node.getName());
1069 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostStub(node.getStub());
1071 // } catch (Exception e) {
1072 // System.out.println("nvx noeud deja plu dispo");
1076 // System.out.println("RADINNNNNNNNNNNNNN TMP ");
/**
 * Publishes this spawner over RMI. It creates an RMI registry on spawnerPort
 * and binds a new JaceSpawnerServer under the name "JaceSpawnerServer". It
 * then looks that binding up through Naming at the local IP to obtain
 * spawnerRef. Any failure is reported.
 *
 * NOTE(review): LocateRegistry.createRegistry throws if a registry cannot be
 * exported on that port (e.g. the port is already in use). Its return value
 * could also be used directly instead of the extra getRegistry call.
 */
1081 private void exportObject() {
1083 JaceSpawnerServer spawnerServer = null;
1085 System.out.println("Name of local machine is: "
1086 + LocalHost.Instance().getName());
1087 System.out.println("IP of local machine is: "
1088 + LocalHost.Instance().getIP());
1090 // launch the JaceSpawnerServer
1091 spawnerServer = new JaceSpawnerServer();
1092 java.rmi.registry.LocateRegistry.createRegistry(spawnerPort);
1093 java.rmi.registry.LocateRegistry.getRegistry(spawnerPort).rebind(
1094 "JaceSpawnerServer", spawnerServer);
1095 spawnerRef = (JaceSpawnerInterface) Naming.lookup("rmi://"
1096 + LocalHost.Instance().getIP() + ":" + spawnerPort
1097 + "/JaceSpawnerServer");
1099 } catch (Exception e) {
1101 .println("JaceP2P_Error in JaceSpawner.exportObject() when creating the local JaceSpawnerServer "
1103 // System.err.println("exit ds JaceSpawner.exportObject");
/**
 * Connects to a SuperNode over RMI. It first tries superNode_IP:superNode_port
 * when superNode_IP is set. If that fails, it walks the entries of
 * SuperNodeListe until one answers. On success it stores the stub and IP in
 * LocalHost, reads the heartbeat period from the SuperNode and derives
 * timeBeforeKill. It prints an error when no SuperNode is reachable. The
 * `connected` updates, the loop index `i` and the pauses suggested by the
 * messages are on lines not visible here.
 */
1109 public void connectSuperNode() {
1110 System.out.println("I'm looking for a super node");
1111 boolean connected = false;
1112 if (!(superNode_IP == null)) {
1114 centralServer = null ;
1117 System.out.println("Trying to invoke super node "
1119 centralServer = (JaceSuperNodeInterface) Naming.lookup("rmi://"
1120 + superNode_IP + ":" + superNode_port
1121 + "/JaceSuperNode");
1122 System.out.println("Succesfully located " + superNode_IP);
1124 // add stub and IP in LocalHost to store it until super node
1126 LocalHost.Instance().setSuperNodeStub(centralServer);
1127 LocalHost.Instance().setSuperNodeIP(superNode_IP);
1128 heartTime = centralServer.getSuperNodeBeat();
1129 timeBeforeKill = NB_HEART_DECONNECT * heartTime;
1132 } catch (Exception e) {
1133 System.err.println("Super Node not accessible, try another one (1/2s)");
1136 } catch (Exception e1) {
// Fall back to the known SuperNodes, one after the other.
1141 if (connected == false) {
1143 SuperNodeListe.Instance().staticInitialization();
1144 while (connected == false
1145 && i < SuperNodeListe.Instance().getListe().size()) {
1146 SuperNodeData d = null;
1147 d = SuperNodeListe.Instance().getSuperNodeData(i);
1149 superNode_IP = LocalHost.Instance().resolve(d.getIP());
1150 superNode_port = d.getPort();
1151 // superNode_port = d.getPort();
1153 System.out.println("Trying to invoke Super Node "
1155 centralServer = (JaceSuperNodeInterface) Naming
1156 .lookup("rmi://" + superNode_IP + ":"
1157 + superNode_port + "/JaceSuperNode");
1158 System.out.println("Succesfully located SuperNode "
1160 LocalHost.Instance().setSuperNodeStub(centralServer);
1161 LocalHost.Instance().setSuperNodeIP(superNode_IP);
1162 heartTime = centralServer.getSuperNodeBeat();
1163 timeBeforeKill = NB_HEART_DECONNECT * heartTime;
1166 } catch (Exception e) {
1168 .println("SuperNode "
1170 + " not accessible, trying to locate another one in 0.5s\n");
1174 } catch (Exception e1) {
1180 if (connected == false) {
1181 System.err.println("All the Super Nodes in the list are not accessible. I'm unable to connect to the platform !");
1187 // get a Register on the SuperNode
1188 // completed with the required number of Daemons
/**
 * Requests from the SuperNode a Register holding nbTasks daemons plus one
 * extra node per additional spawner. It sets idAlgo to "IP:port" of the
 * local host, and on failure reports the error and calls connectSuperNode().
 * It dumps the graph, grid and mapping of the algorithm to XML files. If the
 * Register does not have the expected size, it asks every reserved node to
 * suicide. Otherwise it moves the first nbExtraSpawners nodes into
 * spawnersList and configures the Register. It then installs the Register
 * locally and marks every node alive.
 *
 * NOTE(review): registerSpawner is dereferenced after the request without a
 * visible null check. Confirm that the retry logic (using `recieved`, on
 * lines not visible) guarantees it is non-null.
 */
1190 public synchronized void getRegisterOnSuperNode() {
1191 Register registerSpawner = null ;
1193 boolean recieved = false ;
1195 idAlgo = LocalHost.Instance().getIP() + ":" + LocalHost.Instance().getPort() ;
1197 System.out.println( "Trying to get a Register on the SuperNode" ) ;
// One extra node (future spawner) per additional group of
// nbOfDaemonsPerSpawner tasks, e.g. nbTasks = 10 with 4 per spawner -> 2.
1198 int nbExtraSpawners = 0 ;
1199 if (nbTasks > nbOfDaemonsPerSpawner) {
1200 nbExtraSpawners = (nbTasks - 1) / nbOfDaemonsPerSpawner ;
1207 paramAlgo = nbFault ;
1213 registerSpawner = centralServer.getRegisterSpawner(idAlgo,
1214 nbTasks, (Task) tache, nbTasks
1215 + nbExtraSpawners, algo, paramAlgo);
1217 } catch (Exception e) {
1218 System.err.println( "Unable to recieve a register from superNode: " + e ) ;
1219 connectSuperNode() ;
// Dump the mapping computed by the SuperNode.
// NOTE(review): hard-coded, user-specific output directory. Only
// RemoteException is caught, so a null `al` would escape as a NullPointerException.
1225 Algo al = centralServer.getAlgo( idAlgo ) ;
1227 dh = al.getGrid().getHeterogenityDegre() ;
1228 System.out.println( "### DH = " + dh ) ;
1230 Utils.writeGraph( al.getGraph(), "/home/lyon/smiquee/resultats/",
1231 "graph_"+algo+"_"+nbFault+"_"+test+".xml" ) ;
1232 Utils.writeGrid( al.getGrid(), "/home/lyon/smiquee/resultats/",
1233 "grid_"+algo+"_"+nbFault+"_"+test+".xml") ;
1234 Utils.writeMapping( al.getMapping(), "/home/lyon/smiquee/resultats/",
1235 "mapping_"+algo+"_"+nbFault+"_"+test+".xml" ) ;
1236 } catch( RemoteException e1 ) {
1237 System.err.println( "Unable to retrieve Algo information in Spawner!" ) ;
1238 e1.printStackTrace() ;
// Not enough nodes: release every reserved node.
1241 if (registerSpawner.getSize() != (nbTasks + nbExtraSpawners)) {
1242 System.err.println("I did not recieve enough nodes from superNode!!!! \n killing application !!!!");
1243 for (int i = 0; i < registerSpawner.getSize(); i++) {
1245 registerSpawner.getNodeAt(i).getStub().suicide( "Not enough nodes for the application" ) ;
1247 } catch (Exception e) {
1248 System.err.println("The reserved node was unable to reconnect to the super node");
// Move the first nbExtraSpawners nodes out of the Register into spawnersList.
// getSize() shrinks by one on every iteration.
1254 spawnersList = new ArrayList<Object>();
1255 for (int i = 0; i < nbExtraSpawners && i < registerSpawner.getSize(); i++) {
1256 spawnersList.add(registerSpawner.getNodeAt(0));
1258 registerSpawner.removeNodeOfName(registerSpawner.getNodeAt(0).getName());
1262 registerSpawner.setNbOfTasks(nbTasks);
1263 registerSpawner.setNumBackupNeighbors(nbSavingNodes);
1265 * System.out.println("Trying to connect another SuperNode");
1266 * connectSuperNode(); try { registerSpawner =
1267 * centralServer.getRegisterSpawner(LocalHost.Instance().getIP(),
1268 * nbTasks); } catch(Exception e1) {}
1271 if( registerSpawner.getSize() > 0 )
1273 System.out.println( "I received the register" ) ;
1274 // registerSpawner.setVersion(registerVersion);
1275 // registerVersion++;
1276 Register.Instance().replaceBy( registerSpawner ) ;
1277 System.out.println( "It contains " + Register.Instance().getSize()
1278 + " Nodes" + " " + nbExtraSpawners + " ExtraSpawners" ) ;
1280 // set each Node aliveTime value to the Spawner current time
1281 for (int i = 0; i < Register.Instance().getSize(); i++)
1283 noeud = Register.Instance().getNodeAt( i ) ;
1284 noeud.setAliveFlag( true ) ;
1285 noeud.setAliveTime() ;
1289 System.err.println( "\n---------------WARNING--------------" ) ;
1290 System.err.println( "No Daemon available on the SuperNode, try later, please" ) ;
1296 public class TransformThread extends Thread {
1300 public TransformThread(int i, Node n) {
1308 System.out.println("Trying to transform the spawner ("
1309 + n.getName() + ") of rank " + i);
1310 spawnersList.set( i, n.getStub().transformIntoSpawner(
1311 params, appliName, Register.Instance(), nbTasks,
1312 centralServer, i, heartTime, 0, 0, 0,
1313 nbOfDaemonsPerSpawner, nbOfDeamonsPerThread, idAlgo) ) ;
1314 } catch (Exception e) {
1315 System.err.println("Error while contacting newly acquired spawner ("
1316 + n.getName() + "): " + e);
1318 n = centralServer.getNewNode( LocalHost.Instance().getIP(), null ) ;
1320 new TransformThread(i, n).start();
1321 } catch (Exception e1) {
1322 System.err.println("The Super Node is maybe dead: " + e1) ;
1323 for (int z = 0; z < Register.Instance().getSize(); z++) {
1325 Register.Instance().getNodeAt(z).getStub().suicide2("Problem") ;
1327 } catch (Exception ez) {
1328 System.err.println("The reserved node was unable to reconnect to the super node: \n"
1339 public class FaultMake extends Thread {
1341 public FaultMake(){}
1345 ListeTask t = null ;
1347 int next, old = -1 ;
1349 System.out.println( "Starting faults simulation!" ) ;
1351 while( ! RunningApplication.Instance().isRunning() )
1353 /** Waiting some time the beginning of the computation **/
1356 } catch (InterruptedException e) {
1360 while( RunningApplication.Instance().isRunning() )
1362 /** Stop too long computation **/
1363 if( (RunningApplication.Instance().getChrono().getValue() / 1000) > 700 )
1366 Register.Instance().getSpawnerStub().setFinished( true ) ;
1367 } catch (RemoteException e) {
1368 System.err.println( "Unable to stop the too long computation!" ) ;
1369 e.printStackTrace() ;
1373 /** Waiting some time ... **/
1375 sleep( faultTime * 1000 ) ;
1376 } catch( InterruptedException e ) {
1379 /** ... and kill some daemons ! **/
1380 t = Register.Instance().getListeOfTasks() ;
1383 for( int i = 0 ; i < nbFault ; i++ )
1385 next = r.nextInt( t.getSize() ) ;
1387 while( next == old )
1389 next = r.nextInt( t.getSize() ) ;
1395 TaskId recev = null ;
1396 System.out.println( "Making fault on Task" + next ) ;
1398 recev = t.getTaskIdOfRank( next ) ;
1400 JaceInterface stub = recev.getHostStub() ;
1401 System.out.println( "name = " + recev.getHostName() ) ;
1402 noeud = Register.Instance().getNodeOfStub( stub ) ;
1404 stub.suicide2( "Test fault tolerance" ) ;
1406 } catch( UnmarshalException ue ) {
1407 } catch( Exception e) {
1409 System.err.println( "Error in FaultMake on node: "
1410 + noeud.getName() + ". " + e ) ;
1411 } catch (Exception e2) {
1412 System.err.println( "(Fault) Error in error:" + e2 ) ;
1418 } catch( InterruptedException e ) {
1427 public class StartProcessThread extends Thread {
1430 public StartProcessThread(int i) {
1438 * while((spawnersList.elementAt(i) instanceof Node)) try{
1439 * System.out.println("waiting till transform of spawner "+i+
1440 * " is finished"); Thread.sleep(20); }catch(Exception e1){}
1442 // System.out.println("start process on spawner of rank "+i);
1443 JaceSpawnerInterface spawnerStub = (JaceSpawnerInterface) spawnersList.get(i);
1444 spawnerStub.startProcess( spawnersList ) ;
1445 } catch (Exception e) {
1446 e.printStackTrace(System.out);
1447 System.err.println("Unable to start the process on the spawner of rank "
1448 + i + ".error: " + e);
1453 public void createSpawnerNetwork() {
1456 for (i = 0; i < spawnersList.size(); i++) {
1457 n = (Node) spawnersList.get( i ) ;
1459 // Register.Instance().getListeOfTasks().viewAll();
1460 // spawnersList.setElementAt(n.getStub().transformIntoSpawner(
1461 // params, appliName, Register.Instance(),nbTasks, centralServer,i,
1462 // heartTime,0,0),i);
1463 new TransformThread(i, n).start();
1466 // broadcast the Register.Instance() to all the JaceServer
1467 // in order to start each task on the Daemons
1469 spawnersList.add(Register.Instance().getSpawnerStub());
1470 System.out.println(" rank="+rank+" spawnersList.size()=" + spawnersList.size());
1471 rank = spawnersList.size() - 1;
1473 broadcastRegister(1);
1475 for (int j = 0; j < spawnersList.size(); j++) {
1476 System.out.println("waiting till transform of spawner " + j
1478 while ((spawnersList.get(j) instanceof Node))
1483 } catch (Exception e) {
1488 // for (int k = 0; k < spawnersList.size(); k++)
1491 // ((JaceSpawnerInterface) spawnersList.get( k )).setIdAlgo( idAlgo ) ;
1492 // } catch (Exception e) {
1493 // System.err.println("Unable to propagate the mapping algorithm identifier:" + e) ;
1497 System.out.println("End Transformation of all spawners. Beginning the computing processes");
1499 for (i = 0; i < spawnersList.size(); i++) {
1501 // while(!(spawnersList.elementAt(i) instanceof
1502 // JaceSpawnerInterface))
1504 new StartProcessThread(i).start();
1507 System.out.println("End create Spawner Network!!!!!!!!!");
1510 public JaceSpawnerInterface getSpawnerResponsibleOn(int rank) {
1511 int id = rank / nbOfDaemonsPerSpawner;
1512 return (JaceSpawnerInterface) spawnersList.get(id);
1515 public void createAppli() {
1520 ListeTask tsk = new ListeTask();
1522 JaceInterface nodeStub = null;
1523 TaskId myTask = null;
1525 System.out.println("Application launched, starting the chronometer");
1526 RunningApplication.Instance().getChrono().start();
1528 RunningApplication.Instance().setName(appliName);
1529 RunningApplication.Instance().setNbTasks(nbTasks);
1530 // RunningApplication.Instance().setRegister(Register.Instance());
1532 Register.Instance().setParams(params);
1533 Register.Instance().setAppliName(appliName);
1534 Register.Instance().setSpawnerStub(this.spawnerRef);
1536 // assign a TaskId to each Node of the Register
1537 // and insert the TaskId in tke ListTask
1538 while (i < Register.Instance().getSize() && count < nbTasks) {
1539 tmpNode = Register.Instance().getNodeAt(i);
1540 if (tmpNode.getAliveFlag() == true) {
1541 tmpNode.setAppliName(appliName);
1542 nodeStub = tmpNode.getStub();
1543 nodeName = tmpNode.getName();
1544 nodeIP = tmpNode.getIP();
1546 myTask = new TaskId(appliName, count, nodeStub);
1547 myTask.setHostIP(nodeIP);
1548 myTask.setHostName(nodeName);
1550 tsk.addTask(myTask);
1556 // if not enough Nodes in the Register,
1557 // insert not assigned TaskId in the ListTask
1558 if (count < nbTasks) {
1559 for (int j = count; j < nbTasks; j++) {
1560 tsk.addTask(new TaskId(appliName, j, null));
1562 System.out.println("In Register, misses "
1563 + (nbTasks - Register.Instance().getSize()) + " nodes");
1566 // insert the ListeTask in the Register of the appli
1567 Register.Instance().setListeOfTasks(tsk);
1568 // Register.Instance().getListeOfTasks().viewAll();
1569 RunningApplication.Instance().setRunning(true);
1570 System.out.println("fin create appli");
1573 public class BroadcastSpawner extends Thread {
1576 int nbOfDaemonsPerThread, nbOfDeamonsPerSpawner;
1578 public BroadcastSpawner(int i, int debut, int nbOfDeamonsPerSpawner,
1579 int nbOfDaemonsPerThread) {
1582 this.nbOfDaemonsPerThread = nbOfDaemonsPerThread;
1583 this.nbOfDeamonsPerSpawner = nbOfDeamonsPerSpawner;
1588 for (int index = debut + i * nbOfDaemonsPerThread; index < debut
1589 + i * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1590 && index < debut + nbOfDeamonsPerSpawner
1591 && index < Register.Instance().getListeOfTasks().getSize(); index++) {
1593 Register.Instance().getNodeAt(index).getStub().setSpawner(
1594 Register.Instance().getSpawnerStub());
1595 } catch (Exception e) {
1596 System.out.println("can't change spawner stub on node: "
1597 + Register.Instance().getNodeAt(i).getName()
1604 public class KillThread extends Thread {
1607 int nbOfDaemonsPerThread, nbOfDeamonsPerSpawner;
1610 public KillThread(int i, int debut, int nbOfDeamonsPerSpawner,
1611 int nbOfDaemonsPerThread, ListeTask t) {
1614 this.nbOfDaemonsPerThread = nbOfDaemonsPerThread;
1615 this.nbOfDeamonsPerSpawner = nbOfDeamonsPerSpawner;
1622 for (int index = debut + i * nbOfDaemonsPerThread; index < debut
1623 + i * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1624 && index < debut + nbOfDeamonsPerSpawner
1625 && index < t.getSize(); index++) {
1628 TaskId recev = null;
1629 System.out.println("deleting Task" + index);
1631 recev = t.getTaskIdOfRank(index);
1633 JaceInterface stub = recev.getHostStub();
1634 System.out.println("name=" + recev.getHostName());
1635 noeud = Register.Instance().getNodeOfStub(stub);
1636 noeud.setAppliName(null);
1637 new ReconnectThread(stub, noeud.getName()).start();
1638 Register.Instance().removeNode(noeud);
1639 // LocalHost.Instance().getSpawnerStub().killApplication(stub);
1641 } catch (Exception e) {
1643 System.err.println("error in killThread on node "
1644 + noeud.getName() + ". " + e);
1645 } catch (Exception e2) {
1646 System.err.println("error in error :" + e2);
1652 class ReconnectThread extends Thread {
1653 JaceInterface stub = null;
1656 public ReconnectThread(JaceInterface s, String name) {
1663 // System.out.println("reconnexion SuperNode");
1664 // Register.Instance().getNode(workerIP).getWorkerStub().reconnectSuperNode();
1666 // stub.reconnectSuperNode();
1667 stub.suicide("End of application");
1668 } catch (Exception e) {
1669 System.err.println("Cannot kill node " + name);
1677 // faire une copie du Register et l'envoyer aux noeuds qui le compose
1678 // car si il est modif en meme tmp, on envoi pas un truc coherent
1679 private synchronized void broadcastRegister(int requete) {
1680 // Register reg = Register.Instance().clone();
1681 Register reg = Register.Instance();
1684 System.out.println("name of spawner: "
1685 + Register.Instance().getSpawnerStub().getName());
1686 // launch 1 thread to send the Register to all the nodes
1687 while (broadcasting == true)
1689 broadcasting = true;
1690 // Register.Instance().setSpawnerStub(
1691 // Register.Instance().getSpawnerStub());
1692 int x = reg.getListeOfTasks().getSize() / nbOfDaemonsPerSpawner;
1695 if ((reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1696 % nbOfDeamonsPerThread == 0)
1697 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1698 / nbOfDeamonsPerThread;
1700 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1701 / nbOfDeamonsPerThread + 1;
1702 else if ((nbOfDaemonsPerSpawner % nbOfDeamonsPerThread) == 0)
1703 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
1705 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread + 1;
1706 int debut = nbOfDaemonsPerSpawner * rank;
1707 System.out.println("rank=" + rank + " debut=" + debut + " s=" + s
1708 + " nbOfDaemonsPerSpawner=" + nbOfDaemonsPerSpawner
1709 + " nbOfDeamonsPerThread=" + nbOfDeamonsPerThread + " x="
1711 for (int i = 0; i < s; i++)
1712 new UpdateRegisterThread(tache, reg, requete, i, debut).start();
1714 * This thread : -updates the goal of the Node beats if necessary
1715 * (stub.updateHeart) -updates the Register on each Node
1716 * (stub.updateRegister)
1718 JaceSpawner.Instance().setBroadcasting(false);
1721 } catch (Exception e) {
1724 } catch (Exception e) {
1726 .println("\n1 node has died during JaceSpawner.broadcastRegister()");
1730 @SuppressWarnings("unchecked")
1731 private synchronized void broadcastScanning() {
1732 Register reg = Register.Instance();
1733 while (broadcasting == true)
1736 } catch (Exception e) {
1738 // Register.Instance().viewAll();
1739 ArrayList<Node> nodes = (ArrayList<Node>) Register.Instance().getListOfNodes().clone();
1740 int x = reg.getListeOfTasks().getSize() / nbOfDaemonsPerSpawner;
1743 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1744 / nbOfDeamonsPerThread;
1746 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
1748 int debut = nbOfDaemonsPerSpawner * rank;
1750 // for(int i=debut;i<nbOfDaemonsPerSpawner*(rank+1) &&
1751 // i<reg.getSize();i++)
1752 // System.out.println(((Node)nodes.elementAt(i)).getName());
1754 for (int i = 0; i < s + 1; i++) {
1756 new StartScanThread(i, nodes, debut).start();
1761 public synchronized Register getRegister( int rank ) {
1763 ListeTask listOfTasks = Register.Instance().getListeOfTasks() ;
1764 ArrayList<Integer> dependencies = getDependencies( rank, listOfTasks.getSize() ) ;
1765 Register g = new Register() ;
1766 ListeTask newListOfTasks = new ListeTask() ;
1768 g.setAppliName( Register.Instance().getAppliName() ) ;
1769 g.setParams( Register.Instance().getParams() ) ;
1770 g.setSpawnerStub( Register.Instance().getSpawnerStub() ) ;
1771 g.setNbOfTasks( Register.Instance().getNbOfTasks() ) ;
1772 // g.setVersion(reg.getVersion());
1774 for( int j = 0 ; j < dependencies.size() ; j++ )
1776 TaskId id = listOfTasks.getTaskIdOfRank( ( (Integer) dependencies
1777 .get(j) ).intValue() ) ;
1778 newListOfTasks.addTask( id ) ;
1779 if( id.getHostStub() != null ) {
1780 Node noeud = Register.Instance().getNodeOfName( id.getHostName() ) ;// (id.getHostStub());
1783 g.addNode( noeud ) ;
1785 System.err.println( "PAS BON DU TOUT" ) ;
1789 g.setListeOfTasks(newListOfTasks) ;
1793 private void updateConcernedNodes(int rank, Node oldNode, Node node) {
1794 ListeTask listOfTasks = Register.Instance().getListeOfTasks();
1795 ArrayList<Integer> dependencies = getDependencies(rank, listOfTasks.getSize());
1796 System.out.println("List of concerned neighbors of task " + rank+": ");
1797 for (int z = 0; z < dependencies.size(); z++)
1798 System.out.print(((Integer) dependencies.get(z)).intValue()
1800 System.out.println();
1801 // Register.Instance().setVersion(registerVersion);
1802 // registerVersion++;
1803 // Register.Instance()
1804 // .setSpawnerStub(Register.Instance().getSpawnerStub());
1806 if ((dependencies.size() % nbOfDeamonsPerThread) == 0)
1807 s = dependencies.size() / nbOfDeamonsPerThread;
1809 s = dependencies.size() / nbOfDeamonsPerThread + 1;
1810 Register reg = Register.Instance();
1812 for (int j = 0; j < s; j++) {
1813 new UpdateRegisterConcernedThread(dependencies, reg, j, rank,
1814 oldNode, node).start();
1818 private ArrayList<Integer> getDependencies( int id, int jaceSize )
1820 // get computing dependencies
1821 ArrayList<Integer> neighbors = new ArrayList<Integer>() ;
1822 int[] dep = tache.getDependencies( id ) ;
1823 for( int z = 0 ; z < taille(dep) ; z++ )
1825 neighbors.add( dep[z] ) ;
1827 // System.out.println("la liste des voisins de calcul de: "+id+" concerne");
1828 // for(int z=0;z<neighbors.size();z++)
1829 // System.out.print(((Integer)neighbors.elementAt(z)).intValue()+" ");
1830 // System.out.println();
1832 // get convergence neighbors
1834 while (Math.pow(2, d) < jaceSize) {
1835 if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize))
1836 if (!neighbors.contains((Object) ((int) (id + Math.pow(2, d)))))
1837 neighbors.add((int) (id + Math.pow(2, d)));
1838 if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d))
1839 if (!neighbors.contains((Object) ((int) (id - Math.pow(2, d)))))
1840 neighbors.add((int) (id - Math.pow(2, d)));
1844 // get backup neighbors
1845 int nb = Register.Instance().getNumBackupNeighbors();
1848 for (int j = 1; j <= nb; j++) {
1849 // ------------ 1 - for backups "j + n" (to the right of j)
1850 rankOfBackTask = (id + j) % jaceSize;
1851 if (!neighbors.contains((Object) rankOfBackTask))
1852 neighbors.add(rankOfBackTask);
1854 // ------------ 2 - for backups "j - n" (to the left of j)
1857 rankOfBackTask = tmp % jaceSize;
1859 rankOfBackTask = jaceSize - (Math.abs(tmp) % jaceSize);
1861 if (!neighbors.contains((Object) rankOfBackTask))
1862 neighbors.add(rankOfBackTask);
1865 neighbors.add( id ) ;
1870 public static int taille(int[] vect) {
1873 while (x < vect.length && vect[x] >= 0) {
1880 class StartScanning extends Thread {
1882 public StartScanning() {
1892 class StartScanThread extends Thread {
1894 ArrayList<Node> nodes;
1895 int nbOfDeamonsPerThread, nbOfDeamonsPerSpawner;
1897 StartScanThread(int i, ArrayList<Node> nodes, int debut) {
1901 nbOfDeamonsPerThread = JaceSpawner.Instance().getNbOfDeamonsPerThread();
1902 nbOfDeamonsPerSpawner = JaceSpawner.Instance()
1903 .getNbOfDeamonsPerSpawner();
1908 for (index = debut + i * nbOfDeamonsPerThread; index < debut + i
1909 * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1910 && index < debut + nbOfDeamonsPerSpawner
1911 && index < nodes.size(); index++) {
1913 Node node = (Node) nodes.get(index);
1914 JaceInterface stub = node.getStub();
1915 String name = node.getName();
1918 stub.setScanning(true);
1919 // System.out.println("modify scanning to "+name);
1921 } catch (Exception e) {
1922 System.err.println("Unable to modify scanning to " + name + ":"
1926 // for(int x=0;x<nodes.size();x++)
1927 // System.out.println(((Node)nodes.elementAt(x)).getName());
1928 // System.out.println("nbre total: "+(index-1));