Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Adaptation for fault tolerance with the Mapping library.
[jaceP2P.git] / src / jaceP2P / JaceSuperNode.java
1 package jaceP2P;
2
3 import java.rmi.Naming;
4
5 import and.Mapping.GNode;
6
7 public class JaceSuperNode {
8         final int NB_HEART_DECONNECT = 3;
9         // attribute
10         private int heartTime; // frequency of heartBeat
11         private int port = 1098; // par la suite, donner par fichier de conf
12         private int timeBeforeKill; // wait for 3 non response of heartBeat to kill
13                                                                 // the Daemon
14         String protocol;
15         
16         private JaceSuperNodeServer snodeServer ;
17
18         public JaceSuperNode(String superNodeName, int port, String comProtocol,
19                         int beat) {
20                 heartTime = beat;
21                 timeBeforeKill = NB_HEART_DECONNECT * heartTime;
22                 this.port = port;
23                 protocol = comProtocol;
24                 snodeServer = null ;
25         }
26
27         public void initialize() {
28                 // if(protocol.equals("rmi")){
29
30                 // create his list of SuperNode
31                 // containing the IPs and ports
32                 // but not already the stubs
33                 SuperNodeListe.Instance().staticInitialization();
34
35                 HeartBeatSNode.Instance().setHeartTime(heartTime);
36                 JaceSuperNodeInterface myStub = null;
37
38                 try {
39                         snodeServer = new JaceSuperNodeServer(heartTime);
40
41                         // lauch the rmiregistry
42                         java.rmi.registry.LocateRegistry.createRegistry(port);
43                         java.rmi.registry.LocateRegistry.getRegistry(port).rebind(
44                                         "JaceSuperNode", snodeServer);
45                         myStub = (JaceSuperNodeInterface) Naming.lookup("rmi://"
46                                         + LocalHost.Instance().getIP() + ":" + port
47                                         + "/JaceSuperNode");
48                         LocalHost.Instance().setSuperNodeStub(myStub);
49                         LocalHost.Instance().setPort(port);
50                         System.out.println("SuperNode " + LocalHost.Instance().getIP()
51                                         + " launched and waiting for invokations on port " + port);
52                 } catch (Exception e) {
53                         System.err
54                                         .println("JaceP2P_Error in JaceSuperNode.initialize() when biding the JaceSuperNodeServer : "
55                                                         + e);
56                         System.err.println("Exit in JaceSuperNode.initialise()");
57                         System.exit(1);
58                 }
59
60                 // get the stubs of the conected SuperNodes
61                 SuperNodeListe.Instance().locateSuperNodes(myStub);
62
63                 // }
64
65                 int index = SuperNodeListe.Instance().existSuperNode(
66                                 LocalHost.Instance().getIP());
67                 int next, previous;
68                 if (index == (SuperNodeListe.Instance().getListe().size() - 1))
69                         next = 0;
70                 else
71                         next = index + 1;
72                 HeartBeatSNode.Instance().setServer(
73                                 ((SuperNodeData) SuperNodeListe.Instance().getListe()
74                                                 .elementAt(next)).getStub());
75                 if (index == 0)
76                         previous = SuperNodeListe.Instance().getListe().size() - 1;
77                 else
78                         previous = index - 1;
79                 System.out.println(index + " " + next + " " + previous);
80                 try {
81                         ((SuperNodeData) SuperNodeListe.Instance().getListe().elementAt(
82                                         previous)).getStub().updateHeart(
83                                         ((SuperNodeData) SuperNodeListe.Instance().getListe()
84                                                         .elementAt(index)).getStub());
85                 } catch (Exception e) {
86
87                         System.err
88                                         .println("Unable to modify heartbeat server for previous node"
89                                                         + e);
90                 }
91
92                 HeartBeatSNode.Instance().start();
93                 try {
94                         Thread.sleep(HeartBeatSNode.Instance().getHeartTime());
95                 } catch (Exception e) {
96                 }
97                 TokenThread.Instance().start();
98                 ScanThreadSuperNode.Instance().start();
99                 startScanningNodes();
100         }
101
102         // mettre des threads pr scanner
103         public void startScanningNodes() {
104                 while (true) {
105                         // scan at every "heartTime" milisecondes if nodes registered are
106                         // still alive
107                         scanConnectedHosts();
108
109                         try {
110                                 Thread.sleep(heartTime);
111                         } catch (Exception e) {
112                         }
113                 }
114         }
115
116         // verify if the nodes labeled "alive" in
117         // Register.Instance() of the SuperNode are still alive
118         private synchronized void scanConnectedHosts() {
119                 Node host;
120                 long workerTime;
121                 long currentTime;
122
123                 // detects the nodes connected that should have died
124                 for (int i = 0; i < Register.Instance().getSize(); i++) {
125                         host = Register.Instance().getNodeAt(i);
126                         if (host.getAliveFlag() == true && host.getAppliName() == null) {
127                                 workerTime = host.getAliveTime();
128                                 currentTime = System.currentTimeMillis();
129
130                                 // if the worker time has not changed since more than
131                                 // "timeBeforeKill" milisecondes, it is considered down
132                                 if (currentTime - workerTime > timeBeforeKill) {
133                                         // System.out.println(host.getName() +
134                                         // " : difference of time = " + (currentTime - workerTime));
135                                         host.setAliveFlag(false);
136                                         // try to reconnect the daemon to the super node
137                                         try {
138                                                 // if(protocol.equals("rmi")){
139                                                 host.getStub().reconnectSuperNode();
140                                                 // System.out.println("Daemon reconnected to the super node");
141                                                 // }
142                                         } catch (Exception e) {
143                                                 System.out.println("\nDISCONNECTION of " + host.getName()
144                                                                 + " size : " + Register.Instance().getSize());
145                                         }
146
147                                         // System.out.println("I remove the node because it doesnt answer anymore");
148                                         Register.Instance().removeNode(host);
149                                         int index = SuperNodeListe.Instance().existSuperNode(
150                                                         LocalHost.Instance().getIP());
151                                         ((SuperNodeData) SuperNodeListe.Instance().getListe().get(
152                                                         index)).setNbOfNodes(Register.Instance().getSize());
153                                         SuperNodeListe.Instance().forwardCountNode();
154                                         
155                                         GNode deadGNode = snodeServer.delGNodeFromList( host, 0 ) ;
156                                         SuperNodeListe.Instance().removeGNode( deadGNode ) ;
157
158                                         // Register.Instance().viewAll();
159                                         // SuperNodeListe.Instance().viewAll();
160                                 }
161                         }
162                 }
163         }
164 }