Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Energy, onHostDestruction: ensured ptr existence
[simgrid.git] / contrib / psg / src / example / symphony / SymphonyNetworkManager.java
1 package example.symphony;\r
2 \r
3 import java.util.*;\r
4 import java.util.logging.Level;\r
5 import java.util.logging.Logger;\r
6 \r
7 import example.symphony.Message.MessageType;\r
8 import example.symphony.SymphonyProtocol.BootstrapStatus;\r
9 import peersim.cdsim.CDProtocol;\r
10 import peersim.config.Configuration;\r
11 import peersim.core.CommonState;\r
12 import peersim.core.Fallible;\r
13 import peersim.core.Node;\r
14 import peersim.edsim.EDProtocol;\r
15 import peersim.transport.Transport;\r
16 \r
17 /**\r
18  *\r
19  * @author Andrea Esposito <and1989@gmail.com>\r
20  */\r
21 public class SymphonyNetworkManager implements EDProtocol, CDProtocol {\r
22 \r
    // Configuration key suffixes; the constructor reads each one as prefix + "." + key.
    private static final String PAR_SYMPHONY = "symphony";
    private static final String PAR_TRANSP = "transport";
    private static final String PAR_ATTEMPTS = "attempts";
    private static final String PAR_NETSIZE = "networkestimator";
    private static final String PAR_NUM_TIMEOUT = "nTimeout";
    private static final String PAR_RELINKING = "relinking";
    private static final String PAR_RELINKING_LOWER_BOUND = "relinkingLowerBound";
    private static final String PAR_RELINKING_UPPER_BOUND = "relinkingUpperBound";
    // Initial values of k and n.
    private static final int DEFAULT_K = 1;
    private static final int DEFAULT_N = 2;
    // Relinking bounds used when the configuration gives none, or gives lower > upper.
    private static final double DEFAULT_RELINKING_LOWER_BOUND = 0.5;
    private static final double DEFAULT_RELINKING_UPPER_BOUND = 2.0;
    // Configuration prefix this instance was built from; clone() passes it to the constructor again.
    private final String prefix;
    // Protocol ids resolved from the configuration in the constructor.
    private final int symphonyID;
    private final int transportID;
    private final int networkEstimatorID;
    // Value that currentAttempts is reset to at every updateLongRangeLinks call.
    private final int attempts;
    // Id of this protocol; passed to transport.send as the destination protocol.
    private final int pid;
    // Keep-alive counter value from which an ONLINE neighbour is dropped in fixNeighbours.
    private final int nTimeout;
    // Per-node keep-alive counter; a KEEP_ALIVE_RESPONSE resets the sender's entry to 0.
    private final HashMap<Node, Integer> keepAliveMap;
    // False only when the "relinking" configuration value is "off" (any letter case).
    private final boolean relinkingProtocolActivated;
    private final double relinkingUpperBound;
    private final double relinkingLowerBound;
    private int k = DEFAULT_K; // Number of long range links
    private int n = DEFAULT_N; // Estimated network size
    // Shared by all instances so that the configuration is printed only once.
    private static boolean firstPrintConfig = true;
    /*
     * Estimated network size at the time the last long range link was established; it starts
     * at -1 to indicate that no long range link has been established yet.
     */
    private int nLink = -1;
    // Remaining retries for long range link requests; decremented on rejects and on useless targets.
    private int currentAttempts;
55 \r
56     public SymphonyNetworkManager(String prefix) {\r
57 \r
58         this.prefix = prefix;\r
59         pid = Configuration.lookupPid(prefix.replaceAll("protocol.", ""));\r
60         symphonyID = Configuration.getPid(prefix + "." + PAR_SYMPHONY);\r
61         transportID = Configuration.getPid(prefix + "." + PAR_TRANSP);\r
62         networkEstimatorID = Configuration.getPid(prefix + "." + PAR_NETSIZE);\r
63         attempts = Configuration.getInt(prefix + "." + PAR_ATTEMPTS);\r
64         nTimeout = Configuration.getInt(prefix + "." + PAR_NUM_TIMEOUT, 10);\r
65         relinkingProtocolActivated = !Configuration.getString(prefix + "." + PAR_RELINKING, "on").toLowerCase().equals("off");\r
66         double relinkingLowerBoundAppo = Configuration.getDouble(prefix + "." + PAR_RELINKING_LOWER_BOUND, DEFAULT_RELINKING_LOWER_BOUND);\r
67         double relinkingUpperBoundAppo = Configuration.getDouble(prefix + "." + PAR_RELINKING_UPPER_BOUND, DEFAULT_RELINKING_UPPER_BOUND);\r
68         if (relinkingLowerBoundAppo > relinkingUpperBoundAppo) {\r
69             relinkingLowerBound = DEFAULT_RELINKING_LOWER_BOUND;\r
70             relinkingUpperBound = DEFAULT_RELINKING_UPPER_BOUND;\r
71         } else {\r
72             relinkingLowerBound = relinkingLowerBoundAppo;\r
73             relinkingUpperBound = relinkingUpperBoundAppo;\r
74         }\r
75 \r
76         keepAliveMap = new HashMap<Node, Integer>();\r
77 \r
78         printConfig();\r
79     }\r
80 \r
81     private void printConfig() {\r
82 \r
83         if (firstPrintConfig) {\r
84             firstPrintConfig = false;\r
85             System.out.println(SymphonyNetworkManager.class.getSimpleName() + " Configuration:");\r
86             System.out.println("- Attempts per LongRangeLinks: " + attempts);\r
87             System.out.println("- Number of Timeout before a node is considered OFFLINE (through Keep-alive):" + nTimeout);\r
88             System.out.println("- Relinking: " + (relinkingProtocolActivated ? "ON" : "OFF"));\r
89             System.out.println("- Relinking Range: [" + relinkingLowerBound + ", " + relinkingUpperBound + "]");\r
90             System.out.println("-------------------------------\n");\r
91         }\r
92     }\r
93 \r
94     public void join(final Node node, final Node bootstrapNode) throws RoutingException {\r
95         final SymphonyProtocol bootstrapSymphony = (SymphonyProtocol) bootstrapNode.getProtocol(symphonyID);\r
96         SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
97 \r
98         /*\r
99          * Search (through the bootstrap node) and contact the Manager Node of myself such a way to\r
100          * be able to insert myself into the ring and create the short links\r
101          *\r
102          */\r
103         bootstrapSymphony.route(bootstrapNode, symphony.getIdentifier(), new Handler() {\r
104 \r
105             public void handle(SymphonyProtocol src, Tuple<Node, Double> tuple) {\r
106                 if (tuple == null) {\r
107                     Logger.getLogger(SymphonyNetworkManager.class.getName()).log(Level.SEVERE, "FAIL ROUTE JOIN");\r
108                     node.setFailState(Fallible.DEAD);\r
109                     return;\r
110                 }\r
111 \r
112                 Node managerNode = tuple.x;\r
113 \r
114                 Transport transport = (Transport) node.getProtocol(transportID);\r
115                 Message msg = new Message(node, node, MessageType.JOIN);\r
116                 transport.send(node, managerNode, msg, pid);\r
117             }\r
118         });\r
119 \r
120         // The Long Range Links are added after that i joined the ring (before i can't because i haven't got the nodes to do the routing)\r
121     }\r
122 \r
123     /**\r
124      * Conservative Re-Linking (i reuse the ones already created: not all fresh)\r
125      *\r
126      * @param node\r
127      */\r
128     public void updateLongRangeLinks(Node node) {\r
129         SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
130         Transport transport = (Transport) node.getProtocol(transportID);\r
131 \r
132         // if too much links i delete the farest ones\r
133         while (symphony.longRangeLinksOutgoing.size() > k) {\r
134             Node distantNode = Collections.max(symphony.longRangeLinksOutgoing, new SymphonyNodeComparator(symphonyID, node));\r
135             symphony.longRangeLinksOutgoing.remove(distantNode);\r
136 \r
137             // Communicate to the outgoing node that it ins't anymore one of my long range links\r
138             Message disconnectMsg = new Message(null, node, MessageType.DISCONNECT_LONG_RANGE_LINK);\r
139             transport.send(node, distantNode, disconnectMsg, pid);\r
140         }\r
141 \r
142         // I can search Long Range Links only if i'm into the ring and i'm able to do routing\r
143         if (symphony.isBootstrapped()) {\r
144             // if only few i try again, untill attempts times, to add new ones\r
145             int difference = k - symphony.longRangeLinksOutgoing.size();\r
146             currentAttempts = attempts;\r
147             for (int i = 0; i < difference; i++) {\r
148                 sendLongRangeLinkRequest(symphony, node);\r
149             }\r
150         }\r
151     }\r
    // Maximum number of consecutive retries in sendLongRangeLinkRequest that do not consume an
    // attempt when routing resolves to the node itself or to one of its short range links.
    private static final int MAX_ANTILOOP_COUNT_MANAGER_MYSELF = 5;
    // Number of such free retries used so far; reset to 0 once the maximum is reached.
    private int antiLoopManagerMySelf = 0;
154 \r
    /**
     * Picks a random target identifier, routes towards it and, depending on the node the route
     * resolves to, either sends a REQUEST_LONG_RANGE_LINK message or retries.
     *
     * The handler re-enters this method for its retries and shares the fields
     * {@code currentAttempts} and {@code antiLoopManagerMySelf} with the other callers.
     *
     * NOTE(review): a RoutingException is swallowed and the whole draw-and-route step is
     * repeated with no upper bound; if routing keeps failing this loop never ends - confirm
     * this is intended.
     *
     * @param symphony the Symphony protocol instance of {@code node}
     * @param node the node that is looking for a new long range link
     */
    private void sendLongRangeLinkRequest(final SymphonyProtocol symphony, final Node node) {
        boolean routingOk;
        do {
            // NOTE(review): the exponent uses log2(n); the usual inversion of the harmonic
            // density 1/(x ln n) would be exp(ln(n) * (u - 1)) - confirm the base is intended.
            double distance = Math.exp((Math.log(n) / Math.log(2)) * (CommonState.r.nextDouble() - 1.0)); // Harmonic Distribution
            // Identifiers wrap around at 1.
            double targetIdentifier = (symphony.getIdentifier() + distance) % 1;
            try {

                symphony.route(node, targetIdentifier, new Handler() {

                    public void handle(SymphonyProtocol src, Tuple<Node, Double> tuple) {

                        // The route produced no node: log and give up on this request.
                        if (tuple == null) {
                            Logger.getLogger(SymphonyNetworkManager.class.getName()).log(Level.SEVERE, "FAIL ROUTE SENDLONGRANGELINKREQUEST");
                            return;
                        }

                        // Nodes of all current short range links, from both sides.
                        Collection<Node> allShortLinks = new LinkedList<Node>();
                        for (Tuple<Node, BootstrapStatus> shortTuple : symphony.leftShortRangeLinks) {
                            allShortLinks.add(shortTuple.x);
                        }
                        for (Tuple<Node, BootstrapStatus> shortTuple : symphony.rightShortRangeLinks) {
                            allShortLinks.add(shortTuple.x);
                        }

                        /*
                         * Special case: the target is myself or one of my short range links.
                         * Retry without consuming an attempt, for at most
                         * MAX_ANTILOOP_COUNT_MANAGER_MYSELF consecutive times; after that the
                         * counter is reset and an attempt is consumed, without retrying.
                         */
                        if (tuple.x.equals(node) || allShortLinks.contains(tuple.x)) {

                            if (antiLoopManagerMySelf < MAX_ANTILOOP_COUNT_MANAGER_MYSELF) {

                                antiLoopManagerMySelf++;
                                sendLongRangeLinkRequest(symphony, node);
                            } else {
                                antiLoopManagerMySelf = 0;
                                currentAttempts--;
                            }
                        } else {

                            boolean alreadyAdded = symphony.longRangeLinksOutgoing.contains(tuple.x);
                            /*
                             * Debatable: should attempts be decreased only on a REJECT? If so, a
                             * possible endless loop has to be handled (nodes exhausted, i.e. all
                             * of them already added).
                             */
                            if (alreadyAdded && currentAttempts > 0) {
                                // Already linked to this node: consume an attempt and draw again.
                                currentAttempts--;
                                sendLongRangeLinkRequest(symphony, node);
                            } else if (!alreadyAdded) {
                                // New candidate: ask it to accept the long range link.
                                Message msg = new Message(null, node, MessageType.REQUEST_LONG_RANGE_LINK);
                                Transport transport = (Transport) node.getProtocol(transportID);
                                transport.send(node, tuple.x, msg, pid);
                            }
                        }
                    }
                });
                routingOk = true;
            } catch (RoutingException ex) {
                // Deliberately not reported: a new target is drawn and routing is tried again.
                routingOk = false;
            }
        } while (!routingOk);
    }
220 \r
221     public void leave(Node node) {\r
222         SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
223 \r
224         if (symphony.loggedIntoNetwork != BootstrapStatus.OFFLINE) {\r
225             Transport transport = (Transport) node.getProtocol(transportID);\r
226 \r
227             symphony.loggedIntoNetwork = BootstrapStatus.OFFLINE;\r
228 \r
229             // Communicate that i'm leaving to the outgoing (that i point to) nodes\r
230             for (Node outgoingNode : symphony.longRangeLinksOutgoing) {\r
231                 Message disconnectMsg = new Message(null, node, MessageType.DISCONNECT_LONG_RANGE_LINK);\r
232                 transport.send(node, outgoingNode, disconnectMsg, pid);\r
233             }\r
234 \r
235             // Communicate that i'm leaving to the incoming (that they point to me) nodes\r
236             for (Node incomingNode : symphony.longRangeLinksIncoming) {\r
237                 Message unavailableMsg = new Message(null, node, MessageType.UNAVAILABLE_LONG_RANGE_LINK);\r
238                 transport.send(node, incomingNode, unavailableMsg, pid);\r
239             }\r
240 \r
241             // Communicate to my neighbours (short range links) that i'm leaving and i send to them the near neighbours\r
242             for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {\r
243                 Message leaveMsg = new Message(symphony.rightShortRangeLinks.clone(), node, MessageType.LEAVE);\r
244                 transport.send(node, leftTuple.x, leaveMsg, pid);\r
245             }\r
246 \r
247             for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {\r
248                 Message leaveMsg = new Message(symphony.leftShortRangeLinks.clone(), node, MessageType.LEAVE);\r
249                 transport.send(node, rightTuple.x, leaveMsg, pid);\r
250             }\r
251 \r
252             node.setFailState(Fallible.DEAD);\r
253         }\r
254     }\r
255 \r
    /**
     * Handles a message delivered to this protocol on {@code node}. The event is cast to
     * {@link Message} and dispatched on its type; message types without a case are ignored.
     *
     * @param node the node that receives the event
     * @param pid protocol id used for the replies sent from here (it shadows the field of the
     * same name inside this method)
     * @param event the received event; must be a {@link Message}
     */
    public void processEvent(Node node, int pid, Object event) {

        Message msg = (Message) event;

        SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
        Transport transport = (Transport) node.getProtocol(transportID);

        // Scratch reference shared by several cases; LEAVE relies on it still being null.
        Collection<Tuple<Node, BootstrapStatus>> collection = null;
        switch (msg.getType()) {
            case JOIN:
                // Send my current neighbours (both sides) to the entering node
                collection = (Collection<Tuple<Node, BootstrapStatus>>) symphony.leftShortRangeLinks.clone();
                collection.addAll((Collection<Tuple<Node, BootstrapStatus>>) symphony.rightShortRangeLinks.clone());
                Message responseMsg = new Message(collection, node, MessageType.JOIN_RESPONSE);
                transport.send(node, msg.getSourceNode(), responseMsg, pid);

                /*
                 * Update my neighbours list by adding the new node (it certainly enters on the
                 * left side).
                 *
                 * It is stored as "ONLINE_AND_ALL_NEIGHBOURS_OFFLINE" because its bootstrap
                 * phase may not be finished yet (asynchronous communication).
                 */
                symphony.leftShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(msg.getSourceNode(), BootstrapStatus.ONLINE_AND_ALL_NEIGHBOURS_OFFLINE));


                fixNeighbours(node, symphony.leftShortRangeLinks);
                break;
            case JOIN_RESPONSE:

                Collection<Tuple<Node, BootstrapStatus>> tupleCollection = (Collection<Tuple<Node, BootstrapStatus>>) msg.getBody();

                /*
                 * My manager is a right neighbour. The manager is already inside the ring, so
                 * its bootstrap is obviously complete.
                 */
                symphony.rightShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(msg.getSourceNode(), BootstrapStatus.ONLINE));

                // Put the received neighbours on the correct side
                for (Tuple<Node, BootstrapStatus> tuple : tupleCollection) {
                    if (SymphonyProtocol.isLeftNeighbour(node, tuple.x)) {
                        symphony.leftShortRangeLinks.add(tuple);
                    } else {
                        symphony.rightShortRangeLinks.add(tuple);
                    }
                }

                fixNeighbours(node, symphony.leftShortRangeLinks);
                fixNeighbours(node, symphony.rightShortRangeLinks);

                // Update bootstrap status
                checkBootstrapStatus(node);

                // Send the refresh command so that the views are exchanged
                refreshNeighbours(node);

                // Update the long range links; at the beginning this is the same as adding k links
                updateLongRangeLinks(node);
                break;
            case UPDATE_NEIGHBOURS:

                Collection<Tuple<Node, BootstrapStatus>> collectionCloned = ((Collection<Tuple<Node, BootstrapStatus>>) symphony.leftShortRangeLinks.clone());
                collectionCloned.addAll(((Collection<Tuple<Node, BootstrapStatus>>) symphony.rightShortRangeLinks.clone()));

                // Send my neighbours so that the sender can update itself as well
                Message responseUpdateMsg = new Message(collectionCloned, node, MessageType.UPDATE_NEIGHBOURS_RESPONSE);
                transport.send(node, msg.getSourceNode(), responseUpdateMsg, pid);

                // Update my view with the sender; the message body carries its bootstrap status
                Tuple<Node, BootstrapStatus> neighbourTuple = new Tuple<Node, BootstrapStatus>(msg.getSourceNode(), (BootstrapStatus) msg.getBody());
                if (SymphonyProtocol.isLeftNeighbour(node, msg.getSourceNode())) {
                    collection = symphony.leftShortRangeLinks;
                } else {
                    collection = symphony.rightShortRangeLinks;
                }
                collection.add(neighbourTuple);

                fixNeighbours(node, collection);
                fixLookAheadMap(node);
                break;
            case UPDATE_NEIGHBOURS_RESPONSE:

                Collection<Tuple<Node, BootstrapStatus>> responseCollection = (Collection<Tuple<Node, BootstrapStatus>>) msg.getBody();

                // Merge every received neighbour into the side it belongs to
                for (Tuple<Node, BootstrapStatus> neighbourResponseTuple : responseCollection) {
                    if (SymphonyProtocol.isLeftNeighbour(node, neighbourResponseTuple.x)) {
                        collection = symphony.leftShortRangeLinks;
                    } else {
                        collection = symphony.rightShortRangeLinks;
                    }
                    collection.add(neighbourResponseTuple);
                }

                // Cut the neighbours down to the maximum allowed and possibly remove myself from the lists
                fixNeighbours(node, symphony.leftShortRangeLinks);
                fixNeighbours(node, symphony.rightShortRangeLinks);
                fixLookAheadMap(node);
                break;
            case UPDATE_STATUS:
            case UPDATE_STATUS_RESPONSE:

                Node updNode = msg.getSourceNode();
                BootstrapStatus updStatus = (BootstrapStatus) msg.getBody();

                // Search the neighbour and replace its entry with one carrying the new status
                boolean founded = false;

                // Try to see if it is on the left
                for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {
                    if (leftTuple.x.equals(updNode)) {
                        // The collection is modified while iterating; the break right below
                        // ends the iteration before the iterator is used again.
                        symphony.leftShortRangeLinks.remove(leftTuple);
                        symphony.leftShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(updNode, updStatus));

                        founded = true;
                        break;
                    }
                }

                // If it isn't on the left, try the neighbours on the right
                if (!founded) {
                    for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {
                        if (rightTuple.x.equals(updNode)) {
                            symphony.rightShortRangeLinks.remove(rightTuple);
                            symphony.rightShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(updNode, updStatus));

                            break;
                        }
                    }

                    // Runs even when the sender was not found on the right side either
                    fixNeighbours(node, symphony.rightShortRangeLinks);
                } else {
                    fixNeighbours(node, symphony.leftShortRangeLinks);
                }

                checkBootstrapStatusAndAlert(node);

                // Only a request is answered; a response must not trigger another response
                if (msg.getType() == MessageType.UPDATE_STATUS) {
                    Message responseUpdStatus = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_STATUS_RESPONSE);
                    transport.send(node, updNode, responseUpdStatus, pid);
                }

                break;
            case REQUEST_LONG_RANGE_LINK:
                // Accept only while fewer than 2 * k incoming links exist and the add succeeds
                MessageType responseType = MessageType.REJECT_LONG_RANGE_LINK;
                if (symphony.longRangeLinksIncoming.size() < (2 * k)) {
                    boolean added = symphony.longRangeLinksIncoming.add(msg.getSourceNode());
                    if (added) {
                        responseType = MessageType.ACCEPTED_LONG_RANGE_LINK;
                    }
                }
                Message responseLongLinkMsg = new Message(null, node, responseType);
                transport.send(node, msg.getSourceNode(), responseLongLinkMsg, pid);
                break;
            case ACCEPTED_LONG_RANGE_LINK:
                // Remember the network size estimate at which this link was established
                nLink = n;
                symphony.longRangeLinksOutgoing.add(msg.getSourceNode());
                break;
            case REJECT_LONG_RANGE_LINK:
                // Try another target while attempts are left
                if (currentAttempts > 0) {
                    currentAttempts--;
                    sendLongRangeLinkRequest(symphony, node);
                }
                break;
            case DISCONNECT_LONG_RANGE_LINK:
                // The sender no longer points to me
                symphony.longRangeLinksIncoming.remove(msg.getSourceNode());
                symphony.lookAheadMap.put(msg.getSourceNode(), null);
                break;
            case UNAVAILABLE_LONG_RANGE_LINK:
                // The node I was pointing to is no longer available
                symphony.longRangeLinksOutgoing.remove(msg.getSourceNode());
                symphony.lookAheadMap.put(msg.getSourceNode(), null);
                break;
            case LEAVE:
                Tuple<Node, BootstrapStatus> foundedTuple = null;

                // Verify if the node that is leaving is a left neighbour
                for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {
                    if (leftTuple.x.equals(msg.getSourceNode())) {
                        collection = symphony.leftShortRangeLinks;
                        foundedTuple = leftTuple;
                        break;
                    }
                }

                // Verify if the node that is leaving is a right neighbour
                if (collection == null) {
                    for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {
                        if (rightTuple.x.equals(msg.getSourceNode())) {
                            collection = symphony.rightShortRangeLinks;
                            foundedTuple = rightTuple;
                            break;
                        }
                    }
                }

                // If the neighbour was found, remove it and take over the neighbours it sent
                if (collection != null) {
                    collection.addAll((Collection<Tuple<Node, BootstrapStatus>>) msg.getBody());
                    collection.remove(foundedTuple);
                    fixNeighbours(node, collection);

                    // Update the status and send an alert in case I am now out of the ring
                    checkBootstrapStatusAndAlert(node);
                }
                break;
            case KEEP_ALIVE:
                // Reply body; both slots stay null unless the sender asked for look-ahead data
                Set<Double>[] lookAheadSetArray = new LinkedHashSet[2];

                /*
                 * Check whether the contacting node does look-ahead; if so, provide it with the
                 * identifiers of my long range links (according to my routing mode).
                 */
                if ((Boolean) msg.getBody()) {
                    int i = 0;
                    Iterable[] iterableArray;
                    if (symphony.bidirectionalRouting) {
                        iterableArray = new Iterable[]{symphony.longRangeLinksOutgoing, symphony.longRangeLinksIncoming};
                    } else {
                        iterableArray = new Iterable[]{symphony.longRangeLinksOutgoing};
                    }

                    // Slot 0 holds the outgoing identifiers, slot 1 (bidirectional only) the incoming ones
                    for (Iterable<Node> iterable : iterableArray) {
                        lookAheadSetArray[i] = new LinkedHashSet<Double>();
                        Set<Double> lookAheadSet = lookAheadSetArray[i];
                        Iterator<Node> it = iterable.iterator();
                        while (it.hasNext()) {
                            Node longLinkNode = it.next();
                            lookAheadSet.add(((SymphonyProtocol) longLinkNode.getProtocol(symphonyID)).getIdentifier());
                        }
                        i++;
                    }
                }

                transport.send(node, msg.getSourceNode(), new Message(lookAheadSetArray, node, MessageType.KEEP_ALIVE_RESPONSE), pid);
                break;
            case KEEP_ALIVE_RESPONSE:
                // Reset the counter to 0
                keepAliveMap.put(msg.getSourceNode(), 0);

                // Store the identifier sets received from the sender when look-ahead is enabled
                if (symphony.lookAhead) {
                    symphony.lookAheadMap.put(msg.getSourceNode(), (Set<Double>[]) msg.getBody());
                }

                break;
        }
    }
