3 import java.rmi.Naming;
4 import java.rmi.RemoteException;
5 import java.util.Calendar;
6 import java.util.GregorianCalendar;
7 import java.util.Vector;
9 public class JaceSpawner {
12 private Task tache = null;
13 public static JaceSpawner Instance;
14 private static String superNode_IP = null;
15 private int superNode_port = 1098;
16 private static int spawnerPort = 1099;
17 private static JaceSuperNodeInterface centralServer = null;
18 private JaceSpawnerInterface spawnerRef = null;
20 private String appliName;
21 private String[] params = null;
22 @SuppressWarnings("unused")
23 private String protocol;
24 // private int registerVersion=0;
25 final int NB_HEART_DECONNECT = 3;
26 private int heartTime; // frequency of heartBeat
27 @SuppressWarnings("unused")
28 private int timeBeforeKill; // wait 3 non-response of heartBeat before
29 // considering de Daemon as dead
30 private boolean broadcasting = false;
31 @SuppressWarnings("unused")
33 private static int nbOfDaemonsPerSpawner;
34 private static int nbOfDeamonsPerThread;
35 private Vector<Object> spawnersList;
37 private int nbSavingNodes;
39 // Variables for Mapping
41 private double paramAlgo ;
42 private String idAlgo ;
44 public JaceSpawner(String superNode, int port, String comProtocol,
45 String[] args, int nbDaemonPerSpawner, int nbDaemonPerThread,
46 int nbSavingNodes, int _algo, double _paramAlgo) {
47 // superNode_IP = LocalHost.Instance().resolve(superNode);
49 paramAlgo = _paramAlgo ;
51 superNode_IP = superNode;
53 protocol = comProtocol;
54 nbOfDaemonsPerSpawner = nbDaemonPerSpawner;
55 nbOfDeamonsPerThread = nbDaemonPerThread;
56 this.nbSavingNodes = nbSavingNodes;
59 // if less than 2 params (nb of tasks and name of the appli), error
60 System.err.println( "Parameters error !" ) ;
64 nbTasks = new Integer(args[0]).intValue(); // nb of tasks
67 } catch (Exception e) {
68 System.err.println("Number format exception :" + e ) ;
71 appliName = args[1]; // name of the class to launch
72 if (args.length > 2) { // get the eventual param of the appli
73 params = new String[args.length - 2];
74 for (int i = 0; i < params.length; i++) {
75 params[i] = args[2 + i];
79 c = load.load(appliName);
81 tache = ((Task) c.newInstance());
82 tache.setParam(params);
83 tache.setJaceSize(nbTasks);
87 } catch (Exception e) {
88 System.err.println( "Unable to instantiate the class " + e ) ;
98 public JaceSpawner(String[] params, String appliName, Register reg,
99 int nbTasks, JaceSuperNodeInterface snodeStub, int rank,
100 int heartTime, int tag, int nbdc, int nbsdc,
101 int nbDaemonPerSpawner, int nbDaemonPerThread) {
103 nbOfDaemonsPerSpawner = nbDaemonPerSpawner;
104 nbOfDeamonsPerThread = nbDaemonPerThread;
105 if (params.length != 0) {
106 this.params = new String[params.length];
107 for (int i = 0; i < params.length; i++)
108 this.params[i] = params[i];
111 System.err.println( "There is no parameter !" ) ;
113 } catch (Exception e) {
114 System.err.println("Error in copying the parameters: " + e ) ;
116 // System.out.println("xxxxxxxxxxxxxxx reg size="+reg.getSize()+" xxxxxxxxxxxxxx");
117 this.appliName = appliName;
119 this.nbTasks = nbTasks;
120 this.heartTime = heartTime;
121 LocalHost.Instance().setSuperNodeStub(snodeStub);
122 centralServer = snodeStub;
124 Register.Instance().replaceBy(reg);
125 Register.Instance().setSpawnerStub(this.spawnerRef);
126 Register.Instance().getListeOfTasks().viewAll();
130 c = load.load(appliName);
132 tache = ((Task) c.newInstance());
133 tache.setParam(params);
134 tache.setJaceSize(nbTasks);
135 // ****************//
137 } catch (Exception e) {
138 System.err.println("Unable to instantiate the class " + e);
140 RunningApplication.Instance().getChrono().start();
142 RunningApplication.Instance().setName(appliName);
143 RunningApplication.Instance().setNbTasks(nbTasks);
144 RunningApplication.Instance().setRunning(true);
145 RunningApplication.Instance().setNumberOfDisconnections(nbdc);
146 RunningApplication.Instance().setNumberOfSpawnerDisconnections(nbsdc);
147 // System.out.println("+++++++++++++++++++++++++");
150 broadcastRegister(1);
153 * x=Register.Instance().getListeOfTasks().getSize()/nbOfDaemonsPerSpawner
154 * ; int s; if(rank==x)
155 * s=(reg.getListeOfTasks().getSize()%nbOfDaemonsPerSpawner
156 * )/nbOfDeamonsPerThread; else
157 * s=nbOfDaemonsPerSpawner/nbOfDeamonsPerThread;
159 * int debut=nbOfDaemonsPerSpawnerrank;
162 * for(int i=0;i<s+1;i++){
164 * new BroadcastSpawner(i,
165 * debut,nbOfDaemonsPerSpawner,nbOfDeamonsPerThread).start(); }
169 System.out.println("########################");
172 public synchronized static JaceSpawner Instance() {
176 public int getNbOfDeamonsPerThread() {
177 return nbOfDeamonsPerThread;
180 public int getNbOfDeamonsPerSpawner() {
181 return nbOfDaemonsPerSpawner;
184 public void startProcess(Vector<Object> spawnersList) {
185 this.spawnersList = spawnersList;
187 int is = spawnersList.indexOf((Object) Register.Instance()
192 if (is == spawnersList.size() - 1)
195 nextNeighbour = is + 1;
197 * while((spawnersList.elementAt(nextNeighbour) instanceof Node))
199 * System.out.println("waiting till transform of spawner "+nextNeighbour
200 * +" is finished, for setServer"); Thread.sleep(20);
201 * }catch(Exception e1){}
203 HeartBeatSpawner.Instance().setServer(
204 (JaceSpawnerInterface) spawnersList.get(nextNeighbour));
205 HeartBeatSpawner.Instance().setHeartTime(heartTime);
206 HeartBeatSpawner.Instance().start();
207 int previousNeighbour;
209 previousNeighbour = spawnersList.size() - 1;
211 previousNeighbour = is - 1;
212 ScanThreadSpawner.Instance().setHeartTime(heartTime);
213 ScanThreadSpawner.Instance().setServer(
214 (JaceSpawnerInterface) spawnersList.get(previousNeighbour));
215 ScanThreadSpawner.Instance().start();
218 // System.out.println("apres broadcastScanning");
219 new StartScanning().start();
221 System.err.println("Cannot find myself in the spawnersList !");
226 public void setBroadcasting(boolean bool) {
230 public void initialize() {
231 // if(protocol.equals("rmi")){
232 // launch the JaceSpawnerServer
237 // get a Register on the Super Node
238 // completed with the required number of Daemons
239 getRegisterOnSuperNode();
242 createSpawnerNetwork();
247 public void startScanning() {
249 long time = RunningApplication.Instance().getChrono().getValue() / 1000;
250 System.out.println("Start scanning at time: " + time + "s");
251 // lancer le chrono qui gere les heart beat
252 while (RunningApplication.Instance().isRunning() == true) {
253 // 1 etape : scaner tous les "heartTime" milisecondes si les noeuds
254 // enregistes sont encore vivants
255 // res = scanConnectedHosts();
257 // 2 etape : a garder ou pas !!!!! regarder si l'appli est en
258 // attente de noeud pr lui en attribuer 1 nvx
261 Thread.sleep(heartTime);
262 } catch (Exception e) {
265 // /System.out.println("is running = false");
266 if (!JaceDaemon.Instance().isRunning())
270 public synchronized void signalDeadNode(JaceInterface host, int rankOfDead) {
272 TaskId myTaskId = null;
276 RunningApplication.Instance().incrementNumberOfDisconnections();
278 time = RunningApplication.Instance().getChrono().getValue() / 1000;
279 nb = RunningApplication.Instance().getNumberOfDisconnections();
280 nbC = RunningApplication.Instance().getNumberOfCouilles();
281 System.out.println("At time = " + time + "s, NbDisconnection = "
282 + nb + ", NbProblem = " + nbC);
284 // !!!!!!!!!!!!!!actualiser le ListeTask ds le Register
285 myTaskId = Register.Instance().getListeOfTasks()
286 .getTaskIdOfHostStub(host);
287 if (myTaskId == null) {
288 Register.Instance.getListeOfTasks().viewAll();
289 myTaskId = Register.Instance().getListeOfTasks()
290 .getTaskIdOfRank(rankOfDead);
291 JaceInterface deadStub = myTaskId.getHostStub();
292 deadStub.suicide("Not doing a good work");
294 myTaskId.setHostIP(null);
295 myTaskId.setHostName(null);
296 Node noeud = Register.Instance().getNodeOfStub(
297 myTaskId.getHostStub());
298 myTaskId.setHostStub(null);
299 int rankDeaD = myTaskId.getRank();
301 String nomNoeud = noeud.getName();
302 // Register.Instance().removeNodeAt(i);
303 // Register.Instance().removeNode(host.getIP());
304 // System.out.println("fait le remove : taille = " +
305 // Register.Instance().getSize());
307 boolean b = Register.Instance().removeNodeOfName(noeud.getName());
310 System.out.println("Removing Node of rank "
311 + rankDeaD + " : size = "
312 + Register.Instance().getSize());
315 .println("Cannot remove the Node, it doesn't exist anymore: size = "
316 + Register.Instance().getSize());
319 Calendar cal = new GregorianCalendar();
320 System.out.println("At time=" + cal.get(Calendar.MINUTE) + ":"
321 + cal.get(Calendar.SECOND));
323 // retrouver SI POSSIBLE un autre libre pr remplacer celui la pr
326 /**** Sébastien Miquée **/
327 //Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud);
328 Node tmpNode = foundToReplaceThisNode(rankDeaD, nomNoeud, noeud);
330 // broadcastRegister(0);
331 updateConcernedNodes(rankDeaD, noeud, tmpNode);
334 System.out.println("Set scanning on %%%%%%");
335 tmpNode.getStub().setScanning(true);
336 } catch (Exception e) {
337 System.err.println("Unable to setScannig on for the new node: "
341 // Register.Instance().getListeOfTasks().viewAll();
342 for (int z = 0; z < spawnersList.size(); z++)
343 if (!((JaceSpawnerInterface) spawnersList.get(z))
344 .equals(Register.Instance().getSpawnerStub()))
346 ((JaceSpawnerInterface) spawnersList.get(z))
347 .replaceDeamonBy(noeud, tmpNode, rankDeaD);
349 } catch (Exception e) {
351 .println("Unable to broadcast the modifications to all the spawners: "
354 } catch (Exception ee) {
355 System.err.println("Error in signalDeadNode() :" + ee);
359 // verifie si les noeud notes vivant ds le Register.Instance() du SuperNode
361 // retourne 0 si erreur, 1 sinon
363 * private synchronized int scanConnectedHosts() { long time = 0; Node host;
364 * Node tmpNode; long workerTime; long currentTime; int rank; int restempo;
365 * int nb = 0; int nbC = 0; boolean changed = false; int index=0; try{
366 * JaceSpawnerInterface spawnerStub=Register.Instance().getSpawnerStub();
367 * if(spawnerStub.getFinished()==true){
368 * System.out.println("nbre de taches="+Register.Instance().getSize());
369 * ListeTask t=Register.Instance().getListeOfTasks();
370 * for(index=z;index<t.getSize();index++){ TaskId recev = null;
371 * System.out.println("deleting Task************"+index);
373 * recev = t.get(index); JaceInterface stub=recev.getHostStub();
374 * spawnerStub.killApplication(stub); }
378 * } }catch(Exception e){
379 * System.out.println("w aiiiiiiiiiiiiiirrrr"+e+" index="+index); z=index;
382 * if (Register.Instance().getSize() == 0) {
383 * System.out.println("aucun noeuds a scanner");
384 * RunningApplication.Instance().purge(); System.exit(0);
391 // trouver un noeud sur les superNode
392 // pr les requisitionner
394 /*** Sébastien Miquée ***/
396 //private synchronized Node foundToReplaceThisNode(int theRank, String nom) {
397 private synchronized Node foundToReplaceThisNode(int theRank, String nom, Node _n) {
399 boolean found = false;
402 while (found == false) {
405 //node = centralServer.getNewNode(LocalHost.Instance().getIP());
406 node = centralServer.getNewNode(LocalHost.Instance().getIP(), _n);
412 Thread.sleep( 1000 ) ;
413 System.out.println("Pas de bon retour !");
415 } catch (Exception e) {
416 // trouver un autre superNode et lui demander le noeud a lui
418 System.err.println("Cannot localize SuperNode ! " + e);
426 System.out.println("Using Node " + node.getName() + " ("
427 + node.getIP() + ") in order to replace " + nom
428 + " size before add: " + Register.Instance().getSize()
430 node.setAliveFlag(true);
433 // rajouter le noeud ds le Register
434 node.setAppliName(RunningApplication.Instance().getName());
436 // lui envoyer mon stub pr qu'il commence a me pinguer des
438 // TODO a mettre ds un thread ????
442 * neighborTask=Register.Instance().getListeOfTasks().getTaskIdOfRank
443 * ((theRank+1)%Register.Instance().getListeOfTasks().getSize());
444 * try{ node.getStub().updateHeart(neighborTask.getHostStub()); }
445 * catch(Exception e) {
446 * System.out.println("nvx noeud deja plu dispo2"); //node = null; }
448 // TODO verif pourkoi superNode me le redonne
449 // alors qu'il fait deja du calcul
450 // int is = Register.Instance().existNode(node.getIP());
451 int is = Register.Instance().existNode(node);
453 System.out.println("The Node is already in the register ! I don't add it.");
454 System.out.println("Node " + node.getName() + " not added !") ;
457 Register.Instance().addNode(node);
459 // !!!!!!!!!!!!!!actualiser le ListeTask
460 TaskId myTaskId = Register.Instance().getListeOfTasks()
461 .getTaskIdOfRank(theRank);
462 myTaskId.setHostIP(node.getIP());
463 myTaskId.setHostName(node.getName());
464 myTaskId.setHostStub(node.getStub());
466 // Register.Instance().getListeOfTasks().viewAll();
469 neighborRank = Register.Instance().getSize() - 1;
471 neighborRank = theRank - 1;
472 TaskId neighborTask2 = Register.Instance().getListeOfTasks()
473 .getTaskIdOfRank(neighborRank);
475 JaceInterface jaceStub = neighborTask2.getHostStub();
476 jaceStub.updateHeart(node.getStub());
477 } catch (Exception e) {
478 System.err.println("Next node unreachable ! " + e);
485 System.out.println("I didn't receive a new Node !");
490 public void replaceBy(JaceSpawnerInterface oldStub,
491 JaceSpawnerInterface stub) {
492 int index = spawnersList.indexOf((Object) oldStub);
494 spawnersList.setElementAt(stub, index);
496 System.err.println("Spawner's stub not foud in spawnersList !");
499 public void getNewSpawner(JaceSpawnerInterface previousSpawner) {
500 //boolean found = false;
503 JaceSpawnerInterface spawnerStub = null;
505 // while (found == false) {
507 // TODO : trouver l'erreur !!!
509 // "pas localise le super node java.lang.NullPointerException"
510 if (centralServer == null) {
511 System.err.println("Central Server not localized !");
513 node = centralServer.getNewNode( LocalHost.Instance().getIP(), null ) ;
514 RunningApplication.Instance()
515 .incrementNumberOfSpawnerDisconnections();
517 } catch (Exception e) {
518 // trouver un autre superNode et lui demander le noeud a lui
519 System.err.println("Super Node not localized !\n " + e);
520 // System.out.println("pas localise le super node " + e);
521 // System.out.println("pas localise le super node " + e);
522 // System.out.println("pas localise le super node " + e);
523 // System.out.println("pas localise le super node " + e);
524 // System.out.println("pas localise le super node " + e);
525 // System.out.println("pas localise le super node " + e);
526 // System.out.println("pas localise le super node " + e);
527 System.err.println("My IP : " + LocalHost.Instance().getIP());
528 if (centralServer == null) {
529 System.err.println("CentralServer is NULL !");
535 index = spawnersList.indexOf((Object) previousSpawner);
537 System.out.println("Using Node " + node.getName()
539 + LocalHost.Instance().resolve(node.getName())
540 + ") to replace a dead spawner\n\n");
542 // Register.Instance().viewAll();
543 // Register.Instance().getListeOfTasks().viewAll();
544 spawnerStub = node.getStub().transformIntoSpawner(
553 RunningApplication.Instance()
554 .getNumberOfDisconnections(),
555 RunningApplication.Instance()
556 .getNumberOfSpawnerDisconnections(),
557 nbOfDaemonsPerSpawner, nbOfDeamonsPerThread);
558 spawnersList.setElementAt(spawnerStub, index);
559 new StartProcessThread(index).start();
560 // spawnerStub.startProcess( spawnersList);
561 } catch (Exception e) {
562 System.err.println("Unable to reach the new spawner: " + e);
564 for (int j = 0; j < spawnersList.size(); j++)
566 if (!((JaceSpawnerInterface) spawnersList.get(j))
567 .equals(Register.Instance().getSpawnerStub())
568 && !((JaceSpawnerInterface) spawnersList.get(j))
569 .equals(spawnerStub)) {
571 .println("Trying to broadcast to spawner of rank "
574 ((JaceSpawnerInterface) spawnersList.get(j))
575 .replaceBy(previousSpawner, spawnerStub);
577 } catch (Exception e) {
579 .println("Unable to broadcast to spawner of rank: "
580 + j + ". Error:" + e);
582 ScanThreadSpawner.Instance().setServer(spawnerStub);
586 previous = spawnersList.size() - 1;
588 previous = index - 1;
590 ((JaceSpawnerInterface) spawnersList.get(previous))
591 .updateHeart(spawnerStub);
592 } catch (Exception e) {
594 .println("Unable to change the server of the heartbeatThread for the previous node of rank "
595 + previous + ". error:" + e);
599 System.err.println("Node is null !");
604 public void broadcastFinished(boolean bool) {
605 for (int i = 0; i < spawnersList.size(); i++)
607 ((JaceSpawnerInterface) spawnersList.get(i)).setFinished(bool);
608 } catch (Exception e) {
610 .println("Unable to propagate the end of the application :"
615 private synchronized void scanAppliNodes() {
618 //ListeTask tskList = null;
622 JaceSpawnerInterface spawnerStub = Register.Instance()
624 if (spawnerStub.getFinished() == true) {
625 System.out.println("Number of tasks ="
626 + Register.Instance().getSize());
628 int x = Register.Instance().getListeOfTasks().getSize()
629 / nbOfDaemonsPerSpawner;
632 s = (Register.Instance().getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
633 / nbOfDeamonsPerThread;
635 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
637 int debut = nbOfDaemonsPerSpawner * rank;
639 // for(int i=debut;i<nbOfDaemonsPerSpawner*(rank+1) &&
640 // i<reg.getSize();i++)
641 // System.out.println(((Node)nodes.elementAt(i)).getName());
643 ListeTask t = Register.Instance().getListeOfTasks();
644 ScanThreadSpawner.Instance().kill();
645 HeartBeatSpawner.Instance().kill();
647 for (int i = 0; i < s + 1; i++) {
649 new KillThread(i, debut, nbOfDaemonsPerSpawner,
650 nbOfDeamonsPerThread, t).start();
655 long finalTime = RunningApplication.Instance().getChrono()
658 int nbe = RunningApplication.Instance()
659 .getNumberOfDisconnections();
661 int nbsdc = RunningApplication.Instance()
662 .getNumberOfSpawnerDisconnections();
664 System.out.println("Application finished successfully !");
665 // System.out.println("Application finished successfully !!!!!!");
666 // System.out.println("Application finished successfully !!!!!!");
667 // System.out.println("Application finished successfully !!!!!!");
668 // System.out.println("Application finished successfully !!!!!!");
669 // System.out.println("Application finished successfully !!!!!!");
671 // .println("Application finished successfully !!!!!!\n");
672 System.out.println("TOTAL TIME in s : " + (finalTime / 1000));
673 System.out.println("nb of desconnections: " + nbe);
674 System.out.println("nb of spawners desconnections: " + nbsdc);
675 if (JaceDaemon.Instance().isRunning()) {
676 JaceDaemon.Instance().reconnectSuperNode();
678 RunningApplication.Instance().purge();
683 RunningApplication.Instance().purge();
687 /** Suprresion of the mapping algorithm on the SuperNode **/
688 centralServer.removeAlgo( idAlgo, 0 ) ;
690 } catch( Exception e ) {
691 System.err.println( "Error the application nodes scan!\n " + e ) ;
695 * if (Register.Instance().getSize() == 0) {
696 * System.out.println("aucun noeuds a scanner");
697 * RunningApplication.Instance().purge(); System.exit(0); return 0;
699 * } else{ tskList = Register.Instance().getListeOfTasks();
701 * //si mon appli a besoin d'un noeud //sinon, on fait rien if ((nbTasks
702 * - Register.Instance().getSize()) > 0) { cptReplaced = 0;
704 * //TODO demander des paquet de nodes, pas qu'un //on scanne toutes les
705 * taches de cette appli for (int ind = 0; ind < tskList.getSize();
706 * ind++) { //si 1 tache a pas de noeud, on trouve 1 remplacant
708 * //if (tskList.get(ind).getHostIP() == null) { if
709 * (tskList.get(ind).getHostStub() == null) { rank =
710 * tskList.get(ind).getRank(); node = foundToReplaceThisNodeTMP(rank);
711 * if (node != null) { cptReplaced++; }
715 * //qd fini de scanner taches, envoyer Register //si remplacement de
716 * noeud (c a d si Register modifier) if (cptReplaced != 0) {
717 * broadcastRegister(0); } try { Thread.currentThread().yield(); } catch
720 * }// fin if(appli.getNeededNodes() > 0) {
721 * //System.out.println("SCAN APPLI : taille : " +
722 * Register.Instance().getSize()); return 1; }
726 // @SuppressWarnings("unused")
727 // private synchronized Node foundToReplaceThisNodeTMP(int theRank) {
729 // boolean found = false;
731 // // while (found == false) {
733 // // TODO : trouver l'erreur !!!
735 // // "pas localise le super node java.lang.NullPointerException"
736 // if (centralServer == null) {
737 // System.out.println("centralServer est NUUUUUUUUULL");
739 // node = centralServer.getNewNode(LocalHost.Instance().getIP());
742 // } catch (Exception e) {
743 // // trouver un autre superNode et lui demander le noeud a lui
744 // System.out.println("TMP pas localise le super node " + e);
745 // System.out.println("TMP pas localise le super node " + e);
746 // System.out.println("TMP pas localise le super node " + e);
747 // System.out.println("TMP pas localise le super node " + e);
748 // System.out.println("TMP pas localise le super node " + e);
749 // System.out.println("TMP pas localise le super node " + e);
750 // System.out.println("TMP pas localise le super node " + e);
751 // System.out.println("mon IP : " + LocalHost.Instance().getIP());
752 // if (centralServer == null) {
753 // System.out.println("centralServer : NULL");
755 // connectSuperNode();
758 // if (node != null) {
759 // System.out.println("COOOOOOOOOOOOOOOOOOOOOOL : requisition de "
760 // + node.getName() + " taille avt add: "
761 // + Register.Instance().getSize() + "\n\n");
762 // node.setAliveFlag(true);
763 // node.setAliveTime();
765 // // rajouter le noeud ds le Register
766 // System.out.println("ds Register, manque "
767 // + (nbTasks - Register.Instance().getSize()));
768 // node.setAppliName(RunningApplication.Instance().getName());
770 // // lui envoyer mon stub pr qu'il commence a me pinguer des
772 // // TODO a mettre ds un thread ????
774 // TaskId neighborTask = Register.Instance().getListeOfTasks()
777 // % Register.Instance().getListeOfTasks()
779 // node.getStub().updateHeart(neighborTask.getHostStub());
780 // // node.getStub().updateHeart(this.spawnerRef);
782 // // int is = Register.Instance().existNode(node.getIP());
783 // int is = Register.Instance().existNode(node);
784 // // TODO verif pourkoi superNode me le redonne
785 // // alors qu'il fait deja du calcul
787 // System.out.println("j'ajoute pas le noeud, il y est deja");
788 // System.out.println("PAS AJOUTEE TMP " + node.getName());
789 // System.out.println("PAS AJOUTEE TMP " + node.getName());
790 // System.out.println("PAS AJOUTEE TMP " + node.getName());
791 // System.out.println("PAS AJOUTEE TMP " + node.getName());
792 // System.out.println("PAS AJOUTEE TMP " + node.getName());
795 // Register.Instance().addNode(node);
797 // // !!!!!!!!!!!!!!actualiser le ListeTask
798 // TaskId myTaskId = Register.Instance().getListeOfTasks()
799 // .getTaskIdOfRank(theRank);
800 // myTaskId.setHostIP(node.getIP());
801 // myTaskId.setHostName(node.getName());
802 // myTaskId.setHostStub(node.getStub());
803 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostIP(node.getIP());
804 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostName(node.getName());
805 // // Register.Instance().getListeOfTasks().getTaskIdOfRank(theRank).setHostStub(node.getStub());
807 // } catch (Exception e) {
808 // System.out.println("nvx noeud deja plu dispo");
812 // System.out.println("RADINNNNNNNNNNNNNN TMP ");
817 private void exportObject() {
819 JaceSpawnerServer spawnerServer = null;
821 System.out.println("Name of local machine is: "
822 + LocalHost.Instance().getName());
823 System.out.println("IP of local machine is: "
824 + LocalHost.Instance().getIP());
826 // launch the JaceSpawnerServer
827 spawnerServer = new JaceSpawnerServer();
828 java.rmi.registry.LocateRegistry.createRegistry(spawnerPort);
829 java.rmi.registry.LocateRegistry.getRegistry(spawnerPort).rebind(
830 "JaceSpawnerServer", spawnerServer);
831 spawnerRef = (JaceSpawnerInterface) Naming.lookup("rmi://"
832 + LocalHost.Instance().getIP() + ":" + spawnerPort
833 + "/JaceSpawnerServer");
835 } catch (Exception e) {
837 .println("JaceP2P_Error in JaceSpawner.exportObject() when creating the local JaceSpawnerServer "
839 // System.err.println("exit ds JaceSpawner.exportObject");
845 public void connectSuperNode() {
846 System.out.println("I'm looking for a super node");
847 boolean connected = false;
848 if (!(superNode_IP == null)) {
850 System.out.println("Trying to invoke super node "
852 centralServer = (JaceSuperNodeInterface) Naming.lookup("rmi://"
853 + superNode_IP + ":" + superNode_port
855 System.out.println("Succesfully located " + superNode_IP);
857 // add stub and IP in LocalHost to store it until super node
859 LocalHost.Instance().setSuperNodeStub(centralServer);
860 LocalHost.Instance().setSuperNodeIP(superNode_IP);
861 heartTime = centralServer.getSuperNodeBeat();
862 timeBeforeKill = NB_HEART_DECONNECT * heartTime;
865 } catch (Exception e) {
866 System.err.println("Super Node not accessible, try another one (1/2s)");
869 } catch (Exception e1) {
874 if (connected == false) {
876 SuperNodeListe.Instance().staticInitialization();
877 while (connected == false
878 && i < SuperNodeListe.Instance().getListe().size()) {
879 SuperNodeData d = null;
880 d = SuperNodeListe.Instance().getSuperNodeData(i);
882 superNode_IP = LocalHost.Instance().resolve(d.getIP());
883 superNode_port = d.getPort();
884 // superNode_port = d.getPort();
886 System.out.println("Trying to invoke Super Node "
888 centralServer = (JaceSuperNodeInterface) Naming
889 .lookup("rmi://" + superNode_IP + ":"
890 + superNode_port + "/JaceSuperNode");
891 System.out.println("Succesfully located SuperNode "
893 LocalHost.Instance().setSuperNodeStub(centralServer);
894 LocalHost.Instance().setSuperNodeIP(superNode_IP);
895 heartTime = centralServer.getSuperNodeBeat();
896 timeBeforeKill = NB_HEART_DECONNECT * heartTime;
899 } catch (Exception e) {
901 .println("SuperNode "
903 + " not accessible, trying to locate another one in 0.5s\n");
907 } catch (Exception e1) {
913 if (connected == false) {
914 System.err.println("All the Super Nodes in the list are not accessible. I'm unable to connect to the platform !");
920 // get a Register on the SuperNode
921 // completed with the required number of Daemons
923 public synchronized void getRegisterOnSuperNode() {
924 Register registerSpawner = null;
926 boolean recieved = false;
928 System.out.println("Trying to get a Register on the SuperNode");
929 int nbExtraSpawners = 0;
930 if (nbTasks > nbOfDaemonsPerSpawner) {
931 nbExtraSpawners = (nbTasks - 1) / nbOfDaemonsPerSpawner;
936 registerSpawner = centralServer.getRegisterSpawner(LocalHost
937 .Instance().getIP(), nbTasks, (Task) tache, nbTasks
938 + nbExtraSpawners, algo, paramAlgo);
940 } catch (Exception e) {
942 .println("Unable to recieve a register from superNode "
948 idAlgo = LocalHost.Instance().getIP() ;
950 if (registerSpawner.getSize() != (nbTasks + nbExtraSpawners)) {
951 System.err.println("I did not recieve enough nodes from superNode!!!! \n killing application !!!!");
952 for (int i = 0; i < registerSpawner.getSize(); i++) {
954 registerSpawner.getNodeAt(i).getStub().reconnectSuperNode();
955 } catch (Exception e) {
956 System.err.println("The reserved node was unable to reconnect to the super node");
962 spawnersList = new Vector<Object>();
963 for (int i = 0; i < nbExtraSpawners && i < registerSpawner.getSize(); i++) {
964 spawnersList.add(registerSpawner.getNodeAt(0));
965 // * nbOfDaemonsPerSpawner));
966 registerSpawner.removeNodeOfName(registerSpawner.getNodeAt(0).getName());
967 // * nbOfDaemonsPerSpawner));
970 registerSpawner.setNbOfTasks(nbTasks);
971 registerSpawner.setNumBackupNeighbors(nbSavingNodes);
973 * System.out.println("Trying to connect another SuperNode");
974 * connectSuperNode(); try { registerSpawner =
975 * centralServer.getRegisterSpawner(LocalHost.Instance().getIP(),
976 * nbTasks); } catch(Exception e1) {}
979 if (registerSpawner != null) {
980 System.out.println("I received the register");
981 // registerSpawner.setVersion(registerVersion);
982 // registerVersion++;
983 Register.Instance().replaceBy(registerSpawner);
984 System.out.println("It contains " + Register.Instance().getSize()
985 + " Nodes" + " " + nbExtraSpawners + " ExtraSpawners");
987 // set each Node aliveTime value to the Spawner current time
988 for (int i = 0; i < Register.Instance().getSize(); i++) {
989 noeud = Register.Instance().getNodeAt(i);
990 noeud.setAliveFlag(true);
991 noeud.setAliveTime();
995 System.err.println("\n---------------WARNING--------------");
996 System.err.println("No Daemon available on the SuperNode dispo, try later, please");
1003 * Set the identifier of the mapping algorithm used.
1004 * @param _s The mapping identifier
1006 * @author Sébastien Miquée
1008 public void setIdAlgo( String _s ) throws RemoteException
1013 public class TransformThread extends Thread {
1017 public TransformThread(int i, Node n) {
1025 System.out.println("Trying to transform the spawner ("
1026 + n.getName() + ") of rank " + i);
1027 spawnersList.setElementAt(n.getStub().transformIntoSpawner(
1028 params, appliName, Register.Instance(), nbTasks,
1029 centralServer, i, heartTime, 0, 0, 0,
1030 nbOfDaemonsPerSpawner, nbOfDeamonsPerThread), i);
1031 } catch (Exception e) {
1032 System.err.println("Error while contacting newly acquired spawner ("
1033 + n.getName() + "): " + e);
1035 n = centralServer.getNewNode( LocalHost.Instance().getIP(), null ) ;
1037 new TransformThread(i, n).start();
1038 } catch (Exception e1) {
1039 System.err.println("The Super Node is maybe dead: " + e1) ;
1040 for (int z = 0; z < Register.Instance().getSize(); z++) {
1042 Register.Instance().getNodeAt(z).getStub()
1043 .reconnectSuperNode();
1044 } catch (Exception ez) {
1045 System.err.println("The reserved node was unable to reconnect to the super node: \n"
1055 public class StartProcessThread extends Thread {
1058 public StartProcessThread(int i) {
1066 * while((spawnersList.elementAt(i) instanceof Node)) try{
1067 * System.out.println("waiting till transform of spawner "+i+
1068 * " is finished"); Thread.sleep(20); }catch(Exception e1){}
1070 // System.out.println("start process on spawner of rank "+i);
1071 JaceSpawnerInterface spawnerStub = (JaceSpawnerInterface) spawnersList.get(i);
1072 spawnerStub.startProcess(spawnersList);
1073 } catch (Exception e) {
1074 e.printStackTrace(System.out);
1075 System.err.println("Unable to start the process on the spawner of rank "
1076 + i + ".error: " + e);
1081 public void createSpawnerNetwork() {
1084 for (i = 0; i < spawnersList.size(); i++) {
1085 n = (Node) spawnersList.elementAt(i);
1087 // Register.Instance().getListeOfTasks().viewAll();
1088 // spawnersList.setElementAt(n.getStub().transformIntoSpawner(
1089 // params, appliName, Register.Instance(),nbTasks, centralServer,i,
1090 // heartTime,0,0),i);
1091 new TransformThread(i, n).start();
1094 // broadcast the Register.Instance() to all the JaceServer
1095 // in order to start each task on the Daemons
1097 spawnersList.add(Register.Instance().getSpawnerStub());
1098 System.out.println(" rank="+rank+" spawnersList.size()=" + spawnersList.size());
1099 rank = spawnersList.size() - 1;
1101 broadcastRegister(1);
1103 for (int j = 0; j < spawnersList.size(); j++) {
1104 System.out.println("waiting till transform of spawner " + j
1106 while ((spawnersList.elementAt(j) instanceof Node))
1111 } catch (Exception e) {
1116 ((JaceSpawnerInterface)spawnersList.elementAt( i )).setIdAlgo( idAlgo ) ;
1117 } catch (RemoteException e) {
1118 System.err.println( "Unable to set Mapping Algorithm identifier" ) ;
1119 e.printStackTrace();
1124 System.out.println("End Transformation of all spawners. Beginning the computing processes");
1126 for (i = 0; i < spawnersList.size(); i++) {
1128 // while(!(spawnersList.elementAt(i) instanceof
1129 // JaceSpawnerInterface))
1131 new StartProcessThread(i).start();
1134 System.out.println("End create Spawner Network!!!!!!!!!");
1137 public JaceSpawnerInterface getSpawnerResponsibleOn(int rank) {
1138 int id = rank / nbOfDaemonsPerSpawner;
1139 return (JaceSpawnerInterface) spawnersList.get(id);
1142 public void createAppli() {
1147 ListeTask tsk = new ListeTask();
1149 JaceInterface nodeStub = null;
1150 TaskId myTask = null;
1152 System.out.println("Application launched, starting the chronometer");
1153 RunningApplication.Instance().getChrono().start();
1155 RunningApplication.Instance().setName(appliName);
1156 RunningApplication.Instance().setNbTasks(nbTasks);
1157 // RunningApplication.Instance().setRegister(Register.Instance());
1159 Register.Instance().setParams(params);
1160 Register.Instance().setAppliName(appliName);
1161 Register.Instance().setSpawnerStub(this.spawnerRef);
1163 // assign a TaskId to each Node of the Register
1164 // and insert the TaskId in tke ListTask
1165 while (i < Register.Instance().getSize() && count < nbTasks) {
1166 tmpNode = Register.Instance().getNodeAt(i);
1167 if (tmpNode.getAliveFlag() == true) {
1168 tmpNode.setAppliName(appliName);
1169 nodeStub = tmpNode.getStub();
1170 nodeName = tmpNode.getName();
1171 nodeIP = tmpNode.getIP();
1173 myTask = new TaskId(appliName, count, nodeStub);
1174 myTask.setHostIP(nodeIP);
1175 myTask.setHostName(nodeName);
1177 tsk.addTask(myTask);
1183 // if not enough Nodes in the Register,
1184 // insert not assigned TaskId in the ListTask
1185 if (count < nbTasks) {
1186 for (int j = count; j < nbTasks; j++) {
1187 tsk.addTask(new TaskId(appliName, j, null));
1189 System.out.println("in Register, misses "
1190 + (nbTasks - Register.Instance().getSize()) + " nodes");
1193 // insert the ListeTask in the Register of the appli
1194 Register.Instance().setListeOfTasks(tsk);
1195 // Register.Instance().getListeOfTasks().viewAll();
1196 RunningApplication.Instance().setRunning(true);
1197 System.out.println("fin create appli");
1200 public class BroadcastSpawner extends Thread {
1203 int nbOfDaemonsPerThread, nbOfDeamonsPerSpawner;
1205 public BroadcastSpawner(int i, int debut, int nbOfDeamonsPerSpawner,
1206 int nbOfDaemonsPerThread) {
1209 this.nbOfDaemonsPerThread = nbOfDaemonsPerThread;
1210 this.nbOfDeamonsPerSpawner = nbOfDeamonsPerSpawner;
1215 for (int index = debut + i * nbOfDaemonsPerThread; index < debut
1216 + i * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1217 && index < debut + nbOfDeamonsPerSpawner
1218 && index < Register.Instance().getListeOfTasks().getSize(); index++) {
1220 Register.Instance().getNodeAt(index).getStub().setSpawner(
1221 Register.Instance().getSpawnerStub());
1222 } catch (Exception e) {
1223 System.out.println("can't change spawner stub on node: "
1224 + Register.Instance().getNodeAt(i).getName()
1231 public class KillThread extends Thread {
1234 int nbOfDaemonsPerThread, nbOfDeamonsPerSpawner;
1237 public KillThread(int i, int debut, int nbOfDeamonsPerSpawner,
1238 int nbOfDaemonsPerThread, ListeTask t) {
1241 this.nbOfDaemonsPerThread = nbOfDaemonsPerThread;
1242 this.nbOfDeamonsPerSpawner = nbOfDeamonsPerSpawner;
1249 for (int index = debut + i * nbOfDaemonsPerThread; index < debut
1250 + i * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1251 && index < debut + nbOfDeamonsPerSpawner
1252 && index < t.getSize(); index++) {
1255 TaskId recev = null;
1256 System.out.println("deleting Task" + index);
1258 recev = t.getTaskIdOfRank(index);
1260 JaceInterface stub = recev.getHostStub();
1261 System.out.println("name=" + recev.getHostName());
1262 noeud = Register.Instance().getNodeOfStub(stub);
1263 noeud.setAppliName(null);
1264 new ReconnectThread(stub, noeud.getName()).start();
1265 Register.Instance().removeNode(noeud);
1266 // LocalHost.Instance().getSpawnerStub().killApplication(stub);
1268 } catch (Exception e) {
1270 System.err.println("error in killThread on node "
1271 + noeud.getName() + ". " + e);
1272 } catch (Exception e2) {
1273 System.err.println("error in error :" + e2);
1279 class ReconnectThread extends Thread {
1280 JaceInterface stub = null;
1283 public ReconnectThread(JaceInterface s, String name) {
1290 // System.out.println("reconnexion SuperNode");
1291 // Register.Instance().getNode(workerIP).getWorkerStub().reconnectSuperNode();
1293 // stub.reconnectSuperNode();
1294 stub.suicide("fin d'appli");
1295 } catch (Exception e) {
1296 System.err.println("can't kill node " + name);
1304 // faire une copie du Register et l'envoyer aux noeuds qui le compose
1305 // car si il est modif en meme tmp, on envoi pas un truc coherent
1306 private synchronized void broadcastRegister(int requete) {
1307 // Register reg = Register.Instance().clone();
1308 Register reg = Register.Instance();
1311 System.out.println("name of spawner: "
1312 + Register.Instance().getSpawnerStub().getName());
1313 // launch 1 thread to send the Register to all the nodes
1314 while (broadcasting == true)
1316 broadcasting = true;
1317 // Register.Instance().setSpawnerStub(
1318 // Register.Instance().getSpawnerStub());
1319 int x = reg.getListeOfTasks().getSize() / nbOfDaemonsPerSpawner;
1322 if ((reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1323 % nbOfDeamonsPerThread == 0)
1324 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1325 / nbOfDeamonsPerThread;
1327 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1328 / nbOfDeamonsPerThread + 1;
1329 else if ((nbOfDaemonsPerSpawner % nbOfDeamonsPerThread) == 0)
1330 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
1332 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread + 1;
1333 int debut = nbOfDaemonsPerSpawner * rank;
1334 System.out.println("rank=" + rank + " debut=" + debut + " s=" + s
1335 + " nbOfDaemonsPerSpawner=" + nbOfDaemonsPerSpawner
1336 + " nbOfDeamonsPerThread=" + nbOfDeamonsPerThread + " x="
1338 for (int i = 0; i < s; i++)
1339 new UpdateRegisterThread(tache, reg, requete, i, debut).start();
1341 * This thread : -updates the goal of the Node beats if necessary
1342 * (stub.updateHeart) -updates the Register on each Node
1343 * (stub.updateRegister)
1345 JaceSpawner.Instance().setBroadcasting(false);
1348 } catch (Exception e) {
1351 } catch (Exception e) {
1353 .println("\n1 node has died during JaceSpawner.broadcastRegister()");
1357 private synchronized void broadcastScanning() {
1358 Register reg = Register.Instance();
1359 while (broadcasting == true)
1362 } catch (Exception e) {
1364 // Register.Instance().viewAll();
1365 Vector<?> nodes = (Vector<?>) Register.Instance().getListOfNodes().clone();
1366 int x = reg.getListeOfTasks().getSize() / nbOfDaemonsPerSpawner;
1369 s = (reg.getListeOfTasks().getSize() % nbOfDaemonsPerSpawner)
1370 / nbOfDeamonsPerThread;
1372 s = nbOfDaemonsPerSpawner / nbOfDeamonsPerThread;
1374 int debut = nbOfDaemonsPerSpawner * rank;
1376 // for(int i=debut;i<nbOfDaemonsPerSpawner*(rank+1) &&
1377 // i<reg.getSize();i++)
1378 // System.out.println(((Node)nodes.elementAt(i)).getName());
1380 for (int i = 0; i < s + 1; i++) {
1382 new StartScanThread(i, nodes, debut).start();
1387 public Register getRegister(int rank) {
1389 ListeTask listOfTasks = Register.Instance().getListeOfTasks();
1390 Vector<Integer> dependencies = getDependencies(rank, listOfTasks.getSize());
1391 Register g = new Register();
1392 ListeTask newListOfTasks = new ListeTask();
1393 g.setAppliName(Register.Instance().getAppliName());
1394 g.setParams(Register.Instance().getParams());
1395 g.setSpawnerStub(Register.Instance().getSpawnerStub());
1396 g.setNbOfTasks(Register.Instance().getNbOfTasks());
1397 // g.setVersion(reg.getVersion());
1398 for (int j = 0; j < dependencies.size(); j++) {
1399 TaskId id = listOfTasks.getTaskIdOfRank(((Integer) dependencies
1400 .elementAt(j)).intValue());
1401 newListOfTasks.addTask(id);
1402 if (id.getHostStub() != null) {
1403 Node noeud = Register.Instance()
1404 .getNodeOfStub(id.getHostStub());
1408 g.setListeOfTasks(newListOfTasks);
1412 private void updateConcernedNodes(int rank, Node oldNode, Node node) {
1413 ListeTask listOfTasks = Register.Instance().getListeOfTasks();
1414 Vector<?> dependencies = getDependencies(rank, listOfTasks.getSize());
1415 System.out.println("la liste des voisins concernes de : " + rank);
1416 for (int z = 0; z < dependencies.size(); z++)
1417 System.out.print(((Integer) dependencies.elementAt(z)).intValue()
1419 System.out.println();
1420 // Register.Instance().setVersion(registerVersion);
1421 // registerVersion++;
1423 .setSpawnerStub(Register.Instance().getSpawnerStub());
1425 if ((dependencies.size() % nbOfDeamonsPerThread) == 0)
1426 s = dependencies.size() / nbOfDeamonsPerThread;
1428 s = dependencies.size() / nbOfDeamonsPerThread + 1;
1429 Register reg = Register.Instance();
1431 for (int j = 0; j < s; j++) {
1432 new UpdateRegisterConcernedThread(dependencies, reg, j, rank,
1433 oldNode, node).start();
1437 private Vector<Integer> getDependencies(int id, int jaceSize) {
1438 // get computing dependencies
1439 Vector<Integer> neighbors = new Vector<Integer>();
1440 int[] dep = tache.getDependencies(id);
1441 for (int z = 0; z < taille(dep); z++)
1442 neighbors.add(dep[z]);
1443 // System.out.println("la liste des voisins de calcul de: "+id+" concerne");
1444 // for(int z=0;z<neighbors.size();z++)
1445 // System.out.print(((Integer)neighbors.elementAt(z)).intValue()+" ");
1446 // System.out.println();
1448 // get convergence neighbors
1450 while (Math.pow(2, d) < jaceSize) {
1451 if (id < Math.pow(2, d) && ((id + Math.pow(2, d)) < jaceSize))
1452 if (!neighbors.contains((Object) ((int) (id + Math.pow(2, d)))))
1453 neighbors.add((int) (id + Math.pow(2, d)));
1454 if (id < Math.pow(2, d + 1) && id >= Math.pow(2, d))
1455 if (!neighbors.contains((Object) ((int) (id - Math.pow(2, d)))))
1456 neighbors.add((int) (id - Math.pow(2, d)));
1460 // get backup neighbors
1461 int nb = Register.Instance().getNumBackupNeighbors();
1464 for (int j = 1; j <= nb; j++) {
1465 // ------------ 1 - for backups "j + n" (to the right of j)
1466 rankOfBackTask = (id + j) % jaceSize;
1467 if (!neighbors.contains((Object) rankOfBackTask))
1468 neighbors.add(rankOfBackTask);
1470 // ------------ 2 - for backups "j - n" (to the left of j)
1473 rankOfBackTask = tmp % jaceSize;
1475 rankOfBackTask = jaceSize - (Math.abs(tmp) % jaceSize);
1477 if (!neighbors.contains((Object) rankOfBackTask))
1478 neighbors.add(rankOfBackTask);
1486 public static int taille(int[] vect) {
1489 while (x < vect.length && vect[x] >= 0) {
1496 class StartScanning extends Thread {
1498 public StartScanning() {
1508 class StartScanThread extends Thread {
1511 int nbOfDeamonsPerThread, nbOfDeamonsPerSpawner;
1513 StartScanThread(int i, Vector<?> nodes, int debut) {
1517 nbOfDeamonsPerThread = JaceSpawner.Instance().getNbOfDeamonsPerThread();
1518 nbOfDeamonsPerSpawner = JaceSpawner.Instance()
1519 .getNbOfDeamonsPerSpawner();
1524 for (index = debut + i * nbOfDeamonsPerThread; index < debut + i
1525 * nbOfDeamonsPerThread + nbOfDeamonsPerThread
1526 && index < debut + nbOfDeamonsPerSpawner
1527 && index < nodes.size(); index++) {
1529 Node node = (Node) nodes.elementAt(index);
1530 JaceInterface stub = node.getStub();
1531 String name = node.getName();
1534 stub.setScanning(true);
1535 // System.out.println("modify scanning to "+name);
1537 } catch (Exception e) {
1538 System.out.println("unable to modify scanning to " + name + ":"
1542 // for(int x=0;x<nodes.size();x++)
1543 // System.out.println(((Node)nodes.elementAt(x)).getName());
1544 // System.out.println("nbre total: "+(index-1));