3 import java.rmi.Naming;
4 import java.util.Calendar;
5 import java.util.GregorianCalendar;
6 import java.util.Vector;
8 public class JaceSpawner {
// ---- State of the spawner ----
// Application task object; the constructors create it reflectively from appliName.
11 private Task tache = null;
// NOTE(review): presumably the shared instance handed out by Instance() - confirm.
12 public static JaceSpawner Instance;
// Address and port used by connectSuperNode() to build the super node's RMI URL.
13 private static String superNode_IP = null;
14 private int superNode_port = 1098;
// Port on which exportObject() creates the local RMI registry.
15 private static int spawnerPort = 1099;
// Stub of the super node currently in use (set by connectSuperNode() or handed to the second constructor).
16 private static JaceSuperNodeInterface centralServer = null;
// Stub of this spawner, obtained by exportObject() through an RMI lookup.
17 private JaceSpawnerInterface spawnerRef = null;
// Name of the application class to launch, and the parameters passed to it.
19 private String appliName;
20 private String[] params = null;
21 @SuppressWarnings("unused")
22 private String protocol;
23 // private int registerVersion=0;
// Multiplier applied to heartTime to obtain timeBeforeKill.
24 final int NB_HEART_DECONNECT = 3;
25 private int heartTime; // frequency of heartBeat
26 @SuppressWarnings("unused")
27 private int timeBeforeKill; // wait for 3 unanswered heartBeats before
28 // considering the Daemon as dead
29 private boolean broadcasting = false;
30 @SuppressWarnings("unused")
32 private static int nbOfDaemonsPerSpawner;
33 private static int nbOfDeamonsPerThread;
// Holds Node objects at first; each entry is later replaced by the
// JaceSpawnerInterface stub returned by transformIntoSpawner().
34 private Vector<Object> spawnersList;
// Forwarded to the Register through setNumBackupNeighbors().
36 private int nbSavingNodes;
38 // Variables for Mapping
// Forwarded to the super node when requesting the Register.
40 private double paramAlgo ;
// Builds a spawner from launcher arguments.
// args[0] is parsed as the number of tasks, args[1] is the name of the
// application class to launch, and any further entries are copied into
// params and handed to the task through setParam().
// The task class is loaded by name and instantiated reflectively; parsing and
// instantiation failures are reported on System.err.
// NOTE(review): new Integer(String) is deprecated in recent JDKs;
// Integer.parseInt(args[0]) is the usual replacement.
42 public JaceSpawner(String superNode, int port, String comProtocol,
43 String[] args, int nbDaemonPerSpawner, int nbDaemonPerThread,
44 int nbSavingNodes, int _algo, double _paramAlgo) {
45 // superNode_IP = LocalHost.Instance().resolve(superNode);
47 paramAlgo = _paramAlgo ;
49 superNode_IP = superNode;
51 protocol = comProtocol;
52 nbOfDaemonsPerSpawner = nbDaemonPerSpawner;
53 nbOfDeamonsPerThread = nbDaemonPerThread;
54 this.nbSavingNodes = nbSavingNodes;
57 // if less than 2 params (nb of tasks and name of the appli), error
58 System.err.println( "Parameters error !" ) ;
62 nbTasks = new Integer(args[0]).intValue(); // nb of tasks
65 } catch (Exception e) {
66 System.err.println("Number format exception :" + e ) ;
69 appliName = args[1]; // name of the class to launch
70 if (args.length > 2) { // get the optional parameters of the appli
71 params = new String[args.length - 2];
72 for (int i = 0; i < params.length; i++) {
73 params[i] = args[2 + i];
// load the application class by name and create the task instance
77 c = load.load(appliName);
79 tache = ((Task) c.newInstance());
80 tache.setParam(params);
81 tache.setJaceSize(nbTasks);
85 } catch (Exception e) {
86 System.err.println( "Unable to instantiate the class " + e ) ;
// Builds a spawner from an already existing application state: the caller
// supplies the application parameters and name, a Register to adopt, the
// super node stub, the heartbeat period and the disconnection counters.
// The constructor copies the parameters, installs the register, instantiates
// the task class, marks the application as running and broadcasts the register.
// NOTE(review): the argument list matches the transformIntoSpawner(...) calls
// made elsewhere in this class, so this is presumably the constructor run on
// a daemon that is being turned into a spawner - confirm.
96 public JaceSpawner(String[] params, String appliName, Register reg,
97 int nbTasks, JaceSuperNodeInterface snodeStub, int rank,
98 int heartTime, int tag, int nbdc, int nbsdc,
99 int nbDaemonPerSpawner, int nbDaemonPerThread) {
101 nbOfDaemonsPerSpawner = nbDaemonPerSpawner;
102 nbOfDeamonsPerThread = nbDaemonPerThread;
// keep a private copy of the application parameters
103 if (params.length != 0) {
104 this.params = new String[params.length];
105 for (int i = 0; i < params.length; i++)
106 this.params[i] = params[i];
109 System.err.println( "There is no parameter !" ) ;
111 } catch (Exception e) {
112 System.err.println("Error in copying the parameters: " + e ) ;
114 // System.out.println("xxxxxxxxxxxxxxx reg size="+reg.getSize()+" xxxxxxxxxxxxxx");
115 this.appliName = appliName;
117 this.nbTasks = nbTasks;
118 this.heartTime = heartTime;
119 LocalHost.Instance().setSuperNodeStub(snodeStub);
120 centralServer = snodeStub;
// adopt the register handed over by the caller
122 Register.Instance().replaceBy(reg);
123 Register.Instance().setSpawnerStub(this.spawnerRef);
124 Register.Instance().getListeOfTasks().viewAll();
// load the application class by name and create the task instance
128 c = load.load(appliName);
130 tache = ((Task) c.newInstance());
131 tache.setParam(params);
132 tache.setJaceSize(nbTasks);
133 // ****************//
135 } catch (Exception e) {
136 System.err.println("Unable to instantiate the class " + e);
// restart the application bookkeeping with the counters supplied by the caller
138 RunningApplication.Instance().getChrono().start();
140 RunningApplication.Instance().setName(appliName);
141 RunningApplication.Instance().setNbTasks(nbTasks);
142 RunningApplication.Instance().setRunning(true);
143 RunningApplication.Instance().setNumberOfDisconnections(nbdc);
144 RunningApplication.Instance().setNumberOfSpawnerDisconnections(nbsdc);
145 // System.out.println("+++++++++++++++++++++++++");
148 broadcastRegister(1);
151 * x=Register.Instance().getListeOfTasks().getSize()/nbOfDaemonsPerSpawner
152 * ; int s; if(rank==x)
153 * s=(reg.getListeOfTasks().getSize()%nbOfDaemonsPerSpawner
154 * )/nbOfDeamonsPerThread; else
155 * s=nbOfDaemonsPerSpawner/nbOfDeamonsPerThread;
157 * int debut=nbOfDaemonsPerSpawnerrank;
160 * for(int i=0;i<s+1;i++){
162 * new BroadcastSpawner(i,
163 * debut,nbOfDaemonsPerSpawner,nbOfDeamonsPerThread).start(); }
167 System.out.println("########################");
// Class-level synchronized accessor returning a JaceSpawner.
// NOTE(review): presumably returns the static Instance field - confirm.
170 public synchronized static JaceSpawner Instance() {
// Returns the number of daemons handled per thread (static field nbOfDeamonsPerThread).
174 public int getNbOfDeamonsPerThread() {
175 return nbOfDeamonsPerThread;
// Returns the number of daemons handled per spawner.
// Note the spelling difference: the method says "Deamons", the field "Daemons".
178 public int getNbOfDeamonsPerSpawner() {
179 return nbOfDaemonsPerSpawner;
// Stores the given spawner list, looks up this spawner's own position in it,
// then starts the HeartBeatSpawner towards the next neighbour, the
// ScanThreadSpawner on the previous neighbour, and a StartScanning thread.
// The neighbour indices appear to wrap around at both ends of the list.
// Prints an error when this spawner cannot be found in the list.
182 public void startProcess(Vector<Object> spawnersList) {
183 this.spawnersList = spawnersList;
185 int is = spawnersList.indexOf((Object) Register.Instance()
// next neighbour: special case when this spawner is the last entry
190 if (is == spawnersList.size() - 1)
193 nextNeighbour = is + 1;
195 * while((spawnersList.elementAt(nextNeighbour) instanceof Node))
197 * System.out.println("waiting till transform of spawner "+nextNeighbour
198 * +" is finished, for setServer"); Thread.sleep(20);
199 * }catch(Exception e1){}
201 HeartBeatSpawner.Instance().setServer(
202 (JaceSpawnerInterface) spawnersList.get(nextNeighbour));
203 HeartBeatSpawner.Instance().setHeartTime(heartTime);
204 HeartBeatSpawner.Instance().start();
205 int previousNeighbour;
207 previousNeighbour = spawnersList.size() - 1;
209 previousNeighbour = is - 1;
210 ScanThreadSpawner.Instance().setHeartTime(heartTime);
211 ScanThreadSpawner.Instance().setServer(
212 (JaceSpawnerInterface) spawnersList.get(previousNeighbour));
213 ScanThreadSpawner.Instance().start();
216 // System.out.println("apres broadcastScanning");
217 new StartScanning().start();
219 System.err.println("Cannot find myself in the spawnersList !");
// Setter taking the new broadcasting state.
// NOTE(review): presumably assigns the broadcasting field - confirm.
224 public void setBroadcasting(boolean bool) {
// Start-up sequence of the spawner: obtains a Register from the super node
// and then builds the network of spawners.
228 public void initialize() {
229 // if(protocol.equals("rmi")){
230 // launch the JaceSpawnerServer
235 // get a Register on the Super Node
236 // completed with the required number of Daemons
237 getRegisterOnSuperNode();
240 createSpawnerNetwork();
// Scanning loop of the spawner: prints the start time (chrono value in
// seconds) and then loops, sleeping heartTime milliseconds per iteration,
// for as long as the RunningApplication reports that it is running.
245 public void startScanning() {
247 long time = RunningApplication.Instance().getChrono().getValue() / 1000;
248 System.out.println("Start scanning at time: " + time + "s");
249 // start the timer that manages the heartbeats
250 while (RunningApplication.Instance().isRunning() == true) {
251 // step 1: every "heartTime" milliseconds, check whether the
252 // registered nodes are still alive
253 // res = scanConnectedHosts();
255 // step 2: keep it or not? check whether the application is
256 // waiting for a node, in order to assign it a new one
259 Thread.sleep(heartTime);
260 } catch (Exception e) {
263 // /System.out.println("is running = false");
264 if (!JaceDaemon.Instance().isRunning())
// Handles the loss of the daemon identified by host / rankOfDead:
// increments the disconnection counter, clears the host fields of the
// matching TaskId, removes the node from the Register, asks
// foundToReplaceThisNode() for a substitute, calls updateConcernedNodes(),
// enables scanning on the new node, and finally asks every other spawner in
// spawnersList to replaceDeamonBy(). Failures of the remote steps are only
// reported on the console.
268 public synchronized void signalDeadNode(JaceInterface host, int rankOfDead) {
270 TaskId myTaskId = null;
274 RunningApplication.Instance().incrementNumberOfDisconnections();
276 time = RunningApplication.Instance().getChrono().getValue() / 1000;
277 nb = RunningApplication.Instance().getNumberOfDisconnections();
278 nbC = RunningApplication.Instance().getNumberOfCouilles();
279 System.out.println("At time = " + time + "s, NbDisconnection = "
280 + nb + ", NbProblem = " + nbC);
282 // !!!!!!!!!!!!!! update the ListeTask in the Register
283 myTaskId = Register.Instance().getListeOfTasks()
284 .getTaskIdOfHostStub(host);
// no task is bound to this stub: fall back to the rank and tell the
// daemon registered there to stop
285 if (myTaskId == null) {
286 Register.Instance.getListeOfTasks().viewAll();
287 myTaskId = Register.Instance().getListeOfTasks()
288 .getTaskIdOfRank(rankOfDead);
289 JaceInterface deadStub = myTaskId.getHostStub();
290 deadStub.suicide("Not doing a good work");
// detach the task from its host; the Node is fetched before the stub is cleared
292 myTaskId.setHostIP(null);
293 myTaskId.setHostName(null);
294 Node noeud = Register.Instance().getNodeOfStub(
295 myTaskId.getHostStub());
296 myTaskId.setHostStub(null);
297 int rankDeaD = myTaskId.getRank();
299 String nomNoeud = noeud.getName();
300 // Register.Instance().removeNodeAt(i);
301 // Register.Instance().removeNode(host.getIP());
302 // System.out.println("fait le remove : taille = " +
303 // Register.Instance().getSize());
305 boolean b = Register.Instance().removeNodeOfName(noeud.getName());
308 System.out.println("Removing Node of rank "
309 + rankDeaD + " : size = "
310 + Register.Instance().getSize());
313 .println("Cannot remove the Node, it doesn't exist anymore: size = "
314 + Register.Instance().getSize());
317 Calendar cal = new GregorianCalendar();
318 System.out.println("At time=" + cal.get(Calendar.MINUTE) + ":"
319 + cal.get(Calendar.SECOND));
321 // find, IF POSSIBLE, another free node to replace this one
324 /**** Sébastien Miquée **/
325 //Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud);
326 Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud, noeud);
328 // broadcastRegister(0);
329 updateConcernedNodes(rankDeaD, noeud, tmpNode);
332 System.out.println("Set scanning on %%%%%%");
333 tmpNode.getStub().setScanning(true);
334 } catch (Exception e) {
335 System.err.println("Unable to setScannig on for the new node: "
339 // Register.Instance().getListeOfTasks().viewAll();
// propagate the replacement to every spawner except this one
340 for (int z = 0; z < spawnersList.size(); z++)
341 if (!((JaceSpawnerInterface) spawnersList.get(z))
342 .equals(Register.Instance().getSpawnerStub()))
344 ((JaceSpawnerInterface) spawnersList.get(z))
345 .replaceDeamonBy(noeud, tmpNode, rankDeaD);
347 } catch (Exception e) {
349 .println("Unable to broadcast the modifications to all the spawners: "
352 } catch (Exception ee) {
353 System.err.println("Error in signalDeadNode() :" + ee);
357 // checks whether the nodes marked as alive in the SuperNode's Register.Instance()
359 // returns 0 on error, 1 otherwise
361 * private synchronized int scanConnectedHosts() { long time = 0; Node host;
362 * Node tmpNode; long workerTime; long currentTime; int rank; int restempo;
363 * int nb = 0; int nbC = 0; boolean changed = false; int index=0; try{
364 * JaceSpawnerInterface spawnerStub=Register.Instance().getSpawnerStub();
365 * if(spawnerStub.getFinished()==true){
366 * System.out.println("nbre de taches="+Register.Instance().getSize());
367 * ListeTask t=Register.Instance().getListeOfTasks();
368 * for(index=z;index<t.getSize();index++){ TaskId recev = null;
369 * System.out.println("deleting Task************"+index);
371 * recev = t.get(index); JaceInterface stub=recev.getHostStub();
372 * spawnerStub.killApplication(stub); }
376 * } }catch(Exception e){
377 * System.out.println("w aiiiiiiiiiiiiiirrrr"+e+" index="+index); z=index;
380 * if (Register.Instance().getSize() == 0) {
381 * System.out.println("aucun noeuds a scanner");
382 * RunningApplication.Instance().purge(); System.exit(0);
389 // find a node on the super nodes
390 // in order to requisition them
392 /*** Sébastien Miquée ***/
394 //private synchronized Node foundToReplaceThisNode(int theRank, String nom) {
// Asks the super node for a node to take over the task of rank theRank,
// retrying in a loop until the search is considered done.
// When a node is obtained it is flagged alive, tagged with the application
// name, added to the Register unless existNode() already reports it, bound
// to the TaskId of theRank, and the neighbouring task's host is told to
// updateHeart() towards it.
// nom is the name of the node being replaced (used for logging); _n is
// forwarded to centralServer.getNewNode().
395 private synchronized Node foundToReplaceThisNode(int theRank, String nom, Node _n) {
397 boolean found = false;
400 while (found == false) {
403 //node = centralServer.getNewNode(LocalHost.Instance().getIP());
404 node = centralServer.getNewNode(LocalHost.Instance().getIP(), _n);
410 Thread.sleep( 1000 ) ;
411 System.out.println("Pas de bon retour !");
413 } catch (Exception e) {
414 // find another superNode and request the node from it
416 System.err.println("Cannot localize SuperNode ! " + e);
424 System.out.println("Using Node " + node.getName() + " ("
425 + node.getIP() + ") in order to replace " + nom
426 + " size before add: " + Register.Instance().getSize()
428 node.setAliveFlag(true);
431 // add the node to the Register
432 node.setAppliName(RunningApplication.Instance().getName());
434 // send it my stub so that it starts pinging me (from now on, presumably)
436 // TODO move into a thread ????
440 * neighborTask=Register.Instance().getListeOfTasks().getTaskIdOfRank
441 * ((theRank+1)%Register.Instance().getListeOfTasks().getSize());
442 * try{ node.getStub().updateHeart(neighborTask.getHostStub()); }
443 * catch(Exception e) {
444 * System.out.println("nvx noeud deja plu dispo2"); //node = null; }
446 // TODO check why the superNode hands this node back to me
447 // although it is already computing
448 // int is = Register.Instance().existNode(node.getIP());
449 int is = Register.Instance().existNode(node);
451 System.out.println("The Node is already in the register ! I don't add it.");
452 System.out.println("Node " + node.getName() + " not added !") ;
455 Register.Instance().addNode(node);
457 // !!!!!!!!!!!!!! update the ListeTask
458 TaskId myTaskId = Register.Instance().getListeOfTasks()
459 .getTaskIdOfRank(theRank);
460 myTaskId.setHostIP(node.getIP());
461 myTaskId.setHostName(node.getName());
462 myTaskId.setHostStub(node.getStub());
464 // Register.Instance().getListeOfTasks().viewAll();
// rank of the neighbour whose heartbeat target must be redirected to the new node
467 neighborRank = Register.Instance().getSize() - 1;
469 neighborRank = theRank - 1;
470 TaskId neighborTask2 = Register.Instance().getListeOfTasks()
471 .getTaskIdOfRank(neighborRank);
473 JaceInterface jaceStub = neighborTask2.getHostStub();
474 jaceStub.updateHeart(node.getStub());
475 } catch (Exception e) {
476 System.err.println("Next node unreachable ! " + e);
483 System.out.println("I didn't receive a new Node !");
// Replaces the entry oldStub of spawnersList by stub, at the same index.
// Prints an error when oldStub is not present in the list.
488 public void replaceBy(JaceSpawnerInterface oldStub,
489 JaceSpawnerInterface stub) {
490 int index = spawnersList.indexOf((Object) oldStub);
492 spawnersList.setElementAt(stub, index);
494 System.err.println("Spawner's stub not foud in spawnersList !");
// Replaces the dead spawner previousSpawner: requests a fresh node from the
// super node, turns it into a spawner through transformIntoSpawner(), stores
// the new stub at the dead spawner's index in spawnersList, starts it with a
// StartProcessThread, asks the other spawners to replaceBy(), and rewires
// the scan thread and the previous neighbour's heartbeat onto the new stub.
// Errors are reported on the console only.
497 public void getNewSpawner(JaceSpawnerInterface previousSpawner) {
498 //boolean found = false;
501 JaceSpawnerInterface spawnerStub = null;
503 // while (found == false) {
505 // TODO: find the error !!!
507 // "pas localise le super node java.lang.NullPointerException"
508 if (centralServer == null) {
509 System.err.println("Central Server not localized !");
511 node = centralServer.getNewNode( LocalHost.Instance().getIP(), null ) ;
512 RunningApplication.Instance()
513 .incrementNumberOfSpawnerDisconnections();
515 } catch (Exception e) {
516 // find another superNode and request the node from it
517 System.err.println("Super Node not localized !\n " + e);
518 // System.out.println("pas localise le super node " + e);
519 // System.out.println("pas localise le super node " + e);
520 // System.out.println("pas localise le super node " + e);
521 // System.out.println("pas localise le super node " + e);
522 // System.out.println("pas localise le super node " + e);
523 // System.out.println("pas localise le super node " + e);
524 // System.out.println("pas localise le super node " + e);
525 System.err.println("My IP : " + LocalHost.Instance().getIP());
526 if (centralServer == null) {
527 System.err.println("CentralServer is NULL !");
// position of the dead spawner; the new stub takes over this slot
533 index = spawnersList.indexOf((Object) previousSpawner);
535 System.out.println("Using Node " + node.getName()
537 + LocalHost.Instance().resolve(node.getName())
538 + ") to replace a dead spawner\n\n");
540 // Register.Instance().viewAll();
541 // Register.Instance().getListeOfTasks().viewAll();
542 spawnerStub = node.getStub().transformIntoSpawner(
551 RunningApplication.Instance()
552 .getNumberOfDisconnections(),
553 RunningApplication.Instance()
554 .getNumberOfSpawnerDisconnections(),
555 nbOfDaemonsPerSpawner, nbOfDeamonsPerThread);
556 spawnersList.setElementAt(spawnerStub, index);
557 new StartProcessThread(index).start();
558 // spawnerStub.startProcess( spawnersList);
559 } catch (Exception e) {
560 System.err.println("Unable to reach the new spawner: " + e);
// tell every spawner other than this one and the new one about the replacement
562 for (int j = 0; j < spawnersList.size(); j++)
564 if (!((JaceSpawnerInterface) spawnersList.get(j))
565 .equals(Register.Instance().getSpawnerStub())
566 && !((JaceSpawnerInterface) spawnersList.get(j))
567 .equals(spawnerStub)) {
569 .println("Trying to broadcast to spawner of rank "
572 ((JaceSpawnerInterface) spawnersList.get(j))
573 .replaceBy(previousSpawner, spawnerStub);
575 } catch (Exception e) {
577 .println("Unable to broadcast to spawner of rank: "
578 + j + ". Error:" + e);
580 ScanThreadSpawner.Instance().setServer(spawnerStub);
// the spawner preceding the replaced one must now send its heartbeat to the new stub
584 previous = spawnersList.size() - 1;
586 previous = index - 1;
588 ((JaceSpawnerInterface) spawnersList.get(previous))
589 .updateHeart(spawnerStub);
590 } catch (Exception e) {
592 .println("Unable to change the server of the heartbeatThread for the previous node of rank "
593 + previous + ". error:" + e);
597 System.err.println("Node is null !");
// Calls setFinished(bool) on every spawner stub of spawnersList; a failure
// is reported on the console.
602 public void broadcastFinished(boolean bool) {
603 for (int i = 0; i < spawnersList.size(); i++)
605 ((JaceSpawnerInterface) spawnersList.get(i)).setFinished(bool);
606 } catch (Exception e) {
608 .println("Unable to propagate the end of the application :"
// When the spawner stub reports the application as finished: stops the
// spawner heartbeat and scan threads, starts KillThread workers over the
// task list beginning at index nbOfDaemonsPerSpawner * rank, prints the run
// statistics (total time, disconnection counters) and purges the
// RunningApplication state.
613 private synchronized void scanAppliNodes() {
616 //ListeTask tskList = null;
620 JaceSpawnerInterface spawnerStub = Register.Instance()
622 if (spawnerStub.getFinished() == true) {
623 System.out.println("Number of tasks ="
624 + Register.Instance().getSize());
// x: number of complete groups of nbOfDaemonsPerSpawner tasks
626 int x = Register.Instance().getListeOfTasks().getSize()
627 / nbOfDaemonsPerSpawner;
// s + 1 KillThread workers are started below
630 s = (Register.Instance().getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
631 / nbOfDeamonsPerThread;
633 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
635 int debut = nbOfDaemonsPerSpawner * rank;
637 // for(int i=debut;i<nbOfDaemonsPerSpawner*(rank+1) &&
638 // i<reg.getSize();i++)
639 // System.out.println(((Node)nodes.elementAt(i)).getName());
641 ListeTask t = Register.Instance().getListeOfTasks();
642 ScanThreadSpawner.Instance().kill();
643 HeartBeatSpawner.Instance().kill();
644 for (int i = 0; i < s + 1; i++) {
646 new KillThread(i, debut, nbOfDaemonsPerSpawner,
647 nbOfDeamonsPerThread, t).start();
652 long finalTime = RunningApplication.Instance().getChrono()
655 int nbe = RunningApplication.Instance()
656 .getNumberOfDisconnections();
658 int nbsdc = RunningApplication.Instance()
659 .getNumberOfSpawnerDisconnections();
660 System.out.println("Application finished successfully !");
661 // System.out.println("Application finished successfully !!!!!!");
662 // System.out.println("Application finished successfully !!!!!!");
663 // System.out.println("Application finished successfully !!!!!!");
664 // System.out.println("Application finished successfully !!!!!!");
665 // System.out.println("Application finished successfully !!!!!!");
667 // .println("Application finished successfully !!!!!!\n");
668 System.out.println("TOTAL TIME in s : " + (finalTime / 1000));
669 System.out.println("nb of desconnections: " + nbe);
670 System.out.println("nb of spawners desconnections: " + nbsdc);
// if the local daemon is still running, it goes back to the super node
671 if (JaceDaemon.Instance().isRunning()) {
672 JaceDaemon.Instance().reconnectSuperNode();
674 RunningApplication.Instance().purge();
679 RunningApplication.Instance().purge();
683 } catch (Exception e) {
685 .println("w aiiiiiiiiiiiiiirrrr" + e + " index=" + index);
689 * if (Register.Instance().getSize() == 0) {
690 * System.out.println("aucun noeuds a scanner");
691 * RunningApplication.Instance().purge(); System.exit(0); return 0;
693 * } else{ tskList = Register.Instance().getListeOfTasks();
695 * //si mon appli a besoin d'un noeud //sinon, on fait rien if ((nbTasks
696 * - Register.Instance().getSize()) > 0) { cptReplaced = 0;
698 * //TODO demander des paquet de nodes, pas qu'un //on scanne toutes les
699 * taches de cette appli for (int ind = 0; ind < tskList.getSize();
700 * ind++) { //si 1 tache a pas de noeud, on trouve 1 remplacant
702 * //if (tskList.get(ind).getHostIP() == null) { if
703 * (tskList.get(ind).getHostStub() == null) { rank =
704 * tskList.get(ind).getRank(); node = foundToReplaceThisNodeTMP(rank);
705 * if (node != null) { cptReplaced++; }
709 * //qd fini de scanner taches, envoyer Register //si remplacement de
710 * noeud (c a d si Register modifier) if (cptReplaced != 0) {
711 * broadcastRegister(0); } try { Thread.currentThread().yield(); } catch
714 * }// fin if(appli.getNeededNodes() > 0) {
715 * //System.out.println("SCAN APPLI : taille : " +
716 * Register.Instance().getSize()); return 1; }
720 // @SuppressWarnings("unused")
721 // private synchronized Node foundToReplaceThisNodeTMP(int theRank) {
723 // boolean found = false;
725 // // while (found == false) {
727 // // TODO : trouver l'erreur !!!
729 // // "pas localise le super node java.lang.NullPointerException"
730 // if (centralServer == null) {
731 // System.out.println("centralServer est NUUUUUUUUULL");
733 // node = centralServer.getNewNode(LocalHost.Instance().getIP());
736 // } catch (Exception e) {
737 // // trouver un autre superNode et lui demander le noeud a lui
738 // System.out.println("TMP pas localise le super node " + e);
739 // System.out.println("TMP pas localise le super node " + e);
740 // System.out.println("TMP pas localise le super node " + e);
741 // System.out.println("TMP pas localise le super node " + e);
742 // System.out.println("TMP pas localise le super node " + e);
743 // System.out.println("TMP pas localise le super node " + e);
744 // System.out.println("TMP pas localise le super node " + e);
745 // System.out.println("mon IP : " + LocalHost.Instance().getIP());
746 // if (centralServer == null) {
747 // System.out.println("centralServer : NULL");
749 // connectSuperNode();
752 // if (node != null) {
753 // System.out.println("COOOOOOOOOOOOOOOOOOOOOOL : requisition de "
754 // + node.getName() + " taille avt add: "
755 // + Register.Instance().getSize() + "\n\n");
756 // node.setAliveFlag(true);
757 // node.setAliveTime();
759 // // rajouter le noeud ds le Register
760 // System.out.println("ds Register, manque "
761 // + (nbTasks - Register.Instance().getSize()));
762 // node.setAppliName(RunningApplication.Instance().getName());
764 // // lui envoyer mon stub pr qu'il commence a me pinguer des
766 // // TODO a mettre ds un thread ????
768 // TaskId neighborTask = Register.Instance().getListeOfTasks()
771 // % Register.Instance().getListeOfTasks()
773 // node.getStub().updateHeart(neighborTask.getHostStub());
774 // // node.getStub().updateHeart(this.spawnerRef);
776 // // int is = Register.Instance().existNode(node.getIP());
777 // int is = Register.Instance().existNode(node);
778 // // TODO verif pourkoi superNode me le redonne
779 // // alors qu'il fait deja du calcul
781 // System.out.println("j'ajoute pas le noeud, il y est deja");
782 // System.out.println("PAS AJOUTEE TMP " + node.getName());
783 // System.out.println("PAS AJOUTEE TMP " + node.getName());
784 // System.out.println("PAS AJOUTEE TMP " + node.getName());
785 // System.out.println("PAS AJOUTEE TMP " + node.getName());
786 // System.out.println("PAS AJOUTEE TMP " + node.getName());
789 // Register.Instance().addNode(node);
791 // // !!!!!!!!!!!!!!actualiser le ListeTask
792 // TaskId myTaskId = Register.Instance().getListeOfTasks()
793 // .getTaskIdOfRank(theRank);
794 // myTaskId.setHostIP(node.getIP());
795 // myTaskId.setHostName(node.getName());
796 // myTaskId.setHostStub(node.getStub());
797 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostIP(node.getIP());
798 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostName(node.getName());
799 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostStub(node.getStub());
801 // } catch (Exception e) {
802 // System.out.println("nvx noeud deja plu dispo");
806 // System.out.println("RADINNNNNNNNNNNNNN TMP ");
// Publishes this spawner over RMI: creates a JaceSpawnerServer, creates an
// RMI registry on spawnerPort, binds the server under the name
// "JaceSpawnerServer", then looks that binding up again through Naming to
// obtain spawnerRef. Errors are reported on the console.
811 private void exportObject() {
813 JaceSpawnerServer spawnerServer = null;
815 System.out.println("Name of local machine is: "
816 + LocalHost.Instance().getName());
817 System.out.println("IP of local machine is: "
818 + LocalHost.Instance().getIP());
820 // launch the JaceSpawnerServer
821 spawnerServer = new JaceSpawnerServer();
822 java.rmi.registry.LocateRegistry.createRegistry(spawnerPort);
823 java.rmi.registry.LocateRegistry.getRegistry(spawnerPort).rebind(
824 "JaceSpawnerServer", spawnerServer);
825 spawnerRef = (JaceSpawnerInterface) Naming.lookup("rmi://"
826 + LocalHost.Instance().getIP() + ":" + spawnerPort
827 + "/JaceSpawnerServer");
829 } catch (Exception e) {
831 .println("JaceP2P_Error in JaceSpawner.exportObject() when creating the local JaceSpawnerServer "
833 // System.err.println("exit ds JaceSpawner.exportObject");
// Locates a super node through RMI.
// First tries the configured superNode_IP (when not null); if no connection
// was made, walks through the entries of SuperNodeListe until one answers.
// On success the stub and IP are stored in LocalHost, heartTime is read from
// the super node and timeBeforeKill is set to NB_HEART_DECONNECT * heartTime.
// Prints an error when none of the listed super nodes can be reached.
839 public void connectSuperNode() {
840 System.out.println("I'm looking for a super node");
841 boolean connected = false;
842 if (!(superNode_IP == null)) {
844 System.out.println("Trying to invoke super node "
846 centralServer = (JaceSuperNodeInterface) Naming.lookup("rmi://"
847 + superNode_IP + ":" + superNode_port
849 System.out.println("Succesfully located " + superNode_IP);
851 // add stub and IP in LocalHost to store it until super node
853 LocalHost.Instance().setSuperNodeStub(centralServer);
854 LocalHost.Instance().setSuperNodeIP(superNode_IP);
855 heartTime = centralServer.getSuperNodeBeat();
856 timeBeforeKill = NB_HEART_DECONNECT * heartTime;
859 } catch (Exception e) {
860 System.err.println("Super Node not accessible, try another one (1/2s)");
863 } catch (Exception e1) {
// fallback: try the super nodes of the static list one after the other
868 if (connected == false) {
870 SuperNodeListe.Instance().staticInitialization();
871 while (connected == false
872 && i < SuperNodeListe.Instance().getListe().size()) {
873 SuperNodeData d = null;
874 d = SuperNodeListe.Instance().getSuperNodeData(i);
876 superNode_IP = LocalHost.Instance().resolve(d.getIP());
877 superNode_port = d.getPort();
878 // superNode_port = d.getPort();
880 System.out.println("Trying to invoke Super Node "
882 centralServer = (JaceSuperNodeInterface) Naming
883 .lookup("rmi://" + superNode_IP + ":"
884 + superNode_port + "/JaceSuperNode");
885 System.out.println("Succesfully located SuperNode "
887 LocalHost.Instance().setSuperNodeStub(centralServer);
888 LocalHost.Instance().setSuperNodeIP(superNode_IP);
889 heartTime = centralServer.getSuperNodeBeat();
890 timeBeforeKill = NB_HEART_DECONNECT * heartTime;
893 } catch (Exception e) {
895 .println("SuperNode "
897 + " not accessible, trying to locate another one in 0.5s\n");
901 } catch (Exception e1) {
907 if (connected == false) {
908 System.err.println("All the Super Nodes in the list are not accessible. I'm unable to connect to the platform !");
914 // get a Register on the SuperNode
915 // completed with the required number of Daemons
// Requests a Register from the super node for nbTasks tasks plus
// nbExtraSpawners additional nodes, where nbExtraSpawners is
// (nbTasks - 1) / nbOfDaemonsPerSpawner when nbTasks exceeds
// nbOfDaemonsPerSpawner and 0 otherwise.
// If the register does not hold exactly nbTasks + nbExtraSpawners nodes,
// every received node is asked to reconnectSuperNode().
// The first nbExtraSpawners nodes are moved from the register into
// spawnersList; the register then replaces Register.Instance() and each of
// its nodes is flagged alive with a fresh alive time.
917 public synchronized void getRegisterOnSuperNode() {
918 Register registerSpawner = null;
920 boolean recieved = false;
922 System.out.println("Trying to get a Register on the SuperNode");
923 int nbExtraSpawners = 0;
924 if (nbTasks > nbOfDaemonsPerSpawner) {
925 nbExtraSpawners = (nbTasks - 1) / nbOfDaemonsPerSpawner;
930 registerSpawner = centralServer.getRegisterSpawner(LocalHost
931 .Instance().getIP(), nbTasks, (Task) tache, nbTasks
932 + nbExtraSpawners, algo, paramAlgo);
934 } catch (Exception e) {
936 .println("Unable to recieve a register from superNode "
// not enough nodes: hand the reserved nodes back to the super node
941 if (registerSpawner.getSize() != (nbTasks + nbExtraSpawners)) {
942 System.err.println("I did not recieve enough nodes from superNode!!!! \n killing application !!!!");
943 for (int i = 0; i < registerSpawner.getSize(); i++) {
945 registerSpawner.getNodeAt(i).getStub().reconnectSuperNode();
946 } catch (Exception e) {
947 System.err.println("The reserved node was unable to reconnect to the super node");
953 spawnersList = new Vector<Object>();
// Each pass takes the node at index 0 and removes it from the register.
// NOTE(review): the bound i < registerSpawner.getSize() presumably shrinks
// as nodes are removed - confirm this is intended.
954 for (int i = 0; i < nbExtraSpawners && i < registerSpawner.getSize(); i++) {
955 spawnersList.add(registerSpawner.getNodeAt(0));
956 // * nbOfDaemonsPerSpawner));
957 registerSpawner.removeNodeOfName(registerSpawner.getNodeAt(0).getName());
958 // * nbOfDaemonsPerSpawner));
961 registerSpawner.setNbOfTasks(nbTasks);
962 registerSpawner.setNumBackupNeighbors(nbSavingNodes);
964 * System.out.println("Trying to connect another SuperNode");
965 * connectSuperNode(); try { registerSpawner =
966 * centralServer.getRegisterSpawner(LocalHost.Instance().getIP(),
967 * nbTasks); } catch(Exception e1) {}
970 if (registerSpawner != null) {
971 System.out.println("I received the register");
972 // registerSpawner.setVersion(registerVersion);
973 // registerVersion++;
974 Register.Instance().replaceBy(registerSpawner);
975 System.out.println("It contains " + Register.Instance().getSize()
976 + " Nodes" + " " + nbExtraSpawners + " ExtraSpawners");
978 // set each Node aliveTime value to the Spawner current time
979 for (int i = 0; i < Register.Instance().getSize(); i++) {
980 noeud = Register.Instance().getNodeAt(i);
981 noeud.setAliveFlag(true);
982 noeud.setAliveTime();
986 System.err.println("\n---------------WARNING--------------");
987 System.err.println("No Daemon available on the SuperNode dispo, try later, please");
// Thread that turns node n into the spawner of rank i: it calls
// transformIntoSpawner() on the node's stub and stores the returned spawner
// stub at index i of spawnersList.
// If the node cannot be contacted, a replacement node is requested from the
// super node and a new TransformThread is started for it; if that request
// fails too, every node of the Register is asked to reconnectSuperNode().
992 public class TransformThread extends Thread {
996 public TransformThread(int i, Node n) {
1004 System.out.println("Trying to transform the spawner ("
1005 + n.getName() + ") of rank " + i);
1006 spawnersList.setElementAt(n.getStub().transformIntoSpawner(
1007 params, appliName, Register.Instance(), nbTasks,
1008 centralServer, i, heartTime, 0, 0, 0,
1009 nbOfDaemonsPerSpawner, nbOfDeamonsPerThread), i);
1010 } catch (Exception e) {
1011 System.err.println("Error while contacting newly acquired spawner ("
1012 + n.getName() + "): " + e);
// retry with a fresh node for the same rank
1014 n = centralServer.getNewNode( LocalHost.Instance().getIP(), null ) ;
1016 new TransformThread(i, n).start();
1017 } catch (Exception e1) {
1018 System.err.println("The Super Node is maybe dead: " + e1) ;
1019 for (int z = 0; z < Register.Instance().getSize(); z++) {
1021 Register.Instance().getNodeAt(z).getStub()
1022 .reconnectSuperNode();
1023 } catch (Exception ez) {
1024 System.err.println("The reserved node was unable to reconnect to the super node: \n"
// Thread that calls startProcess(spawnersList) on the spawner stub stored at
// index i of spawnersList; a failure is printed on the console.
1034 public class StartProcessThread extends Thread {
1037 public StartProcessThread(int i) {
1045 * while((spawnersList.elementAt(i) instanceof Node)) try{
1046 * System.out.println("waiting till transform of spawner "+i+
1047 * " is finished"); Thread.sleep(20); }catch(Exception e1){}
1049 // System.out.println("start process on spawner of rank "+i);
1050 JaceSpawnerInterface spawnerStub = (JaceSpawnerInterface) spawnersList.get(i);
1051 spawnerStub.startProcess(spawnersList);
1052 } catch (Exception e) {
1053 e.printStackTrace(System.out);
1054 System.err.println("Unable to start the process on the spawner of rank "
1055 + i + ".error: " + e);
// Builds the network of spawners: starts a TransformThread for every Node of
// spawnersList, appends this spawner's own stub (its rank becomes the last
// index), broadcasts the register, waits until no entry of spawnersList is a
// Node any more, and finally starts a StartProcessThread for each spawner.
1060 public void createSpawnerNetwork() {
1063 for (i = 0; i < spawnersList.size(); i++) {
1064 n = (Node) spawnersList.elementAt(i);
1066 // Register.Instance().getListeOfTasks().viewAll();
1067 // spawnersList.setElementAt(n.getStub().transformIntoSpawner(
1068 // params, appliName, Register.Instance(),nbTasks, centralServer,i,
1069 // heartTime,0,0),i);
1070 new TransformThread(i, n).start();
1073 // broadcast the Register.Instance() to all the JaceServer
1074 // in order to start each task on the Daemons
1076 spawnersList.add(Register.Instance().getSpawnerStub());
1077 System.out.println(" rank="+rank+" spawnersList.size()=" + spawnersList.size());
1078 rank = spawnersList.size() - 1;
1080 broadcastRegister(1);
// an entry remains a Node until its TransformThread has stored the spawner stub
1082 for (int j = 0; j < spawnersList.size(); j++) {
1083 System.out.println("waiting till transform of spawner " + j
1085 while ((spawnersList.elementAt(j) instanceof Node))
1089 } catch (Exception e) {
1093 System.out.println("End Transformation of all spawners. Beginning the computing processes");
1095 for (i = 0; i < spawnersList.size(); i++) {
1097 // while(!(spawnersList.elementAt(i) instanceof
1098 // JaceSpawnerInterface))
1100 new StartProcessThread(i).start();
1103 System.out.println("End create Spawner Network!!!!!!!!!");
// Returns the spawner stub stored at index rank / nbOfDaemonsPerSpawner
// (integer division) of spawnersList.
1106 public JaceSpawnerInterface getSpawnerResponsibleOn(int rank) {
1107 int id = rank / nbOfDaemonsPerSpawner;
1108 return (JaceSpawnerInterface) spawnersList.get(id);
// Sets up the application to run: starts the chrono, records the application
// name and task count in RunningApplication and the Register, then builds a
// ListeTask with one TaskId per alive node of the Register (at most nbTasks).
// If fewer than nbTasks tasks could be assigned, the list is completed with
// TaskIds whose stub is null. The list is installed in the Register and the
// application is marked as running.
1111 public void createAppli() {
1116 ListeTask tsk = new ListeTask();
1118 JaceInterface nodeStub = null;
1119 TaskId myTask = null;
1121 System.out.println("Application launched, starting the chronometer");
1122 RunningApplication.Instance().getChrono().start();
1124 RunningApplication.Instance().setName(appliName);
1125 RunningApplication.Instance().setNbTasks(nbTasks);
1126 // RunningApplication.Instance().setRegister(Register.Instance());
1128 Register.Instance().setParams(params);
1129 Register.Instance().setAppliName(appliName);
1130 Register.Instance().setSpawnerStub(this.spawnerRef);
1132 // assign a TaskId to each Node of the Register
1133 // and insert the TaskId in the ListTask
1134 while (i < Register.Instance().getSize() && count < nbTasks) {
1135 tmpNode = Register.Instance().getNodeAt(i);
1136 if (tmpNode.getAliveFlag() == true) {
1137 tmpNode.setAppliName(appliName);
1138 nodeStub = tmpNode.getStub();
1139 nodeName = tmpNode.getName();
1140 nodeIP = tmpNode.getIP();
// count is used as the rank of the new task
1142 myTask = new TaskId(appliName, count, nodeStub);
1143 myTask.setHostIP(nodeIP);
1144 myTask.setHostName(nodeName);
1146 tsk.addTask(myTask);
1152 // if not enough Nodes in the Register,
1153 // insert not assigned TaskId in the ListTask
1154 if (count < nbTasks) {
1155 for (int j = count; j < nbTasks; j++) {
1156 tsk.addTask(new TaskId(appliName, j, null));
1158 System.out.println("in Register, misses "
1159 + (nbTasks - Register.Instance().getSize()) + " nodes");
1162 // insert the ListeTask in the Register of the appli
1163 Register.Instance().setListeOfTasks(tsk);
1164 // Register.Instance().getListeOfTasks().viewAll();
1165 RunningApplication.Instance().setRunning(true);
1166 System.out.println("fin create appli");
/**
 * Thread that calls {@code setSpawner} on the stub of a range of Register
 * nodes, passing the spawner stub currently stored in the Register.
 * <p>
 * The range starts at {@code debut + i * nbOfDaemonsPerThread} and is
 * bounded by the per-thread slice, by {@code debut + nbOfDeamonsPerSpawner}
 * and by the size of the Register's task list.
 */
1169 public class BroadcastSpawner extends Thread {
1172 int nbOfDaemonsPerThread, nbOfDeamonsPerSpawner;
// i: index of this thread's slice; debut: first index handled by this spawner.
1174 public BroadcastSpawner(int i, int debut, int nbOfDeamonsPerSpawner,
1175 int nbOfDaemonsPerThread) {
1178 this.nbOfDaemonsPerThread = nbOfDaemonsPerThread;
1179 this.nbOfDeamonsPerSpawner = nbOfDeamonsPerSpawner;
// NOTE(review): the start index uses nbOfDaemonsPerThread (the field declared above) while
// the upper bound uses nbOfDeamonsPerThread (different spelling, not declared in this
// class) - confirm both hold the same value.
1184 for (int index = debut + i * nbOfDaemonsPerThread; index < debut
1185 + i * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1186 && index < debut + nbOfDeamonsPerSpawner
1187 && index < Register.Instance().getListeOfTasks().getSize(); index++) {
1189 Register.Instance().getNodeAt(index).getStub().setSpawner(
1190 Register.Instance().getSpawnerStub());
1191 } catch (Exception e) {
// NOTE(review): the message looks up getNodeAt(i) although the failing call used
// getNodeAt(index) - the reported node name may not be the one that failed; confirm.
1192 System.out.println("can't change spawner stub on node: "
1193 + Register.Instance().getNodeAt(i).getName()
/**
 * Thread that releases the nodes hosting a range of tasks of the list
 * {@code t}.
 * <p>
 * For each task rank in its range it looks up the Register node matching
 * the task's host stub, clears that node's application name, starts a
 * {@code ReconnectThread} for it and removes the node from the Register.
 */
1200 public class KillThread extends Thread {
1203 int nbOfDaemonsPerThread, nbOfDeamonsPerSpawner;
// i: index of this thread's slice; debut: first index handled by this spawner;
// t: the task list whose hosts are released.
1206 public KillThread(int i, int debut, int nbOfDeamonsPerSpawner,
1207 int nbOfDaemonsPerThread, ListeTask t) {
1210 this.nbOfDaemonsPerThread = nbOfDaemonsPerThread;
1211 this.nbOfDeamonsPerSpawner = nbOfDeamonsPerSpawner;
// NOTE(review): the start index uses nbOfDaemonsPerThread while the upper bound uses
// nbOfDeamonsPerThread (different spelling, not declared in this class) - confirm both
// hold the same value.
1218 for (int index = debut + i * nbOfDaemonsPerThread; index < debut
1219 + i * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1220 && index < debut + nbOfDeamonsPerSpawner
1221 && index < t.getSize(); index++) {
1224 TaskId recev = null;
1225 System.out.println("deleting Task" + index);
1227 recev = t.getTaskIdOfRank(index);
1229 JaceInterface stub = recev.getHostStub();
1230 System.out.println("name=" + recev.getHostName());
// Find the Register node that owns this stub and detach it from the application.
1231 noeud = Register.Instance().getNodeOfStub(stub);
1232 noeud.setAppliName(null);
// The ReconnectThread is started before the node is removed from the Register.
1233 new ReconnectThread(stub, noeud.getName()).start();
1234 Register.Instance().removeNode(noeud);
1235 // LocalHost.Instance().getSpawnerStub().killApplication(stub);
1237 } catch (Exception e) {
// Building the error message can itself fail (it dereferences noeud), hence the second handler.
1239 System.err.println("error in killThread on node "
1240 + noeud.getName() + ". " + e);
1241 } catch (Exception e2) {
1242 System.err.println("error in error :" + e2);
/**
 * Thread that calls {@code suicide("fin d'appli")} on a node stub and
 * reports on the error stream when that call throws.
 */
1248 class ReconnectThread extends Thread {
1249 JaceInterface stub = null;
// s: stub of the node to act on; name: node name used in the error message.
1252 public ReconnectThread(JaceInterface s, String name) {
1259 // System.out.println("reconnexion SuperNode");
1260 // Register.Instance().getNode(workerIP).getWorkerStub().reconnectSuperNode();
1262 // stub.reconnectSuperNode();
// The reconnect calls above are commented out; the node is asked to terminate instead.
1263 stub.suicide("fin d'appli");
1264 } catch (Exception e) {
1265 System.err.println("can't kill node " + name);
1273 // make a copy of the Register and send it to the nodes that compose it,
1274 // because if it is modified at the same time, what is sent is not coherent
/**
 * Sends the Register to the nodes by starting {@code s}
 * {@code UpdateRegisterThread}s, where {@code s} is derived from the size
 * of the task list, {@code nbOfDaemonsPerSpawner} and
 * {@code nbOfDeamonsPerThread}.
 * <p>
 * The method waits while {@code broadcasting} is true, then sets it to
 * true itself; {@code setBroadcasting(false)} is called further down.
 * The Register singleton is used directly (the clone is commented out).
 *
 * @param requete request code forwarded unchanged to each
 *                {@code UpdateRegisterThread}
 */
1275 private synchronized void broadcastRegister(int requete) {
1276 // Register reg = Register.Instance().clone();
1277 Register reg = Register.Instance();
1280 System.out.println("name of spawner: "
1281 + Register.Instance().getSpawnerStub().getName());
1282 // launch 1 thread to send the Register to all the nodes
// NOTE(review): this waits on the broadcasting flag inside a synchronized method -
// confirm the flag can be cleared by code that does not need this object's monitor.
1283 while (broadcasting == true)
1285 broadcasting = true;
1286 // Register.Instance().setSpawnerStub(
1287 // Register.Instance().getSpawnerStub());
// x: number of full groups of nbOfDaemonsPerSpawner tasks.
1288 int x = reg.getListeOfTasks().getSize() / nbOfDaemonsPerSpawner;
// s is a count of threads: the number of daemons to cover divided by
// nbOfDeamonsPerThread, plus one when the division leaves a remainder.
1291 if ((reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1292 % nbOfDeamonsPerThread == 0)
1293 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1294 / nbOfDeamonsPerThread;
1296 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1297 / nbOfDeamonsPerThread + 1;
1298 else if ((nbOfDaemonsPerSpawner % nbOfDeamonsPerThread) == 0)
1299 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
1301 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread + 1;
// debut: first index of the range belonging to the spawner of this rank.
1302 int debut = nbOfDaemonsPerSpawner * rank;
1303 System.out.println("rank=" + rank + " debut=" + debut + " s=" + s
1304 + " nbOfDaemonsPerSpawner=" + nbOfDaemonsPerSpawner
1305 + " nbOfDeamonsPerThread=" + nbOfDeamonsPerThread + " x="
// One UpdateRegisterThread per slice index i.
1307 for (int i = 0; i < s; i++)
1308 new UpdateRegisterThread(tache, reg, requete, i, debut).start();
1310 * This thread : -updates the goal of the Node beats if necessary
1311 * (stub.updateHeart) -updates the Register on each Node
1312 * (stub.updateRegister)
1314 JaceSpawner.Instance().setBroadcasting(false);
1317 } catch (Exception e) {
1320 } catch (Exception e) {
1322 .println("\n1 node has died during JaceSpawner.broadcastRegister()");
/**
 * Starts {@code StartScanThread}s over a cloned copy of the Register's
 * node list, after waiting while {@code broadcasting} is true.
 * <p>
 * {@code s + 1} threads are started, where {@code s} is an integer
 * quotient by {@code nbOfDeamonsPerThread}.
 */
1326 private synchronized void broadcastScanning() {
1327 Register reg = Register.Instance();
1328 while (broadcasting == true)
1331 } catch (Exception e) {
1333 // Register.Instance().viewAll();
// Work on a clone of the node list rather than on the Register's own list.
1334 Vector<?> nodes = (Vector<?>) Register.Instance().getListOfNodes().clone();
// x: number of full groups of nbOfDaemonsPerSpawner tasks.
1335 int x = reg.getListeOfTasks().getSize() / nbOfDaemonsPerSpawner;
1338 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1339 / nbOfDeamonsPerThread;
1341 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
// debut: first index of the range belonging to the spawner of this rank.
1343 int debut = nbOfDaemonsPerSpawner * rank;
1345 // for(int i=debut;i<nbOfDaemonsPerSpawner*(rank+1) &&
1346 // i<reg.getSize();i++)
1347 // System.out.println(((Node)nodes.elementAt(i)).getName());
// s is a truncated quotient, so one extra thread is always started.
1349 for (int i = 0; i < s + 1; i++) {
1351 new StartScanThread(i, nodes, debut).start();
/**
 * Builds a new {@code Register} restricted to the tasks listed by
 * {@code getDependencies(rank, size of the task list)}.
 * <p>
 * The application name, parameters, spawner stub and number of tasks are
 * copied from the Register singleton; the task list holds only the
 * {@code TaskId}s of the dependency ranks.
 *
 * @param rank task rank whose dependencies select the tasks
 */
1356 public Register getRegister(int rank) {
1358 ListeTask listOfTasks = Register.Instance().getListeOfTasks();
1359 Vector<Integer> dependencies = getDependencies(rank, listOfTasks.getSize());
1360 Register g = new Register();
1361 ListeTask newListOfTasks = new ListeTask();
1362 g.setAppliName(Register.Instance().getAppliName());
1363 g.setParams(Register.Instance().getParams());
1364 g.setSpawnerStub(Register.Instance().getSpawnerStub());
1365 g.setNbOfTasks(Register.Instance().getNbOfTasks());
1366 // g.setVersion(reg.getVersion());
// Copy the TaskId of every dependency rank into the new list.
1367 for (int j = 0; j < dependencies.size(); j++) {
1368 TaskId id = listOfTasks.getTaskIdOfRank(((Integer) dependencies
1369 .elementAt(j)).intValue());
1370 newListOfTasks.addTask(id);
// Tasks without a host stub have no node to look up.
1371 if (id.getHostStub() != null) {
1372 Node noeud = Register.Instance()
1373 .getNodeOfStub(id.getHostStub());
1377 g.setListeOfTasks(newListOfTasks);
/**
 * Starts {@code UpdateRegisterConcernedThread}s over the dependencies of
 * the given rank, after printing that dependency list.
 * <p>
 * The number of threads is {@code dependencies.size()} divided by
 * {@code nbOfDeamonsPerThread}, plus one when there is a remainder.
 *
 * @param rank    task rank whose dependencies are processed
 * @param oldNode passed unchanged to each thread
 * @param node    passed unchanged to each thread
 */
1381 private void updateConcernedNodes(int rank, Node oldNode, Node node) {
1382 ListeTask listOfTasks = Register.Instance().getListeOfTasks();
1383 Vector<?> dependencies = getDependencies(rank, listOfTasks.getSize());
// Log the list of concerned neighbours for this rank.
1384 System.out.println("la liste des voisins concernes de : " + rank);
1385 for (int z = 0; z < dependencies.size(); z++)
1386 System.out.print(((Integer) dependencies.elementAt(z)).intValue()
1388 System.out.println();
1389 // Register.Instance().setVersion(registerVersion);
1390 // registerVersion++;
1392 .setSpawnerStub(Register.Instance().getSpawnerStub());
// s: number of threads, one per slice of nbOfDeamonsPerThread dependencies.
1394 if ((dependencies.size() % nbOfDeamonsPerThread) == 0)
1395 s = dependencies.size() / nbOfDeamonsPerThread;
1397 s = dependencies.size() / nbOfDeamonsPerThread + 1;
1398 Register reg = Register.Instance();
1400 for (int j = 0; j < s; j++) {
1401 new UpdateRegisterConcernedThread(dependencies, reg, j, rank,
1402 oldNode, node).start();
/**
 * Collects the ranks related to task {@code id}, without duplicates among
 * the convergence and backup entries.
 * <p>
 * Three groups are gathered: the computing dependencies reported by
 * {@code tache.getDependencies(id)}, the convergence neighbours at
 * power-of-two distances from {@code id}, and the backup neighbours to the
 * right and to the left of {@code id}, using the count returned by
 * {@code Register.Instance().getNumBackupNeighbors()}.
 *
 * @param id       rank of the task
 * @param jaceSize total number of ranks; used as an exclusive upper bound
 *                 and as the modulus for backup ranks
 */
1406 private Vector<Integer> getDependencies(int id, int jaceSize) {
1407 // get computing dependencies
1408 Vector<Integer> neighbors = new Vector<Integer>();
1409 int[] dep = tache.getDependencies(id);
// Only the first taille(dep) entries of dep are used.
1410 for (int z = 0; z < taille(dep); z++)
1411 neighbors.add(dep[z]);
1412 // System.out.println("la liste des voisins de calcul de: "+id+" concerne");
1413 // for(int z=0;z<neighbors.size();z++)
1414 // System.out.print(((Integer)neighbors.elementAt(z)).intValue()+" ");
1415 // System.out.println();
1417 // get convergence neighbors
// For each power of two 2^d below jaceSize: add id + 2^d when id < 2^d and the sum is
// in range, and add id - 2^d when 2^d <= id < 2^(d+1).
1419 while (Math.pow(2, d) < jaceSize) {
1420 if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize))
1421 if (!neighbors.contains((Object) ((int) (id + Math.pow(2, d)))))
1422 neighbors.add((int) (id + Math.pow(2, d)));
1423 if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d))
1424 if (!neighbors.contains((Object) ((int) (id - Math.pow(2, d)))))
1425 neighbors.add((int) (id - Math.pow(2, d)));
1429 // get backup neighbors
1430 int nb = Register.Instance().getNumBackupNeighbors();
1433 for (int j = 1; j <= nb; j++) {
1434 // ------------ 1 - for backups "j + n" (to the right of j)
// Right-hand ranks wrap around with the modulo.
1435 rankOfBackTask = (id + j) % jaceSize;
1436 if (!neighbors.contains((Object) rankOfBackTask))
1437 neighbors.add(rankOfBackTask);
1439 // ------------ 2 - for backups "j - n" (to the left of j)
1442 rankOfBackTask = tmp % jaceSize;
// NOTE(review): this expression evaluates to jaceSize itself when Math.abs(tmp) is a
// multiple of jaceSize - confirm the guarding condition excludes that case.
1444 rankOfBackTask = jaceSize - (Math.abs(tmp) % jaceSize);
1446 if (!neighbors.contains((Object) rankOfBackTask))
1447 neighbors.add(rankOfBackTask);
/**
 * Scans {@code vect} while the index is within bounds and the entry is
 * non-negative.
 * NOTE(review): presumably returns the number of leading non-negative
 * entries, i.e. a negative value marks the end of the used part - confirm.
 *
 * @param vect array to scan
 */
1455 public static int taille(int[] vect) {
// The bounds check comes first, so vect[x] is never read past the end of the array.
1458 while (x < vect.length && vect[x] >= 0) {
// Thread subclass with a no-argument constructor.
// NOTE(review): presumably triggers the scanning broadcast when run - confirm.
1465 class StartScanning extends Thread {
1467 public StartScanning() {
1477 class StartScanThread extends Thread {
1480 int nbOfDeamonsPerThread, nbOfDeamonsPerSpawner;
1482 StartScanThread(int i, Vector<?> nodes, int debut) {
1486 nbOfDeamonsPerThread = JaceSpawner.Instance().getNbOfDeamonsPerThread();
1487 nbOfDeamonsPerSpawner = JaceSpawner.Instance()
1488 .getNbOfDeamonsPerSpawner();
1493 for (index = debut + i * nbOfDeamonsPerThread; index < debut + i
1494 * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1495 && index < debut + nbOfDeamonsPerSpawner
1496 && index < nodes.size(); index++) {
1498 Node node = (Node) nodes.elementAt(index);
1499 JaceInterface stub = node.getStub();
1500 String name = node.getName();
1503 stub.setScanning(true);
1504 // System.out.println("modify scanning to "+name);
1506 } catch (Exception e) {
1507 System.out.println("unable to modify scanning to " + name + ":"
1511 // for(int x=0;x<nodes.size();x++)
1512 // System.out.println(((Node)nodes.elementAt(x)).getName());
1513 // System.out.println("nbre total: "+(index-1));