502 \r
503     /**\r
504      *\r
505      * Update the status and communicate immediately to the neighbours if the node is gone out from\r
506      * the ring (and before it was inside)\r
507      *\r
508      * @param node\r
509      */\r
510     private void checkBootstrapStatusAndAlert(Node node) {\r
511         SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
512         BootstrapStatus beforeStatus = symphony.loggedIntoNetwork;\r
513 \r
514         checkBootstrapStatus(node);\r
515 \r
516         // Instead of waiting that the update happens periodically i do it now because i'm out of the ring and before i wasn't\r
517         if (symphony.loggedIntoNetwork != beforeStatus && !symphony.isBootstrapped()) {\r
518             updateBootstrapStatusNeighbours(node, true);\r
519         }\r
520     }\r
521 \r
    /**
     * Cleans up one side of the short range links of {@code node}, in place, in three passes:
     * the node itself and superseded status entries are removed, then ONLINE neighbours whose
     * keep-alive counter has reached {@code nTimeout}, and finally the farthest entries until
     * at most {@code numberShortRangeLinksPerSide} are left.
     *
     * @param node the node that owns the list
     * @param neighbours the left or right short range links of {@code node}; modified in place
     */
    private void fixNeighbours(Node node, Collection<Tuple<Node, BootstrapStatus>> neighbours) {

        SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);

        // Remove duplicates and the entries that carry an obsolete status
        Collection<Tuple<Node, BootstrapStatus>> removedNeighbours = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();
        for (Tuple<Node, BootstrapStatus> tuple : neighbours) {

            // Remove myself from the neighbours list
            if (tuple.x.equals(node)) {
                removedNeighbours.add(tuple);
                continue;
            }

            // Every status except the one of this entry
            EnumSet<BootstrapStatus> status = EnumSet.allOf(BootstrapStatus.class);
            status.remove(tuple.y);

            for (BootstrapStatus opposite : status) {
                // presumably Tuple implements value equality, so contains() finds the entry
                // of the same node with another status - confirm against Tuple
                Tuple<Node, BootstrapStatus> oppositeNeighbour = new Tuple<Node, BootstrapStatus>(tuple.x, opposite);
                if (neighbours.contains(oppositeNeighbour)) {
                    if (tuple.y != BootstrapStatus.ONLINE) {
                        // The same node is listed twice: drop its OFFLINE entry and, when an
                        // ONLINE entry exists, also its ONLINE_AND_ALL_NEIGHBOURS_OFFLINE entry
                        removedNeighbours.add(new Tuple<Node, BootstrapStatus>(tuple.x, BootstrapStatus.OFFLINE));
                        if (opposite == BootstrapStatus.ONLINE) {
                            removedNeighbours.add(new Tuple<Node, BootstrapStatus>(tuple.x, BootstrapStatus.ONLINE_AND_ALL_NEIGHBOURS_OFFLINE));
                        }
                    }
                }
            }
        }
        neighbours.removeAll(removedNeighbours);

        /*
         * Count the neighbours that are in the ONLINE status, but first remove the ones that
         * went into timeout during the keep-alive procedure: an old entry can survive the view
         * exchange (UPDATE_NEIGHBOURS) without being really online. To give such a node the
         * chance to join again, its timeout counter is decreased. This is done only for ONLINE
         * entries, because those are the ones that matter for the routing.
         */
        int onlineNeighbours = 0;
        for (Tuple<Node, BootstrapStatus> tuple : neighbours) {

            Integer value = keepAliveMap.get(tuple.x);
            if (value != null && value >= nTimeout && tuple.y == BootstrapStatus.ONLINE) {
                keepAliveMap.put(tuple.x, value - 1);
                removedNeighbours.add(tuple);
            } else {

                if (tuple.y == BootstrapStatus.ONLINE) {
                    onlineNeighbours++;
                }
            }
        }
        // The set still holds the entries of the first pass, which are already gone from neighbours
        neighbours.removeAll(removedNeighbours);

        // Cut the number of neighbours down to the maximum allowed
        SymphonyNodeComparator comparator = new SymphonyNodeComparator(symphonyID, node);
        AdapterSymphonyNodeComparator adapterComparator = new AdapterSymphonyNodeComparator(comparator);
        while (neighbours.size() > symphony.numberShortRangeLinksPerSide) {
            Tuple<Node, BootstrapStatus> distantTuple = Collections.max(neighbours, adapterComparator);

            // Maintain the link with the ring
            if (distantTuple.y == BootstrapStatus.ONLINE) {
                if (onlineNeighbours > 1) {
                    neighbours.remove(distantTuple);
                    onlineNeighbours--;
                } else {
                    /*
                     * Only one ONLINE neighbour is left: keep it and eliminate the farthest of
                     * the other entries instead (which is certainly not ONLINE).
                     */
                    Tuple<Node, BootstrapStatus> backupOnlineNeighbour = distantTuple;
                    // Take it out temporarily so that max() picks among the remaining entries
                    neighbours.remove(backupOnlineNeighbour);
                    distantTuple = Collections.max(neighbours, adapterComparator);
                    neighbours.add(backupOnlineNeighbour);
                    neighbours.remove(distantTuple);
                }

            } else {
                neighbours.remove(distantTuple);
            }
        }
    }
