3 import java.rmi.Naming;
4 import java.util.Calendar;
5 import java.util.GregorianCalendar;
6 import java.util.Vector;
8 public class JaceSpawner {
11 private Task tache = null;
12 public static JaceSpawner Instance;
13 private static String superNode_IP = null;
14 private int superNode_port = 1098;
15 private static int spawnerPort = 1099;
16 private static JaceSuperNodeInterface centralServer = null;
17 private JaceSpawnerInterface spawnerRef = null;
19 private String appliName;
20 private String[] params = null;
21 @SuppressWarnings("unused")
22 private String protocol;
23 // private int registerVersion=0;
24 final int NB_HEART_DECONNECT = 3;
25 private int heartTime; // frequency of heartBeat
26 @SuppressWarnings("unused")
27 private int timeBeforeKill; // wait 3 non-response of heartBeat before
28 // considering de Daemon as dead
29 private boolean broadcasting = false;
30 @SuppressWarnings("unused")
32 private static int nbOfDaemonsPerSpawner;
33 private static int nbOfDeamonsPerThread;
34 private Vector<Object> spawnersList;
36 private int nbSavingNodes;
38 // Variables for Mapping
40 private double paramAlgo ;
42 public JaceSpawner(String superNode, int port, String comProtocol,
43 String[] args, int nbDaemonPerSpawner, int nbDaemonPerThread,
44 int nbSavingNodes, int _algo, double _paramAlgo) {
45 // superNode_IP = LocalHost.Instance().resolve(superNode);
47 paramAlgo = _paramAlgo ;
49 superNode_IP = superNode;
51 protocol = comProtocol;
52 nbOfDaemonsPerSpawner = nbDaemonPerSpawner;
53 nbOfDeamonsPerThread = nbDaemonPerThread;
54 this.nbSavingNodes = nbSavingNodes;
57 // if less than 2 params (nb of tasks and name of the appli), error
58 System.err.println( "Parameters error !" ) ;
62 nbTasks = new Integer(args[0]).intValue(); // nb of tasks
65 } catch (Exception e) {
66 System.err.println("Number format exception :" + e ) ;
69 appliName = args[1]; // name of the class to launch
70 if (args.length > 2) { // get the eventual param of the appli
71 params = new String[args.length - 2];
72 for (int i = 0; i < params.length; i++) {
73 params[i] = args[2 + i];
77 c = load.load(appliName);
79 tache = ((Task) c.newInstance());
80 tache.setParam(params);
81 tache.setJaceSize(nbTasks);
85 } catch (Exception e) {
86 System.err.println( "Unable to instantiate the class " + e ) ;
96 public JaceSpawner(String[] params, String appliName, Register reg,
97 int nbTasks, JaceSuperNodeInterface snodeStub, int rank,
98 int heartTime, int tag, int nbdc, int nbsdc,
99 int nbDaemonPerSpawner, int nbDaemonPerThread) {
101 nbOfDaemonsPerSpawner = nbDaemonPerSpawner;
102 nbOfDeamonsPerThread = nbDaemonPerThread;
103 if (params.length != 0) {
104 this.params = new String[params.length];
105 for (int i = 0; i < params.length; i++)
106 this.params[i] = params[i];
109 System.err.println( "There is no parameter !" ) ;
111 } catch (Exception e) {
112 System.err.println("Error in copying the parameters: " + e ) ;
114 // System.out.println("xxxxxxxxxxxxxxx reg size="+reg.getSize()+" xxxxxxxxxxxxxx");
115 this.appliName = appliName;
117 this.nbTasks = nbTasks;
118 this.heartTime = heartTime;
119 LocalHost.Instance().setSuperNodeStub(snodeStub);
120 centralServer = snodeStub;
122 Register.Instance().replaceBy(reg);
123 Register.Instance().setSpawnerStub(this.spawnerRef);
124 Register.Instance().getListeOfTasks().viewAll();
128 c = load.load(appliName);
130 tache = ((Task) c.newInstance());
131 tache.setParam(params);
132 tache.setJaceSize(nbTasks);
133 // ****************//
135 } catch (Exception e) {
136 System.err.println("Unable to instantiate the class " + e);
138 RunningApplication.Instance().getChrono().start();
140 RunningApplication.Instance().setName(appliName);
141 RunningApplication.Instance().setNbTasks(nbTasks);
142 RunningApplication.Instance().setRunning(true);
143 RunningApplication.Instance().setNumberOfDisconnections(nbdc);
144 RunningApplication.Instance().setNumberOfSpawnerDisconnections(nbsdc);
145 // System.out.println("+++++++++++++++++++++++++");
148 broadcastRegister(1);
151 * x=Register.Instance().getListeOfTasks().getSize()/nbOfDaemonsPerSpawner
152 * ; int s; if(rank==x)
153 * s=(reg.getListeOfTasks().getSize()%nbOfDaemonsPerSpawner
154 * )/nbOfDeamonsPerThread; else
155 * s=nbOfDaemonsPerSpawner/nbOfDeamonsPerThread;
157 * int debut=nbOfDaemonsPerSpawnerrank;
160 * for(int i=0;i<s+1;i++){
162 * new BroadcastSpawner(i,
163 * debut,nbOfDaemonsPerSpawner,nbOfDeamonsPerThread).start(); }
167 System.out.println("########################");
170 public synchronized static JaceSpawner Instance() {
174 public int getNbOfDeamonsPerThread() {
175 return nbOfDeamonsPerThread;
178 public int getNbOfDeamonsPerSpawner() {
179 return nbOfDaemonsPerSpawner;
182 public void startProcess(Vector<Object> spawnersList) {
183 this.spawnersList = spawnersList;
184 int is = spawnersList.indexOf((Object) Register.Instance()
188 if (is == spawnersList.size() - 1)
191 nextNeighbour = is + 1;
193 * while((spawnersList.elementAt(nextNeighbour) instanceof Node))
195 * System.out.println("waiting till transform of spawner "+nextNeighbour
196 * +" is finished, for setServer"); Thread.sleep(20);
197 * }catch(Exception e1){}
199 HeartBeatSpawner.Instance().setServer(
200 (JaceSpawnerInterface) spawnersList.get(nextNeighbour));
201 HeartBeatSpawner.Instance().setHeartTime(heartTime);
202 HeartBeatSpawner.Instance().start();
203 int previousNeighbour;
205 previousNeighbour = spawnersList.size() - 1;
207 previousNeighbour = is - 1;
208 ScanThreadSpawner.Instance().setHeartTime(heartTime);
209 ScanThreadSpawner.Instance().setServer(
210 (JaceSpawnerInterface) spawnersList.get(previousNeighbour));
211 ScanThreadSpawner.Instance().start();
214 // System.out.println("apres broadcastScanning");
215 new StartScanning().start();
217 System.err.println("Cannot find myself in the spawnersList !");
222 public void setBroadcasting(boolean bool) {
226 public void initialize() {
227 // if(protocol.equals("rmi")){
228 // launch the JaceSpawnerServer
233 // get a Register on the Super Node
234 // completed with the required number of Daemons
235 getRegisterOnSuperNode();
238 createSpawnerNetwork();
243 public void startScanning() {
245 long time = RunningApplication.Instance().getChrono().getValue() / 1000;
246 System.out.println("Start scanning at time: " + time + "s");
247 // lancer le chrono qui gere les heart beat
248 while (RunningApplication.Instance().isRunning() == true) {
249 // 1 etape : scaner tous les "heartTime" milisecondes si les noeuds
250 // enregistes sont encore vivants
251 // res = scanConnectedHosts();
253 // 2 etape : a garder ou pas !!!!! regarder si l'appli est en
254 // attente de noeud pr lui en attribuer 1 nvx
257 Thread.sleep(heartTime);
258 } catch (Exception e) {
261 // /System.out.println("is running = false");
262 if (!JaceDaemon.Instance().isRunning())
266 public synchronized void signalDeadNode(JaceInterface host, int rankOfDead) {
268 TaskId myTaskId = null;
272 RunningApplication.Instance().incrementNumberOfDisconnections();
274 time = RunningApplication.Instance().getChrono().getValue() / 1000;
275 nb = RunningApplication.Instance().getNumberOfDisconnections();
276 nbC = RunningApplication.Instance().getNumberOfCouilles();
277 System.out.println("At time = " + time + "s, NbDisconnection = "
278 + nb + ", NbProblem = " + nbC);
280 // !!!!!!!!!!!!!!actualiser le ListeTask ds le Register
281 myTaskId = Register.Instance().getListeOfTasks()
282 .getTaskIdOfHostStub(host);
283 if (myTaskId == null) {
284 Register.Instance.getListeOfTasks().viewAll();
285 myTaskId = Register.Instance().getListeOfTasks()
286 .getTaskIdOfRank(rankOfDead);
287 JaceInterface deadStub = myTaskId.getHostStub();
288 deadStub.suicide("Not doing a good work");
290 myTaskId.setHostIP(null);
291 myTaskId.setHostName(null);
292 Node noeud = Register.Instance().getNodeOfStub(
293 myTaskId.getHostStub());
294 myTaskId.setHostStub(null);
295 int rankDeaD = myTaskId.getRank();
297 String nomNoeud = noeud.getName();
298 // Register.Instance().removeNodeAt(i);
299 // Register.Instance().removeNode(host.getIP());
300 // System.out.println("fait le remove : taille = " +
301 // Register.Instance().getSize());
303 boolean b = Register.Instance().removeNodeOfName(noeud.getName());
306 System.out.println("Removing Node of rank "
307 + rankDeaD + " : size = "
308 + Register.Instance().getSize());
311 .println("Cannot remove the Node, it doesn't exist anymore: size = "
312 + Register.Instance().getSize());
315 Calendar cal = new GregorianCalendar();
316 System.out.println("At time=" + cal.get(Calendar.MINUTE) + ":"
317 + cal.get(Calendar.SECOND));
319 // retrouver SI POSSIBLE un autre libre pr remplacer celui la pr
322 /**** Sébastien Miquée **/
323 //Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud);
324 Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud, noeud);
326 // broadcastRegister(0);
327 updateConcernedNodes(rankDeaD, noeud, tmpNode);
330 System.out.println("Set scanning on %%%%%%");
331 tmpNode.getStub().setScanning(true);
332 } catch (Exception e) {
333 System.err.println("Unable to setScannig on for the new node: "
337 // Register.Instance().getListeOfTasks().viewAll();
338 for (int z = 0; z < spawnersList.size(); z++)
339 if (!((JaceSpawnerInterface) spawnersList.get(z))
340 .equals(Register.Instance().getSpawnerStub()))
342 ((JaceSpawnerInterface) spawnersList.get(z))
343 .replaceDeamonBy(noeud, tmpNode, rankDeaD);
345 } catch (Exception e) {
347 .println("Unable to broadcast the modifications to all the spawners: "
350 } catch (Exception ee) {
351 System.err.println("Error in signalDeadNode() :" + ee);
355 // verifie si les noeud notes vivant ds le Register.Instance() du SuperNode
357 // retourne 0 si erreur, 1 sinon
359 * private synchronized int scanConnectedHosts() { long time = 0; Node host;
360 * Node tmpNode; long workerTime; long currentTime; int rank; int restempo;
361 * int nb = 0; int nbC = 0; boolean changed = false; int index=0; try{
362 * JaceSpawnerInterface spawnerStub=Register.Instance().getSpawnerStub();
363 * if(spawnerStub.getFinished()==true){
364 * System.out.println("nbre de taches="+Register.Instance().getSize());
365 * ListeTask t=Register.Instance().getListeOfTasks();
366 * for(index=z;index<t.getSize();index++){ TaskId recev = null;
367 * System.out.println("deleting Task************"+index);
369 * recev = t.get(index); JaceInterface stub=recev.getHostStub();
370 * spawnerStub.killApplication(stub); }
374 * } }catch(Exception e){
375 * System.out.println("w aiiiiiiiiiiiiiirrrr"+e+" index="+index); z=index;
378 * if (Register.Instance().getSize() == 0) {
379 * System.out.println("aucun noeuds a scanner");
380 * RunningApplication.Instance().purge(); System.exit(0);
387 // trouver un noeud sur les superNode
388 // pr les requisitionner
390 /*** Sébastien Miquée ***/
392 //private synchronized Node foundToReplaceThisNode(int theRank, String nom) {
393 private synchronized Node foundToReplaceThisNode(int theRank, String nom, Node _n) {
395 boolean found = false;
398 while (found == false) {
401 //node = centralServer.getNewNode(LocalHost.Instance().getIP());
402 node = centralServer.getNewNode(LocalHost.Instance().getIP(), _n);
408 } catch (Exception e) {
409 // trouver un autre superNode et lui demander le noeud a lui
411 System.err.println("Cannot localize SuperNode ! " + e);
419 System.out.println("Using Node " + node.getName() + " ("
420 + node.getIP() + ") in order to replace " + nom
421 + " size before add: " + Register.Instance().getSize()
423 node.setAliveFlag(true);
426 // rajouter le noeud ds le Register
427 node.setAppliName(RunningApplication.Instance().getName());
429 // lui envoyer mon stub pr qu'il commence a me pinguer des
431 // TODO a mettre ds un thread ????
435 * neighborTask=Register.Instance().getListeOfTasks().getTaskIdOfRank
436 * ((theRank+1)%Register.Instance().getListeOfTasks().getSize());
437 * try{ node.getStub().updateHeart(neighborTask.getHostStub()); }
438 * catch(Exception e) {
439 * System.out.println("nvx noeud deja plu dispo2"); //node = null; }
441 // TODO verif pourkoi superNode me le redonne
442 // alors qu'il fait deja du calcul
443 // int is = Register.Instance().existNode(node.getIP());
444 int is = Register.Instance().existNode(node);
446 System.out.println("The Node is already in the register ! I don't add it.");
447 System.out.println("Node " + node.getName() + " not added !") ;
450 Register.Instance().addNode(node);
452 // !!!!!!!!!!!!!!actualiser le ListeTask
453 TaskId myTaskId = Register.Instance().getListeOfTasks()
454 .getTaskIdOfRank(theRank);
455 myTaskId.setHostIP(node.getIP());
456 myTaskId.setHostName(node.getName());
457 myTaskId.setHostStub(node.getStub());
459 // Register.Instance().getListeOfTasks().viewAll();
462 neighborRank = Register.Instance().getSize() - 1;
464 neighborRank = theRank - 1;
465 TaskId neighborTask2 = Register.Instance().getListeOfTasks()
466 .getTaskIdOfRank(neighborRank);
468 JaceInterface jaceStub = neighborTask2.getHostStub();
469 jaceStub.updateHeart(node.getStub());
470 } catch (Exception e) {
471 System.err.println("Next node unreachable ! " + e);
478 System.out.println("I didn't receive a new Node !");
483 public void replaceBy(JaceSpawnerInterface oldStub,
484 JaceSpawnerInterface stub) {
485 int index = spawnersList.indexOf((Object) oldStub);
487 spawnersList.setElementAt(stub, index);
489 System.err.println("Spawner's stub not foud in spawnersList !");
492 public void getNewSpawner(JaceSpawnerInterface previousSpawner) {
493 //boolean found = false;
496 JaceSpawnerInterface spawnerStub = null;
498 // while (found == false) {
500 // TODO : trouver l'erreur !!!
502 // "pas localise le super node java.lang.NullPointerException"
503 if (centralServer == null) {
504 System.err.println("Central Server not localized !");
506 node = centralServer.getNewNode( LocalHost.Instance().getIP(), null ) ;
507 RunningApplication.Instance()
508 .incrementNumberOfSpawnerDisconnections();
510 } catch (Exception e) {
511 // trouver un autre superNode et lui demander le noeud a lui
512 System.err.println("Super Node not localized !\n " + e);
513 // System.out.println("pas localise le super node " + e);
514 // System.out.println("pas localise le super node " + e);
515 // System.out.println("pas localise le super node " + e);
516 // System.out.println("pas localise le super node " + e);
517 // System.out.println("pas localise le super node " + e);
518 // System.out.println("pas localise le super node " + e);
519 // System.out.println("pas localise le super node " + e);
520 System.err.println("My IP : " + LocalHost.Instance().getIP());
521 if (centralServer == null) {
522 System.err.println("CentralServer is NULL !");
528 index = spawnersList.indexOf((Object) previousSpawner);
530 System.out.println("Using Node " + node.getName()
532 + LocalHost.Instance().resolve(node.getName())
533 + ") to replace a dead spawner\n\n");
535 // Register.Instance().viewAll();
536 // Register.Instance().getListeOfTasks().viewAll();
537 spawnerStub = node.getStub().transformIntoSpawner(
546 RunningApplication.Instance()
547 .getNumberOfDisconnections(),
548 RunningApplication.Instance()
549 .getNumberOfSpawnerDisconnections(),
550 nbOfDaemonsPerSpawner, nbOfDeamonsPerThread);
551 spawnersList.setElementAt(spawnerStub, index);
552 new StartProcessThread(index).start();
553 // spawnerStub.startProcess( spawnersList);
554 } catch (Exception e) {
555 System.err.println("Unable to reach the new spawner: " + e);
557 for (int j = 0; j < spawnersList.size(); j++)
559 if (!((JaceSpawnerInterface) spawnersList.get(j))
560 .equals(Register.Instance().getSpawnerStub())
561 && !((JaceSpawnerInterface) spawnersList.get(j))
562 .equals(spawnerStub)) {
564 .println("Trying to broadcast to spawner of rank "
567 ((JaceSpawnerInterface) spawnersList.get(j))
568 .replaceBy(previousSpawner, spawnerStub);
570 } catch (Exception e) {
572 .println("Unable to broadcast to spawner of rank: "
573 + j + ". Error:" + e);
575 ScanThreadSpawner.Instance().setServer(spawnerStub);
579 previous = spawnersList.size() - 1;
581 previous = index - 1;
583 ((JaceSpawnerInterface) spawnersList.get(previous))
584 .updateHeart(spawnerStub);
585 } catch (Exception e) {
587 .println("unable to change the server of the heartbeatThread for the previous node of rank "
588 + previous + ". error:" + e);
592 System.err.println("Node is null !");
597 public void broadcastFinished(boolean bool) {
598 for (int i = 0; i < spawnersList.size(); i++)
600 ((JaceSpawnerInterface) spawnersList.get(i)).setFinished(bool);
601 } catch (Exception e) {
603 .println("Unable to propagate the end of the application :"
608 private synchronized void scanAppliNodes() {
611 //ListeTask tskList = null;
615 JaceSpawnerInterface spawnerStub = Register.Instance()
617 if (spawnerStub.getFinished() == true) {
618 System.out.println("Number of tasks ="
619 + Register.Instance().getSize());
621 int x = Register.Instance().getListeOfTasks().getSize()
622 / nbOfDaemonsPerSpawner;
625 s = (Register.Instance().getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
626 / nbOfDeamonsPerThread;
628 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
630 int debut = nbOfDaemonsPerSpawner * rank;
632 // for(int i=debut;i<nbOfDaemonsPerSpawner*(rank+1) &&
633 // i<reg.getSize();i++)
634 // System.out.println(((Node)nodes.elementAt(i)).getName());
636 ListeTask t = Register.Instance().getListeOfTasks();
637 ScanThreadSpawner.Instance().kill();
638 HeartBeatSpawner.Instance().kill();
639 for (int i = 0; i < s + 1; i++) {
641 new KillThread(i, debut, nbOfDaemonsPerSpawner,
642 nbOfDeamonsPerThread, t).start();
647 long finalTime = RunningApplication.Instance().getChrono()
650 int nbe = RunningApplication.Instance()
651 .getNumberOfDisconnections();
653 int nbsdc = RunningApplication.Instance()
654 .getNumberOfSpawnerDisconnections();
655 System.out.println("Application finished successfully !");
656 // System.out.println("Application finished successfully !!!!!!");
657 // System.out.println("Application finished successfully !!!!!!");
658 // System.out.println("Application finished successfully !!!!!!");
659 // System.out.println("Application finished successfully !!!!!!");
660 // System.out.println("Application finished successfully !!!!!!");
662 // .println("Application finished successfully !!!!!!\n");
663 System.out.println("TOTAL TIME in s : " + (finalTime / 1000));
664 System.out.println("nb of desconnections: " + nbe);
665 System.out.println("nb of spawners desconnections: " + nbsdc);
666 if (JaceDaemon.Instance().isRunning()) {
667 JaceDaemon.Instance().reconnectSuperNode();
669 RunningApplication.Instance().purge();
674 RunningApplication.Instance().purge();
678 } catch (Exception e) {
680 .println("w aiiiiiiiiiiiiiirrrr" + e + " index=" + index);
684 * if (Register.Instance().getSize() == 0) {
685 * System.out.println("aucun noeuds a scanner");
686 * RunningApplication.Instance().purge(); System.exit(0); return 0;
688 * } else{ tskList = Register.Instance().getListeOfTasks();
690 * //si mon appli a besoin d'un noeud //sinon, on fait rien if ((nbTasks
691 * - Register.Instance().getSize()) > 0) { cptReplaced = 0;
693 * //TODO demander des paquet de nodes, pas qu'un //on scanne toutes les
694 * taches de cette appli for (int ind = 0; ind < tskList.getSize();
695 * ind++) { //si 1 tache a pas de noeud, on trouve 1 remplacant
697 * //if (tskList.get(ind).getHostIP() == null) { if
698 * (tskList.get(ind).getHostStub() == null) { rank =
699 * tskList.get(ind).getRank(); node = foundToReplaceThisNodeTMP(rank);
700 * if (node != null) { cptReplaced++; }
704 * //qd fini de scanner taches, envoyer Register //si remplacement de
705 * noeud (c a d si Register modifier) if (cptReplaced != 0) {
706 * broadcastRegister(0); } try { Thread.currentThread().yield(); } catch
709 * }// fin if(appli.getNeededNodes() > 0) {
710 * //System.out.println("SCAN APPLI : taille : " +
711 * Register.Instance().getSize()); return 1; }
715 // @SuppressWarnings("unused")
716 // private synchronized Node foundToReplaceThisNodeTMP(int theRank) {
718 // boolean found = false;
720 // // while (found == false) {
722 // // TODO : trouver l'erreur !!!
724 // // "pas localise le super node java.lang.NullPointerException"
725 // if (centralServer == null) {
726 // System.out.println("centralServer est NUUUUUUUUULL");
728 // node = centralServer.getNewNode(LocalHost.Instance().getIP());
731 // } catch (Exception e) {
732 // // trouver un autre superNode et lui demander le noeud a lui
733 // System.out.println("TMP pas localise le super node " + e);
734 // System.out.println("TMP pas localise le super node " + e);
735 // System.out.println("TMP pas localise le super node " + e);
736 // System.out.println("TMP pas localise le super node " + e);
737 // System.out.println("TMP pas localise le super node " + e);
738 // System.out.println("TMP pas localise le super node " + e);
739 // System.out.println("TMP pas localise le super node " + e);
740 // System.out.println("mon IP : " + LocalHost.Instance().getIP());
741 // if (centralServer == null) {
742 // System.out.println("centralServer : NULL");
744 // connectSuperNode();
747 // if (node != null) {
748 // System.out.println("COOOOOOOOOOOOOOOOOOOOOOL : requisition de "
749 // + node.getName() + " taille avt add: "
750 // + Register.Instance().getSize() + "\n\n");
751 // node.setAliveFlag(true);
752 // node.setAliveTime();
754 // // rajouter le noeud ds le Register
755 // System.out.println("ds Register, manque "
756 // + (nbTasks - Register.Instance().getSize()));
757 // node.setAppliName(RunningApplication.Instance().getName());
759 // // lui envoyer mon stub pr qu'il commence a me pinguer des
761 // // TODO a mettre ds un thread ????
763 // TaskId neighborTask = Register.Instance().getListeOfTasks()
766 // % Register.Instance().getListeOfTasks()
768 // node.getStub().updateHeart(neighborTask.getHostStub());
769 // // node.getStub().updateHeart(this.spawnerRef);
771 // // int is = Register.Instance().existNode(node.getIP());
772 // int is = Register.Instance().existNode(node);
773 // // TODO verif pourkoi superNode me le redonne
774 // // alors qu'il fait deja du calcul
776 // System.out.println("j'ajoute pas le noeud, il y est deja");
777 // System.out.println("PAS AJOUTEE TMP " + node.getName());
778 // System.out.println("PAS AJOUTEE TMP " + node.getName());
779 // System.out.println("PAS AJOUTEE TMP " + node.getName());
780 // System.out.println("PAS AJOUTEE TMP " + node.getName());
781 // System.out.println("PAS AJOUTEE TMP " + node.getName());
784 // Register.Instance().addNode(node);
786 // // !!!!!!!!!!!!!!actualiser le ListeTask
787 // TaskId myTaskId = Register.Instance().getListeOfTasks()
788 // .getTaskIdOfRank(theRank);
789 // myTaskId.setHostIP(node.getIP());
790 // myTaskId.setHostName(node.getName());
791 // myTaskId.setHostStub(node.getStub());
792 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostIP(node.getIP());
793 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostName(node.getName());
794 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostStub(node.getStub());
796 // } catch (Exception e) {
797 // System.out.println("nvx noeud deja plu dispo");
801 // System.out.println("RADINNNNNNNNNNNNNN TMP ");
806 private void exportObject() {
808 JaceSpawnerServer spawnerServer = null;
810 System.out.println("Name of local machine is: "
811 + LocalHost.Instance().getName());
812 System.out.println("IP of local machine is: "
813 + LocalHost.Instance().getIP());
815 // launch the JaceSpawnerServer
816 spawnerServer = new JaceSpawnerServer();
817 java.rmi.registry.LocateRegistry.createRegistry(spawnerPort);
818 java.rmi.registry.LocateRegistry.getRegistry(spawnerPort).rebind(
819 "JaceSpawnerServer", spawnerServer);
820 spawnerRef = (JaceSpawnerInterface) Naming.lookup("rmi://"
821 + LocalHost.Instance().getIP() + ":" + spawnerPort
822 + "/JaceSpawnerServer");
824 } catch (Exception e) {
826 .println("JaceP2P_Error in JaceSpawner.exportObject() when creating the local JaceSpawnerServer "
828 // System.err.println("exit ds JaceSpawner.exportObject");
834 public void connectSuperNode() {
835 System.out.println("I'm looking for a super node");
836 boolean connected = false;
837 if (!(superNode_IP == null)) {
839 System.out.println("Trying to invoke super node "
841 centralServer = (JaceSuperNodeInterface) Naming.lookup("rmi://"
842 + superNode_IP + ":" + superNode_port
844 System.out.println("Succesfully located " + superNode_IP);
846 // add stub and IP in LocalHost to store it until super node
848 LocalHost.Instance().setSuperNodeStub(centralServer);
849 LocalHost.Instance().setSuperNodeIP(superNode_IP);
850 heartTime = centralServer.getSuperNodeBeat();
851 timeBeforeKill = NB_HEART_DECONNECT * heartTime;
854 } catch (Exception e) {
855 System.err.println("Super Node not accessible, try another one (1/2s)");
858 } catch (Exception e1) {
863 if (connected == false) {
865 SuperNodeListe.Instance().staticInitialization();
866 while (connected == false
867 && i < SuperNodeListe.Instance().getListe().size()) {
868 SuperNodeData d = null;
869 d = SuperNodeListe.Instance().getSuperNodeData(i);
871 superNode_IP = LocalHost.Instance().resolve(d.getIP());
872 superNode_port = d.getPort();
873 // superNode_port = d.getPort();
875 System.out.println("Trying to invoke Super Node "
877 centralServer = (JaceSuperNodeInterface) Naming
878 .lookup("rmi://" + superNode_IP + ":"
879 + superNode_port + "/JaceSuperNode");
880 System.out.println("Succesfully located SuperNode "
882 LocalHost.Instance().setSuperNodeStub(centralServer);
883 LocalHost.Instance().setSuperNodeIP(superNode_IP);
884 heartTime = centralServer.getSuperNodeBeat();
885 timeBeforeKill = NB_HEART_DECONNECT * heartTime;
888 } catch (Exception e) {
890 .println("SuperNode "
892 + " not accessible, trying to locate another one in 0.5s\n");
896 } catch (Exception e1) {
902 if (connected == false) {
903 System.err.println("All the Super Nodes in the list are not accessible. I'm unable to connect to the platform !");
909 // get a Register on the SuperNode
910 // completed with the required number of Daemons
912 public synchronized void getRegisterOnSuperNode() {
913 Register registerSpawner = null;
915 boolean recieved = false;
917 System.out.println("Trying to get a Register on the SuperNode");
918 int nbExtraSpawners = 0;
919 if (nbTasks > nbOfDaemonsPerSpawner) {
920 nbExtraSpawners = (nbTasks - 1) / nbOfDaemonsPerSpawner;
925 registerSpawner = centralServer.getRegisterSpawner(LocalHost
926 .Instance().getIP(), nbTasks, (Task) tache, nbTasks
927 + nbExtraSpawners, algo, paramAlgo);
929 } catch (Exception e) {
931 .println("Unable to recieve a register from superNode "
936 if (registerSpawner.getSize() != (nbTasks + nbExtraSpawners)) {
937 System.err.println("I did not recieve enough nodes from superNode!!!! \n killing application !!!!");
938 for (int i = 0; i < registerSpawner.getSize(); i++) {
940 registerSpawner.getNodeAt(i).getStub().reconnectSuperNode();
941 } catch (Exception e) {
942 System.err.println("The reserved node was unable to reconnect to the super node");
948 spawnersList = new Vector<Object>();
949 for (int i = 0; i < nbExtraSpawners && i < registerSpawner.getSize(); i++) {
950 spawnersList.add(registerSpawner.getNodeAt(i
951 * nbOfDaemonsPerSpawner));
952 registerSpawner.removeNode(registerSpawner.getNodeAt(i
953 * nbOfDaemonsPerSpawner));
956 registerSpawner.setNbOfTasks(nbTasks);
957 registerSpawner.setNumBackupNeighbors(nbSavingNodes);
959 * System.out.println("Trying to connect another SuperNode");
960 * connectSuperNode(); try { registerSpawner =
961 * centralServer.getRegisterSpawner(LocalHost.Instance().getIP(),
962 * nbTasks); } catch(Exception e1) {}
965 if (registerSpawner != null) {
966 System.out.println("I received the register");
967 // registerSpawner.setVersion(registerVersion);
968 // registerVersion++;
969 Register.Instance().replaceBy(registerSpawner);
970 System.out.println("It contains " + Register.Instance().getSize()
971 + " Nodes" + " " + nbExtraSpawners + " ExtraSpawners");
973 // set each Node aliveTime value to the Spawner current time
974 for (int i = 0; i < Register.Instance().getSize(); i++) {
975 noeud = Register.Instance().getNodeAt(i);
976 noeud.setAliveFlag(true);
977 noeud.setAliveTime();
981 System.err.println("\n---------------WARNING--------------");
982 System.err.println("No Daemon available on the SuperNode dispo, try later, please");
987 public class TransformThread extends Thread {
991 public TransformThread(int i, Node n) {
999 System.out.println("Trying to transform the spawner ("
1000 + n.getName() + ") of rank " + i);
1001 spawnersList.setElementAt(n.getStub().transformIntoSpawner(
1002 params, appliName, Register.Instance(), nbTasks,
1003 centralServer, i, heartTime, 0, 0, 0,
1004 nbOfDaemonsPerSpawner, nbOfDeamonsPerThread), i);
1005 } catch (Exception e) {
1006 System.err.println("Error while contacting newly acquired spawner ("
1007 + n.getName() + "): " + e);
1009 n = centralServer.getNewNode( LocalHost.Instance().getIP(), null ) ;
1011 new TransformThread(i, n).start();
1012 } catch (Exception e1) {
1013 System.err.println("The Super Node is maybe dead: " + e1) ;
1014 for (int z = 0; z < Register.Instance().getSize(); z++) {
1016 Register.Instance().getNodeAt(z).getStub()
1017 .reconnectSuperNode();
1018 } catch (Exception ez) {
1019 System.err.println("The reserved node was unable to reconnect to the super node: \n"
1029 public class StartProcessThread extends Thread {
1032 public StartProcessThread(int i) {
1040 * while((spawnersList.elementAt(i) instanceof Node)) try{
1041 * System.out.println("waiting till transform of spawner "+i+
1042 * " is finished"); Thread.sleep(20); }catch(Exception e1){}
1045 // System.out.println("start process on spawner of rank "+i);
1046 JaceSpawnerInterface spawnerStub = (JaceSpawnerInterface) spawnersList
1048 spawnerStub.startProcess(spawnersList);
1049 } catch (Exception e) {
1050 e.printStackTrace(System.out);
1051 System.err.println("Unable to start the process on the spawner of rank "
1052 + i + ".error: " + e);
1057 public void createSpawnerNetwork() {
1060 for (i = 0; i < spawnersList.size(); i++) {
1061 n = (Node) spawnersList.elementAt(i);
1063 // Register.Instance().getListeOfTasks().viewAll();
1064 // spawnersList.setElementAt(n.getStub().transformIntoSpawner(
1065 // params, appliName, Register.Instance(),nbTasks, centralServer,i,
1066 // heartTime,0,0),i);
1067 new TransformThread(i, n).start();
1070 // broadcast the Register.Instance() to all the JaceServer
1071 // in order to start each task on the Daemons
1073 spawnersList.add(Register.Instance().getSpawnerStub());
1074 System.out.println(" rank=spawnersList.size()=" + spawnersList.size());
1075 rank = spawnersList.size() - 1;
1076 broadcastRegister(1);
1077 for (int j = 0; j < spawnersList.size(); j++) {
1078 System.out.println("waiting till transform of spawner " + j
1080 while ((spawnersList.elementAt(j) instanceof Node))
1084 } catch (Exception e) {
1087 .println("End Transformation of all spawners. Beginning the computing processes");
1089 for (i = 0; i < spawnersList.size(); i++) {
1091 // while(!(spawnersList.elementAt(i) instanceof
1092 // JaceSpawnerInterface))
1094 new StartProcessThread(i).start();
1097 System.out.println("End create Spawner Network!!!!!!!!!");
1100 public JaceSpawnerInterface getSpawnerResponsibleOn(int rank) {
1101 int id = rank / nbOfDaemonsPerSpawner;
1102 return (JaceSpawnerInterface) spawnersList.get(id);
1105 public void createAppli() {
1110 ListeTask tsk = new ListeTask();
1112 JaceInterface nodeStub = null;
1113 TaskId myTask = null;
1115 System.out.println("appli launched, starting the chrono");
1116 RunningApplication.Instance().getChrono().start();
1118 RunningApplication.Instance().setName(appliName);
1119 RunningApplication.Instance().setNbTasks(nbTasks);
1120 // RunningApplication.Instance().setRegister(Register.Instance());
1122 Register.Instance().setParams(params);
1123 Register.Instance().setAppliName(appliName);
1124 Register.Instance().setSpawnerStub(this.spawnerRef);
1126 // assign a TaskId to each Node of the Register
1127 // and insert the TaskId in tke ListTask
1128 while (i < Register.Instance().getSize() && count < nbTasks) {
1129 tmpNode = Register.Instance().getNodeAt(i);
1130 if (tmpNode.getAliveFlag() == true) {
1131 tmpNode.setAppliName(appliName);
1132 nodeStub = tmpNode.getStub();
1133 nodeName = tmpNode.getName();
1134 nodeIP = tmpNode.getIP();
1136 myTask = new TaskId(appliName, count, nodeStub);
1137 myTask.setHostIP(nodeIP);
1138 myTask.setHostName(nodeName);
1140 tsk.addTask(myTask);
1146 // if not enough Nodes in the Register,
1147 // insert not assigned TaskId in the ListTask
1148 if (count < nbTasks) {
1149 for (int j = count; j < nbTasks; j++) {
1150 tsk.addTask(new TaskId(appliName, j, null));
1152 System.out.println("in Register, misses "
1153 + (nbTasks - Register.Instance().getSize()) + " nodes");
1156 // insert the ListeTask in the Register of the appli
1157 Register.Instance().setListeOfTasks(tsk);
1158 // Register.Instance().getListeOfTasks().viewAll();
1159 RunningApplication.Instance().setRunning(true);
1160 System.out.println("fin create appli");
1163 public class BroadcastSpawner extends Thread {
1166 int nbOfDaemonsPerThread, nbOfDeamonsPerSpawner;
1168 public BroadcastSpawner(int i, int debut, int nbOfDeamonsPerSpawner,
1169 int nbOfDaemonsPerThread) {
1172 this.nbOfDaemonsPerThread = nbOfDaemonsPerThread;
1173 this.nbOfDeamonsPerSpawner = nbOfDeamonsPerSpawner;
1178 for (int index = debut + i * nbOfDaemonsPerThread; index < debut
1179 + i * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1180 && index < debut + nbOfDeamonsPerSpawner
1181 && index < Register.Instance().getListeOfTasks().getSize(); index++) {
1183 Register.Instance().getNodeAt(index).getStub().setSpawner(
1184 Register.Instance().getSpawnerStub());
1185 } catch (Exception e) {
1186 System.out.println("can't change spawner stub on node: "
1187 + Register.Instance().getNodeAt(i).getName()
1194 public class KillThread extends Thread {
1197 int nbOfDaemonsPerThread, nbOfDeamonsPerSpawner;
1200 public KillThread(int i, int debut, int nbOfDeamonsPerSpawner,
1201 int nbOfDaemonsPerThread, ListeTask t) {
1204 this.nbOfDaemonsPerThread = nbOfDaemonsPerThread;
1205 this.nbOfDeamonsPerSpawner = nbOfDeamonsPerSpawner;
1212 for (int index = debut + i * nbOfDaemonsPerThread; index < debut
1213 + i * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1214 && index < debut + nbOfDeamonsPerSpawner
1215 && index < t.getSize(); index++) {
1218 TaskId recev = null;
1219 System.out.println("deleting Task" + index);
1221 recev = t.getTaskIdOfRank(index);
1223 JaceInterface stub = recev.getHostStub();
1224 System.out.println("name=" + recev.getHostName());
1225 noeud = Register.Instance().getNodeOfStub(stub);
1226 noeud.setAppliName(null);
1227 new ReconnectThread(stub, noeud.getName()).start();
1228 Register.Instance().removeNode(noeud);
1229 // LocalHost.Instance().getSpawnerStub().killApplication(stub);
1231 } catch (Exception e) {
1233 System.err.println("error in killThread on node "
1234 + noeud.getName() + ". " + e);
1235 } catch (Exception e2) {
1236 System.err.println("error in error :" + e2);
1242 class ReconnectThread extends Thread {
1243 JaceInterface stub = null;
1246 public ReconnectThread(JaceInterface s, String name) {
1253 // System.out.println("reconnexion SuperNode");
1254 // Register.Instance().getNode(workerIP).getWorkerStub().reconnectSuperNode();
1256 // stub.reconnectSuperNode();
1257 stub.suicide("fin d'appli");
1258 } catch (Exception e) {
1259 System.err.println("can't kill node " + name);
1267 // faire une copie du Register et l'envoyer aux noeuds qui le compose
1268 // car si il est modif en meme tmp, on envoi pas un truc coherent
1269 private synchronized void broadcastRegister(int requete) {
1270 // Register reg = Register.Instance().clone();
1271 Register reg = Register.Instance();
1274 System.out.println("name of spawner: "
1275 + Register.Instance().getSpawnerStub().getName());
1276 // launch 1 thread to send the Register to all the nodes
1277 while (broadcasting == true)
1279 broadcasting = true;
1280 Register.Instance().setSpawnerStub(
1281 Register.Instance().getSpawnerStub());
1282 int x = reg.getListeOfTasks().getSize() / nbOfDaemonsPerSpawner;
1285 if ((reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1286 % nbOfDeamonsPerThread == 0)
1287 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1288 / nbOfDeamonsPerThread;
1290 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1291 / nbOfDeamonsPerThread + 1;
1292 else if ((nbOfDaemonsPerSpawner % nbOfDeamonsPerThread) == 0)
1293 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
1295 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread + 1;
1296 int debut = nbOfDaemonsPerSpawner * rank;
1297 System.out.println("rank=" + rank + " debut=" + debut + " s=" + s
1298 + " nbOfDaemonsPerSpawner=" + nbOfDaemonsPerSpawner
1299 + " nbOfDeamonsPerThread=" + nbOfDeamonsPerThread + " x="
1301 for (int i = 0; i < s; i++)
1302 new UpdateRegisterThread(tache, reg, requete, i, debut).start();
1304 * This thread : -updates the goal of the Node beats if necessary
1305 * (stub.updateHeart) -updates the Register on each Node
1306 * (stub.updateRegister)
1308 JaceSpawner.Instance().setBroadcasting(false);
1311 } catch (Exception e) {
1314 } catch (Exception e) {
1316 .println("\n1 node has died during JaceSpawner.broadcastRegister()");
1320 private synchronized void broadcastScanning() {
1321 Register reg = Register.Instance();
1322 while (broadcasting == true)
1325 } catch (Exception e) {
1327 // Register.Instance().viewAll();
1328 Vector<?> nodes = (Vector<?>) Register.Instance().getListOfNodes().clone();
1329 int x = reg.getListeOfTasks().getSize() / nbOfDaemonsPerSpawner;
1332 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1333 / nbOfDeamonsPerThread;
1335 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
1337 int debut = nbOfDaemonsPerSpawner * rank;
1339 // for(int i=debut;i<nbOfDaemonsPerSpawner*(rank+1) &&
1340 // i<reg.getSize();i++)
1341 // System.out.println(((Node)nodes.elementAt(i)).getName());
1343 for (int i = 0; i < s + 1; i++) {
1345 new StartScanThread(i, nodes, debut).start();
1350 public Register getRegister(int rank) {
1352 ListeTask listOfTasks = Register.Instance().getListeOfTasks();
1353 Vector<Integer> dependencies = getDependencies(rank, listOfTasks.getSize());
1354 Register g = new Register();
1355 ListeTask newListOfTasks = new ListeTask();
1356 g.setAppliName(Register.Instance().getAppliName());
1357 g.setParams(Register.Instance().getParams());
1358 g.setSpawnerStub(Register.Instance().getSpawnerStub());
1359 g.setNbOfTasks(Register.Instance().getNbOfTasks());
1360 // g.setVersion(reg.getVersion());
1361 for (int j = 0; j < dependencies.size(); j++) {
1362 TaskId id = listOfTasks.getTaskIdOfRank(((Integer) dependencies
1363 .elementAt(j)).intValue());
1364 newListOfTasks.addTask(id);
1365 if (id.getHostStub() != null) {
1366 Node noeud = Register.Instance()
1367 .getNodeOfStub(id.getHostStub());
1371 g.setListeOfTasks(newListOfTasks);
1375 private void updateConcernedNodes(int rank, Node oldNode, Node node) {
1376 ListeTask listOfTasks = Register.Instance().getListeOfTasks();
1377 Vector<?> dependencies = getDependencies(rank, listOfTasks.getSize());
1378 System.out.println("la liste des voisins concernes de : " + rank);
1379 for (int z = 0; z < dependencies.size(); z++)
1380 System.out.print(((Integer) dependencies.elementAt(z)).intValue()
1382 System.out.println();
1383 // Register.Instance().setVersion(registerVersion);
1384 // registerVersion++;
1386 .setSpawnerStub(Register.Instance().getSpawnerStub());
1388 if ((dependencies.size() % nbOfDeamonsPerThread) == 0)
1389 s = dependencies.size() / nbOfDeamonsPerThread;
1391 s = dependencies.size() / nbOfDeamonsPerThread + 1;
1392 Register reg = Register.Instance();
1394 for (int j = 0; j < s; j++) {
1395 new UpdateRegisterConcernedThread(dependencies, reg, j, rank,
1396 oldNode, node).start();
1400 private Vector<Integer> getDependencies(int id, int jaceSize) {
1401 // get computing dependencies
1402 Vector<Integer> neighbors = new Vector<Integer>();
1403 int[] dep = tache.getDependencies(id);
1404 for (int z = 0; z < taille(dep); z++)
1405 neighbors.add(dep[z]);
1406 // System.out.println("la liste des voisins de calcul de: "+id+" concerne");
1407 // for(int z=0;z<neighbors.size();z++)
1408 // System.out.print(((Integer)neighbors.elementAt(z)).intValue()+" ");
1409 // System.out.println();
1411 // get convergence neighbors
1413 while (Math.pow(2, d) < jaceSize) {
1414 if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize))
1415 if (!neighbors.contains((Object) ((int) (id + Math.pow(2, d)))))
1416 neighbors.add((int) (id + Math.pow(2, d)));
1417 if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d))
1418 if (!neighbors.contains((Object) ((int) (id - Math.pow(2, d)))))
1419 neighbors.add((int) (id - Math.pow(2, d)));
1423 // get backup neighbors
1424 int nb = Register.Instance().getNumBackupNeighbors();
1427 for (int j = 1; j <= nb; j++) {
1428 // ------------ 1 - for backups "j + n" (to the right of j)
1429 rankOfBackTask = (id + j) % jaceSize;
1430 if (!neighbors.contains((Object) rankOfBackTask))
1431 neighbors.add(rankOfBackTask);
1433 // ------------ 2 - for backups "j - n" (to the left of j)
1436 rankOfBackTask = tmp % jaceSize;
1438 rankOfBackTask = jaceSize - (Math.abs(tmp) % jaceSize);
1440 if (!neighbors.contains((Object) rankOfBackTask))
1441 neighbors.add(rankOfBackTask);
1449 public static int taille(int[] vect) {
1452 while (x < vect.length && vect[x] >= 0) {
1459 class StartScanning extends Thread {
1461 public StartScanning() {
1471 class StartScanThread extends Thread {
1474 int nbOfDeamonsPerThread, nbOfDeamonsPerSpawner;
1476 StartScanThread(int i, Vector<?> nodes, int debut) {
1480 nbOfDeamonsPerThread = JaceSpawner.Instance().getNbOfDeamonsPerThread();
1481 nbOfDeamonsPerSpawner = JaceSpawner.Instance()
1482 .getNbOfDeamonsPerSpawner();
1487 for (index = debut + i * nbOfDeamonsPerThread; index < debut + i
1488 * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1489 && index < debut + nbOfDeamonsPerSpawner
1490 && index < nodes.size(); index++) {
1492 Node node = (Node) nodes.elementAt(index);
1493 JaceInterface stub = node.getStub();
1494 String name = node.getName();
1497 stub.setScanning(true);
1498 // System.out.println("modify scanning to "+name);
1500 } catch (Exception e) {
1501 System.out.println("unable to modify scanning to " + name + ":"
1505 // for(int x=0;x<nodes.size();x++)
1506 // System.out.println(((Node)nodes.elementAt(x)).getName());
1507 // System.out.println("nbre total: "+(index-1));