Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Correction of some bugs and performance enhancement.
[jaceP2P.git] / src / jaceP2P / JaceSuperNode.java
1 package jaceP2P;
2
3 import java.rmi.Naming;
4 import java.rmi.RemoteException;
5
6 public class JaceSuperNode {
7         final int NB_HEART_DECONNECT = 3;
8         // attribute
9         private int heartTime; // frequency of heartBeat
10         private int port = 1098; // par la suite, donner par fichier de conf
11         private int timeBeforeKill; // wait for 3 non response of heartBeat to kill
12                                                                 // the Daemon
13         String protocol;
14         
15         private JaceSuperNodeServer snodeServer ;
16
17         public JaceSuperNode(String superNodeName, int port, String comProtocol,
18                         int beat) {
19                 heartTime = beat;
20                 timeBeforeKill = NB_HEART_DECONNECT * heartTime;
21                 this.port = port;
22                 protocol = comProtocol;
23                 snodeServer = null ;
24         }
25
26         public void initialize() {
27                 // if(protocol.equals("rmi")){
28
29                 // create his list of SuperNode
30                 // containing the IPs and ports
31                 // but not already the stubs
32                 if (System.getSecurityManager() == null) {
33             System.setSecurityManager(new SecurityManager());
34         }
35
36                 
37                 SuperNodeListe.Instance().staticInitialization();
38
39                 HeartBeatSNode.Instance().setHeartTime(heartTime);
40                 JaceSuperNodeInterface myStub = null;
41
42                 try {
43                         snodeServer = new JaceSuperNodeServer(heartTime);
44
45                         // lauch the rmiregistry
46                         java.rmi.registry.LocateRegistry.createRegistry(port);
47                         java.rmi.registry.LocateRegistry.getRegistry(port).rebind(
48                                         "JaceSuperNode", snodeServer);
49                         myStub = (JaceSuperNodeInterface) Naming.lookup("rmi://"
50                                         + LocalHost.Instance().getIP() + ":" + port
51                                         + "/JaceSuperNode");
52                         LocalHost.Instance().setSuperNodeStub(myStub);
53                         LocalHost.Instance().setPort(port);
54                         System.out.println("SuperNode " + LocalHost.Instance().getIP()
55                                         + " launched and waiting for invokations on port " + port);
56                 } catch (Exception e) {
57                         System.err
58                                         .println("JaceP2P_Error in JaceSuperNode.initialize() when biding the JaceSuperNodeServer : "
59                                                         + e);
60                         System.err.println("Exit in JaceSuperNode.initialise()");
61                         System.exit(1);
62                 }
63
64                 // get the stubs of the conected SuperNodes
65                 SuperNodeListe.Instance().locateSuperNodes(myStub);
66
67                 // }
68
69                 int index = SuperNodeListe.Instance().existSuperNode(
70                                 LocalHost.Instance().getIP());
71                 int next, previous;
72                 if (index == (SuperNodeListe.Instance().getListe().size() - 1))
73                         next = 0;
74                 else
75                         next = index + 1;
76                 HeartBeatSNode.Instance().setServer(
77                                 ((SuperNodeData) SuperNodeListe.Instance().getListe()
78                                                 .get(next)).getStub());
79                 if (index == 0)
80                         previous = SuperNodeListe.Instance().getListe().size() - 1;
81                 else
82                         previous = index - 1;
83                 System.out.println(index + " " + next + " " + previous);
84                 try {
85                         ((SuperNodeData) SuperNodeListe.Instance().getListe().get(
86                                         previous)).getStub().updateHeart(
87                                         ((SuperNodeData) SuperNodeListe.Instance().getListe()
88                                                         .get(index)).getStub());
89                 } catch (Exception e) {
90
91                         System.err
92                                         .println("Unable to modify heartbeat server for previous node"
93                                                         + e);
94                 }
95
96                 HeartBeatSNode.Instance().start();
97                 try {
98                         Thread.sleep(HeartBeatSNode.Instance().getHeartTime());
99                 } catch (Exception e) {
100                 }
101                 TokenThread.Instance().start();
102                 ScanThreadSuperNode.Instance().start();
103                 startScanningNodes();
104         }
105
106         // mettre des threads pr scanner
107         public void startScanningNodes() {
108                 while (true) {
109                         // scan at every "heartTime" milisecondes if nodes registered are
110                         // still alive
111                         scanConnectedHosts();
112
113                         try {
114                                 Thread.sleep(heartTime);
115                         } catch (Exception e) {
116                         }
117                 }
118         }
119
120         // verify if the nodes labeled "alive" in
121         // Register.Instance() of the SuperNode are still alive
122         private synchronized void scanConnectedHosts() {
123                 Node host;
124                 long workerTime;
125                 long currentTime;
126
127                 // detects the nodes connected that should have died
128                 for (int i = 0; i < Register.Instance().getSize(); i++) {
129                         host = Register.Instance().getNodeAt(i);
130                         if (host.getAliveFlag() == true && host.getAppliName() == null) {
131                                 workerTime = host.getAliveTime();
132                                 currentTime = System.currentTimeMillis();
133
134                                 // if the worker time has not changed since more than
135                                 // "timeBeforeKill" milisecondes, it is considered down
136                                 if (currentTime - workerTime > timeBeforeKill) {
137                                         // System.out.println(host.getName() +
138                                         // " : difference of time = " + (currentTime - workerTime));
139                                         host.setAliveFlag(false);
140                                         // try to reconnect the daemon to the super node
141                                         try {
142                                                 // if(protocol.equals("rmi")){
143                                                 host.getStub().reconnectSuperNode();
144                                                 // System.out.println("Daemon reconnected to the super node");
145                                                 // }
146                                         } catch (Exception e) {
147                                                 System.out.println("\nDISCONNECTION of " + host.getName()
148                                                                 + " size : " + Register.Instance().getSize());
149                                         }
150
151                                         // System.out.println("I remove the node because it doesnt answer anymore");
152                                         Register.Instance().removeNode(host);
153                                         int index = SuperNodeListe.Instance().existSuperNode(
154                                                         LocalHost.Instance().getIP());
155                                         ((SuperNodeData) SuperNodeListe.Instance().getListe().get(
156                                                         index)).setNbOfNodes(Register.Instance().getSize());
157                                         SuperNodeListe.Instance().forwardCountNode();
158                                         
159                                         try {
160                                                 snodeServer.delGNodeFromList( host, 0, "" ) ;
161                                         } catch (RemoteException e) {
162                                                 System.err.println( "Unable to remove the dead node from the list !" ) ;
163                                                 e.printStackTrace();
164                                         }
165 //                                      SuperNodeListe.Instance().removeGNode( deadGNode ) ;
166
167                                         // Register.Instance().viewAll();
168                                         // SuperNodeListe.Instance().viewAll();
169                                 }
170                         }
171                 }
172         }
173 }