608 \r
609     @Override\r
610     public Object clone() {\r
611         SymphonyNetworkManager dolly = new SymphonyNetworkManager(prefix);\r
612         return dolly;\r
613     }\r
614 \r
615     public void nextCycle(Node node, int protocolID) {\r
616 \r
617         if (node.isUp()) {\r
618 \r
619             // Update the estimated network size\r
620             updateN(node);\r
621 \r
622             // Update the estimated K\r
623             updateK(node);\r
624 \r
625             // Update the bootstrap status of my neighbours that were joining the ring\r
626             updateBootstrapStatusNeighbours(node, false);\r
627 \r
628             // Refresh the neighbours views\r
629             refreshNeighbours(node);\r
630 \r
631             // I send and check the connection status of the neighbours\r
632             keepAlive(node);\r
633 \r
634             // Update the bootstrap status\r
635             checkBootstrapStatus(node);\r
636 \r
637             // If it's active i check the Relinking criteria\r
638             if (relinkingProtocolActivated) {\r
639                 reLinkingProtocol(node);\r
640             }\r
641 \r
642             // Update the long range links (conservative)\r
643             updateLongRangeLinks(node);\r
644         }\r
645     }\r
646 \r
647     /**\r
648      *\r
649      * @param allNeighbours true, communicate/receive the status update from all the neighbours.\r
650      * false, communicate/receive the status update only from the neighbours that are NOT ONLINE\r
651      *\r
652      */\r
653     private void updateBootstrapStatusNeighbours(Node node, boolean allNeighbours) {\r
654         SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
655         Transport transport = (Transport) node.getProtocol(transportID);\r
656 \r
657         Collection<Tuple<Node, BootstrapStatus>> collection = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();\r
658         collection.addAll(symphony.leftShortRangeLinks);\r
659         collection.addAll(symphony.rightShortRangeLinks);\r
660 \r
661         for (Tuple<Node, BootstrapStatus> neighbourTuple : collection) {\r
662             if (allNeighbours || neighbourTuple.y != BootstrapStatus.ONLINE) {\r
663                 Message msg = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_STATUS);\r
664                 transport.send(node, neighbourTuple.x, msg, pid);\r
665             }\r
666         }\r
667     }\r
668 \r
669     private void updateN(Node node) {\r
670         NetworkSizeEstimatorProtocolInterface networkEstimator = (NetworkSizeEstimatorProtocolInterface) node.getProtocol(networkEstimatorID);\r
671         n = networkEstimator.getNetworkSize(node);\r
672         if (n <= 0) {\r
673             n = DEFAULT_N;\r
674         }\r
675     }\r
676 \r
677     /**\r
678      * Update the K value with the current expectation of the network size\r
679      */\r
680     private void updateK(Node node) {\r
681 \r
682         SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
683         if (!symphony.fixedLongRangeLinks) {\r
684             k = (int) Math.ceil(Math.log(n) / Math.log(2));\r
685 \r
686             if (k <= 0) {\r
687                 k = DEFAULT_K;\r
688             }\r
689         } else {\r
690             k = symphony.numberFixedLongRangeLinks;\r
691         }\r
692     }\r
693 \r
694     private void refreshNeighbours(Node node) {\r
695         SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
696         Transport transport = (Transport) node.getProtocol(transportID);\r
697 \r
698         for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {\r
699             Node leftNode = leftTuple.x;\r
700             Message updateNeighbourMsg = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_NEIGHBOURS);\r
701             transport.send(node, leftNode, updateNeighbourMsg, pid);\r
702         }\r
703 \r
704         for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {\r
705             Node rightNode = rightTuple.x;\r
706             Message updateNeighbourMsg = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_NEIGHBOURS);\r
707             transport.send(node, rightNode, updateNeighbourMsg, pid);\r
708         }\r
709     }\r
710 \r
711     /**\r
712      * Method to update the (connection) status of the node. Perform the update to the "up" so:\r
713      * OFFLINE -> ONLINE_AND_ALL_NEIGHBOURS_OFFLINE -> ONLINE\r
714      *\r
715      * and to the "down" only: ONLINE -> ONLINE_AND_ALL_NEIGHBOURS_OFFLINE\r
716      *\r
717      * @param node\r
718      */\r
719     private void checkBootstrapStatus(Node node) {\r
720         SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
721 \r
722         if (symphony.loggedIntoNetwork != BootstrapStatus.OFFLINE) {\r
723 \r
724             symphony.loggedIntoNetwork = BootstrapStatus.ONLINE_AND_ALL_NEIGHBOURS_OFFLINE;\r
725 \r
726             // Check if i'm inside the ring and i'm able to do routing\r
727             if (!symphony.leftShortRangeLinks.isEmpty() && !symphony.rightShortRangeLinks.isEmpty()) {\r
728 \r
729                 boolean leftOk = false;\r
730                 for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {\r
731                     if (leftTuple.y == BootstrapStatus.ONLINE) {\r
732                         leftOk = true;\r
733                         break;\r
734                     }\r
735                 }\r
736 \r
737                 if (leftOk) {\r
738                     for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {\r
739                         if (rightTuple.y == BootstrapStatus.ONLINE) {\r
740                             symphony.loggedIntoNetwork = BootstrapStatus.ONLINE;\r
741                             break;\r
742                         }\r
743                     }\r
744                 }\r
745             }\r
746         }\r
747     }\r
748 \r
749     /**\r
750      * Remove the possible wrong entries from the lookAhead table\r
751      */\r
752     private void fixLookAheadMap(Node node) {\r
753         SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
754         for (Tuple<Node, BootstrapStatus> tuple : symphony.leftShortRangeLinks) {\r
755             symphony.lookAheadMap.put(tuple.x, null);\r
756         }\r
757         for (Tuple<Node, BootstrapStatus> tuple : symphony.rightShortRangeLinks) {\r
758             symphony.lookAheadMap.put(tuple.x, null);\r
759         }\r
760     }\r
761 \r
762     /**\r
763      * Sent keep-alive messages to verify if the links still online\r
764      *\r
765      * if enable the lookAhead mode i require the neighbours list from my neighbours (1-lookAhead).\r
766      *\r
767      * Note: I don't reuse the UPDATE_STATUS messages because i want to mantain separate the\r
768      * semantic and have more clear source code\r
769      */\r
770     private void keepAlive(Node node) {\r
771 \r
772         SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
773         Transport transport = (Transport) node.getProtocol(transportID);\r
774 \r
775         // Send and check for the long range links (both incoming and outgoing)\r
776         for (Iterable<Node> iterable : new Iterable[]{symphony.longRangeLinksOutgoing, symphony.longRangeLinksIncoming}) {\r
777             Iterator<Node> longLinksIterator = iterable.iterator();\r
778             while (longLinksIterator.hasNext()) {\r
779                 Node longLinkNode = longLinksIterator.next();\r
780                 Integer value = keepAliveMap.get(longLinkNode);\r
781                 if (value == null) {\r
782                     value = 0;\r
783                 }\r
784 \r
785                 /*\r
786                  * Verify if i reached the sufficient time of sending and not receiving an answer\r
787                  * and so i can consider that node as disconnected\r
788                  */\r
789                 if (value >= nTimeout) {\r
790                     symphony.lookAheadMap.put(longLinkNode, null); // Do it anyway if it's enabled the lookAhead or not\r
791                     longLinksIterator.remove();\r
792                 } else {\r
793                     keepAliveMap.put(longLinkNode, value + 1);\r
794 \r
795                     Message keepAliveMsg = new Message(symphony.lookAhead, node, MessageType.KEEP_ALIVE);\r
796                     transport.send(node, longLinkNode, keepAliveMsg, pid);\r
797                 }\r
798             }\r
799         }\r
800 \r
801         // Send and check for the short links\r
802         for (Iterable<Tuple<Node, BootstrapStatus>> iterable : new Iterable[]{symphony.rightShortRangeLinks, symphony.leftShortRangeLinks}) {\r
803             Iterator<Tuple<Node, BootstrapStatus>> shortLinksIterator = iterable.iterator();\r
804             while (shortLinksIterator.hasNext()) {\r
805                 Node shortLinkNode = shortLinksIterator.next().x;\r
806                 Integer value = keepAliveMap.get(shortLinkNode);\r
807                 if (value == null) {\r
808                     value = 0;\r
809                 }\r
810 \r
811                 // the same as above\r
812                 if (value >= nTimeout) {\r
813                     shortLinksIterator.remove();\r
814                 } else {\r
815                     keepAliveMap.put(shortLinkNode, value + 1);\r
816 \r
817                     // LookAhead is not to be performed to the short links!\r
818                     Message keepAliveMsg = new Message(false, node, MessageType.KEEP_ALIVE);\r
819                     transport.send(node, shortLinkNode, keepAliveMsg, pid);\r
820                 }\r
821             }\r
822         }\r
823     }\r
824 \r
825     /**\r
826      * Implement the Re-Linking criteria of the Long Range Links. It does the complete refresh. The\r
827      * repopulation is done through the 'updateLongRangeLinks' method.\r
828      */\r
829     private void reLinkingProtocol(Node node) {\r
830         // I do the check only if i succeed at least one time to create a long range link\r
831         if (nLink > 0) {\r
832             double criterionValue = n / nLink;\r
833 \r
834             if (!(criterionValue >= relinkingLowerBound && criterionValue <= relinkingUpperBound)) {\r
835 \r
836                 /*\r
837                  * Not explicitly precised in the paper: if i haven't created a new link i update\r
838                  * anyway nLink because can happen a special case that i will not be able to create\r
839                  * links because the reLinkingProtocol procedure is "faster".\r
840                  */\r
841                 nLink = n;\r
842 \r
843                 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
844                 Transport transport = (Transport) node.getProtocol(transportID);\r
845 \r
846                 // Communicate to the all Outgoing Long Range Links that they aren't anymore\r
847                 for (Node longRangeLinkOutgoingNode : symphony.longRangeLinksOutgoing) {\r
848                     Message disconnectMsg = new Message(null, node, MessageType.DISCONNECT_LONG_RANGE_LINK);\r
849                     transport.send(node, longRangeLinkOutgoingNode, disconnectMsg, pid);\r
850                 }\r
851 \r
852                 symphony.longRangeLinksOutgoing.clear();\r
853             }\r
854         }\r
855     }\r
856 \r
857     public int getK() {\r
858         return k;\r
859     }\r
860 \r
861     public int getN() {\r
862         return n;\r
863     }\r
864 }\r