--- /dev/null
+package example.symphony;\r
+\r
+import java.util.*;\r
+import java.util.logging.Level;\r
+import java.util.logging.Logger;\r
+\r
+import example.symphony.Message.MessageType;\r
+import example.symphony.SymphonyProtocol.BootstrapStatus;\r
+import peersim.cdsim.CDProtocol;\r
+import peersim.config.Configuration;\r
+import peersim.core.CommonState;\r
+import peersim.core.Fallible;\r
+import peersim.core.Node;\r
+import peersim.edsim.EDProtocol;\r
+import peersim.transport.Transport;\r
+\r
+/**\r
+ *\r
+ * @author Andrea Esposito <and1989@gmail.com>\r
+ */\r
+public class SymphonyNetworkManager implements EDProtocol, CDProtocol {\r
+\r
+ private static final String PAR_SYMPHONY = "symphony";\r
+ private static final String PAR_TRANSP = "transport";\r
+ private static final String PAR_ATTEMPTS = "attempts";\r
+ private static final String PAR_NETSIZE = "networkestimator";\r
+ private static final String PAR_NUM_TIMEOUT = "nTimeout";\r
+ private static final String PAR_RELINKING = "relinking";\r
+ private static final String PAR_RELINKING_LOWER_BOUND = "relinkingLowerBound";\r
+ private static final String PAR_RELINKING_UPPER_BOUND = "relinkingUpperBound";\r
+ private static final int DEFAULT_K = 1;\r
+ private static final int DEFAULT_N = 2;\r
+ private static final double DEFAULT_RELINKING_LOWER_BOUND = 0.5;\r
+ private static final double DEFAULT_RELINKING_UPPER_BOUND = 2.0;\r
+ private final String prefix;\r
+ private final int symphonyID;\r
+ private final int transportID;\r
+ private final int networkEstimatorID;\r
+ private final int attempts;\r
+ private final int pid;\r
+ private final int nTimeout;\r
+ private final HashMap<Node, Integer> keepAliveMap;\r
+ private final boolean relinkingProtocolActivated;\r
+ private final double relinkingUpperBound;\r
+ private final double relinkingLowerBound;\r
+ private int k = DEFAULT_K; // Number of Long Range Link\r
+ private int n = DEFAULT_N; // Estimation Network size\r
+ private static boolean firstPrintConfig = true;\r
+ /*\r
+ * Estimation Network size at which last long distance link was established, at the beginning -1\r
+ * to indicate that we never had Long Range Links\r
+ */\r
+ private int nLink = -1;\r
+ private int currentAttempts;\r
+\r
+ public SymphonyNetworkManager(String prefix) {\r
+\r
+ this.prefix = prefix;\r
+ pid = Configuration.lookupPid(prefix.replaceAll("protocol.", ""));\r
+ symphonyID = Configuration.getPid(prefix + "." + PAR_SYMPHONY);\r
+ transportID = Configuration.getPid(prefix + "." + PAR_TRANSP);\r
+ networkEstimatorID = Configuration.getPid(prefix + "." + PAR_NETSIZE);\r
+ attempts = Configuration.getInt(prefix + "." + PAR_ATTEMPTS);\r
+ nTimeout = Configuration.getInt(prefix + "." + PAR_NUM_TIMEOUT, 10);\r
+ relinkingProtocolActivated = !Configuration.getString(prefix + "." + PAR_RELINKING, "on").toLowerCase().equals("off");\r
+ double relinkingLowerBoundAppo = Configuration.getDouble(prefix + "." + PAR_RELINKING_LOWER_BOUND, DEFAULT_RELINKING_LOWER_BOUND);\r
+ double relinkingUpperBoundAppo = Configuration.getDouble(prefix + "." + PAR_RELINKING_UPPER_BOUND, DEFAULT_RELINKING_UPPER_BOUND);\r
+ if (relinkingLowerBoundAppo > relinkingUpperBoundAppo) {\r
+ relinkingLowerBound = DEFAULT_RELINKING_LOWER_BOUND;\r
+ relinkingUpperBound = DEFAULT_RELINKING_UPPER_BOUND;\r
+ } else {\r
+ relinkingLowerBound = relinkingLowerBoundAppo;\r
+ relinkingUpperBound = relinkingUpperBoundAppo;\r
+ }\r
+\r
+ keepAliveMap = new HashMap<Node, Integer>();\r
+\r
+ printConfig();\r
+ }\r
+\r
+ private void printConfig() {\r
+\r
+ if (firstPrintConfig) {\r
+ firstPrintConfig = false;\r
+ System.out.println(SymphonyNetworkManager.class.getSimpleName() + " Configuration:");\r
+ System.out.println("- Attempts per LongRangeLinks: " + attempts);\r
+ System.out.println("- Number of Timeout before a node is considered OFFLINE (through Keep-alive):" + nTimeout);\r
+ System.out.println("- Relinking: " + (relinkingProtocolActivated ? "ON" : "OFF"));\r
+ System.out.println("- Relinking Range: [" + relinkingLowerBound + ", " + relinkingUpperBound + "]");\r
+ System.out.println("-------------------------------\n");\r
+ }\r
+ }\r
+\r
+ public void join(final Node node, final Node bootstrapNode) throws RoutingException {\r
+ final SymphonyProtocol bootstrapSymphony = (SymphonyProtocol) bootstrapNode.getProtocol(symphonyID);\r
+ SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
+\r
+ /*\r
+ * Search (through the bootstrap node) and contact the Manager Node of myself such a way to\r
+ * be able to insert myself into the ring and create the short links\r
+ *\r
+ */\r
+ bootstrapSymphony.route(bootstrapNode, symphony.getIdentifier(), new Handler() {\r
+\r
+ public void handle(SymphonyProtocol src, Tuple<Node, Double> tuple) {\r
+ if (tuple == null) {\r
+ Logger.getLogger(SymphonyNetworkManager.class.getName()).log(Level.SEVERE, "FAIL ROUTE JOIN");\r
+ node.setFailState(Fallible.DEAD);\r
+ return;\r
+ }\r
+\r
+ Node managerNode = tuple.x;\r
+\r
+ Transport transport = (Transport) node.getProtocol(transportID);\r
+ Message msg = new Message(node, node, MessageType.JOIN);\r
+ transport.send(node, managerNode, msg, pid);\r
+ }\r
+ });\r
+\r
+ // The Long Range Links are added after that i joined the ring (before i can't because i haven't got the nodes to do the routing)\r
+ }\r
+\r
+ /**\r
+ * Conservative Re-Linking (i reuse the ones already created: not all fresh)\r
+ *\r
+ * @param node\r
+ */\r
+ public void updateLongRangeLinks(Node node) {\r
+ SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
+ Transport transport = (Transport) node.getProtocol(transportID);\r
+\r
+ // if too much links i delete the farest ones\r
+ while (symphony.longRangeLinksOutgoing.size() > k) {\r
+ Node distantNode = Collections.max(symphony.longRangeLinksOutgoing, new SymphonyNodeComparator(symphonyID, node));\r
+ symphony.longRangeLinksOutgoing.remove(distantNode);\r
+\r
+ // Communicate to the outgoing node that it ins't anymore one of my long range links\r
+ Message disconnectMsg = new Message(null, node, MessageType.DISCONNECT_LONG_RANGE_LINK);\r
+ transport.send(node, distantNode, disconnectMsg, pid);\r
+ }\r
+\r
+ // I can search Long Range Links only if i'm into the ring and i'm able to do routing\r
+ if (symphony.isBootstrapped()) {\r
+ // if only few i try again, untill attempts times, to add new ones\r
+ int difference = k - symphony.longRangeLinksOutgoing.size();\r
+ currentAttempts = attempts;\r
+ for (int i = 0; i < difference; i++) {\r
+ sendLongRangeLinkRequest(symphony, node);\r
+ }\r
+ }\r
+ }\r
+ private static final int MAX_ANTILOOP_COUNT_MANAGER_MYSELF = 5;\r
+ private int antiLoopManagerMySelf = 0;\r
+\r
+ private void sendLongRangeLinkRequest(final SymphonyProtocol symphony, final Node node) {\r
+ boolean routingOk;\r
+ do {\r
+ double distance = Math.exp((Math.log(n) / Math.log(2)) * (CommonState.r.nextDouble() - 1.0)); // Harmonic Distribution\r
+ double targetIdentifier = (symphony.getIdentifier() + distance) % 1;\r
+ try {\r
+\r
+ symphony.route(node, targetIdentifier, new Handler() {\r
+\r
+ public void handle(SymphonyProtocol src, Tuple<Node, Double> tuple) {\r
+\r
+ if (tuple == null) {\r
+ Logger.getLogger(SymphonyNetworkManager.class.getName()).log(Level.SEVERE, "FAIL ROUTE SENDLONGRANGELINKREQUEST");\r
+ return;\r
+ }\r
+\r
+ Collection<Node> allShortLinks = new LinkedList<Node>();\r
+ for (Tuple<Node, BootstrapStatus> shortTuple : symphony.leftShortRangeLinks) {\r
+ allShortLinks.add(shortTuple.x);\r
+ }\r
+ for (Tuple<Node, BootstrapStatus> shortTuple : symphony.rightShortRangeLinks) {\r
+ allShortLinks.add(shortTuple.x);\r
+ }\r
+\r
+ /*\r
+ *\r
+ * I'm myself one of my short links, special case... i try again without\r
+ * reduce the attempts for a maximum of MAX_ANTILOOP_COUNT_MANAGER_MYSELF\r
+ * times after that i start again to reduce the attempts\r
+ */\r
+ if (tuple.x.equals(node) || allShortLinks.contains(tuple.x)) {\r
+\r
+ if (antiLoopManagerMySelf < MAX_ANTILOOP_COUNT_MANAGER_MYSELF) {\r
+\r
+ antiLoopManagerMySelf++;\r
+ sendLongRangeLinkRequest(symphony, node);\r
+ } else {\r
+ antiLoopManagerMySelf = 0;\r
+ currentAttempts--;\r
+ }\r
+ } else {\r
+\r
+ boolean alreadyAdded = symphony.longRangeLinksOutgoing.contains(tuple.x);\r
+ /*\r
+ *\r
+ * OPINABLE: DESCREASE ATTEMPTS ONLY FOR REJECT? If yes i have to manage\r
+ * the possible loop (nodi exhaurited so already all added)\r
+ */\r
+ if (alreadyAdded && currentAttempts > 0) {\r
+ currentAttempts--;\r
+ sendLongRangeLinkRequest(symphony, node);\r
+ } else if (!alreadyAdded) {\r
+ Message msg = new Message(null, node, MessageType.REQUEST_LONG_RANGE_LINK);\r
+ Transport transport = (Transport) node.getProtocol(transportID);\r
+ transport.send(node, tuple.x, msg, pid);\r
+ }\r
+ }\r
+ }\r
+ });\r
+ routingOk = true;\r
+ } catch (RoutingException ex) {\r
+ routingOk = false;\r
+ }\r
+ } while (!routingOk);\r
+ }\r
+\r
+ public void leave(Node node) {\r
+ SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
+\r
+ if (symphony.loggedIntoNetwork != BootstrapStatus.OFFLINE) {\r
+ Transport transport = (Transport) node.getProtocol(transportID);\r
+\r
+ symphony.loggedIntoNetwork = BootstrapStatus.OFFLINE;\r
+\r
+ // Communicate that i'm leaving to the outgoing (that i point to) nodes\r
+ for (Node outgoingNode : symphony.longRangeLinksOutgoing) {\r
+ Message disconnectMsg = new Message(null, node, MessageType.DISCONNECT_LONG_RANGE_LINK);\r
+ transport.send(node, outgoingNode, disconnectMsg, pid);\r
+ }\r
+\r
+ // Communicate that i'm leaving to the incoming (that they point to me) nodes\r
+ for (Node incomingNode : symphony.longRangeLinksIncoming) {\r
+ Message unavailableMsg = new Message(null, node, MessageType.UNAVAILABLE_LONG_RANGE_LINK);\r
+ transport.send(node, incomingNode, unavailableMsg, pid);\r
+ }\r
+\r
+ // Communicate to my neighbours (short range links) that i'm leaving and i send to them the near neighbours\r
+ for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {\r
+ Message leaveMsg = new Message(symphony.rightShortRangeLinks.clone(), node, MessageType.LEAVE);\r
+ transport.send(node, leftTuple.x, leaveMsg, pid);\r
+ }\r
+\r
+ for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {\r
+ Message leaveMsg = new Message(symphony.leftShortRangeLinks.clone(), node, MessageType.LEAVE);\r
+ transport.send(node, rightTuple.x, leaveMsg, pid);\r
+ }\r
+\r
+ node.setFailState(Fallible.DEAD);\r
+ }\r
+ }\r
+\r
+ public void processEvent(Node node, int pid, Object event) {\r
+\r
+ Message msg = (Message) event;\r
+\r
+ SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
+ Transport transport = (Transport) node.getProtocol(transportID);\r
+\r
+ Collection<Tuple<Node, BootstrapStatus>> collection = null;\r
+ switch (msg.getType()) {\r
+ case JOIN:\r
+ // I send my current neighbours to the entering node\r
+ collection = (Collection<Tuple<Node, BootstrapStatus>>) symphony.leftShortRangeLinks.clone();\r
+ collection.addAll((Collection<Tuple<Node, BootstrapStatus>>) symphony.rightShortRangeLinks.clone());\r
+ Message responseMsg = new Message(collection, node, MessageType.JOIN_RESPONSE);\r
+ transport.send(node, msg.getSourceNode(), responseMsg, pid);\r
+\r
+ /*\r
+ * Update my neighbours list, adding the new one (for sure it is entering in the\r
+ * left side)\r
+ *\r
+ * Put to "ONLINE_AND_ALL_NEIGHBOURS_OFFLINE" because maybe the bootstrap phase is\r
+ * not terminated yet (ashyncronous communication)\r
+ */\r
+ symphony.leftShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(msg.getSourceNode(), BootstrapStatus.ONLINE_AND_ALL_NEIGHBOURS_OFFLINE));\r
+\r
+\r
+ fixNeighbours(node, symphony.leftShortRangeLinks);\r
+ break;\r
+ case JOIN_RESPONSE:\r
+\r
+ Collection<Tuple<Node, BootstrapStatus>> tupleCollection = (Collection<Tuple<Node, BootstrapStatus>>) msg.getBody();\r
+\r
+ /*\r
+ *\r
+ * My manager is a right neighbour. The manager is already inside the ring, boostrap\r
+ * obliviously ok\r
+ */\r
+ symphony.rightShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(msg.getSourceNode(), BootstrapStatus.ONLINE));\r
+\r
+ // Set my neighbours in the correct position\r
+ for (Tuple<Node, BootstrapStatus> tuple : tupleCollection) {\r
+ if (SymphonyProtocol.isLeftNeighbour(node, tuple.x)) {\r
+ symphony.leftShortRangeLinks.add(tuple);\r
+ } else {\r
+ symphony.rightShortRangeLinks.add(tuple);\r
+ }\r
+ }\r
+\r
+ fixNeighbours(node, symphony.leftShortRangeLinks);\r
+ fixNeighbours(node, symphony.rightShortRangeLinks);\r
+\r
+ // Update bootstrap status\r
+ checkBootstrapStatus(node);\r
+\r
+ // I send the refresh command such a way to exchange the views\r
+ refreshNeighbours(node);\r
+\r
+ // Update Long Range Links, because it's at the beginning is the same as adding k\r
+ updateLongRangeLinks(node);\r
+ break;\r
+ case UPDATE_NEIGHBOURS:\r
+\r
+ Collection<Tuple<Node, BootstrapStatus>> collectionCloned = ((Collection<Tuple<Node, BootstrapStatus>>) symphony.leftShortRangeLinks.clone());\r
+ collectionCloned.addAll(((Collection<Tuple<Node, BootstrapStatus>>) symphony.rightShortRangeLinks.clone()));\r
+\r
+ // Send my neighbours such a way it can also update itself\r
+ Message responseUpdateMsg = new Message(collectionCloned, node, MessageType.UPDATE_NEIGHBOURS_RESPONSE);\r
+ transport.send(node, msg.getSourceNode(), responseUpdateMsg, pid);\r
+\r
+ // Update my view with the new node\r
+ Tuple<Node, BootstrapStatus> neighbourTuple = new Tuple<Node, BootstrapStatus>(msg.getSourceNode(), (BootstrapStatus) msg.getBody());\r
+ if (SymphonyProtocol.isLeftNeighbour(node, msg.getSourceNode())) {\r
+ collection = symphony.leftShortRangeLinks;\r
+ } else {\r
+ collection = symphony.rightShortRangeLinks;\r
+ }\r
+ collection.add(neighbourTuple);\r
+\r
+ fixNeighbours(node, collection);\r
+ fixLookAheadMap(node);\r
+ break;\r
+ case UPDATE_NEIGHBOURS_RESPONSE:\r
+\r
+ Collection<Tuple<Node, BootstrapStatus>> responseCollection = (Collection<Tuple<Node, BootstrapStatus>>) msg.getBody();\r
+\r
+ for (Tuple<Node, BootstrapStatus> neighbourResponseTuple : responseCollection) {\r
+ if (SymphonyProtocol.isLeftNeighbour(node, neighbourResponseTuple.x)) {\r
+ collection = symphony.leftShortRangeLinks;\r
+ } else {\r
+ collection = symphony.rightShortRangeLinks;\r
+ }\r
+ collection.add(neighbourResponseTuple);\r
+ }\r
+\r
+ // Fix the neighbours number to the maximum allow and maybe remove myself from the list\r
+ fixNeighbours(node, symphony.leftShortRangeLinks);\r
+ fixNeighbours(node, symphony.rightShortRangeLinks);\r
+ fixLookAheadMap(node);\r
+ break;\r
+ case UPDATE_STATUS:\r
+ case UPDATE_STATUS_RESPONSE:\r
+\r
+ Node updNode = msg.getSourceNode();\r
+ BootstrapStatus updStatus = (BootstrapStatus) msg.getBody();\r
+\r
+ // I search the neighbour and i update its status\r
+ boolean founded = false;\r
+\r
+ // Try to see if it is on the left\r
+ for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {\r
+ if (leftTuple.x.equals(updNode)) {\r
+ symphony.leftShortRangeLinks.remove(leftTuple);\r
+ symphony.leftShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(updNode, updStatus));\r
+\r
+ founded = true;\r
+ break;\r
+ }\r
+ }\r
+\r
+ // If it isn't on the left i try with the neighbours on the right\r
+ if (!founded) {\r
+ for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {\r
+ if (rightTuple.x.equals(updNode)) {\r
+ symphony.rightShortRangeLinks.remove(rightTuple);\r
+ symphony.rightShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(updNode, updStatus));\r
+\r
+ break;\r
+ }\r
+ }\r
+\r
+ fixNeighbours(node, symphony.rightShortRangeLinks);\r
+ } else {\r
+ fixNeighbours(node, symphony.leftShortRangeLinks);\r
+ }\r
+\r
+ checkBootstrapStatusAndAlert(node);\r
+\r
+ if (msg.getType() == MessageType.UPDATE_STATUS) {\r
+ Message responseUpdStatus = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_STATUS_RESPONSE);\r
+ transport.send(node, updNode, responseUpdStatus, pid);\r
+ }\r
+\r
+ break;\r
+ case REQUEST_LONG_RANGE_LINK:\r
+ MessageType responseType = MessageType.REJECT_LONG_RANGE_LINK;\r
+ if (symphony.longRangeLinksIncoming.size() < (2 * k)) {\r
+ boolean added = symphony.longRangeLinksIncoming.add(msg.getSourceNode());\r
+ if (added) {\r
+ responseType = MessageType.ACCEPTED_LONG_RANGE_LINK;\r
+ }\r
+ }\r
+ Message responseLongLinkMsg = new Message(null, node, responseType);\r
+ transport.send(node, msg.getSourceNode(), responseLongLinkMsg, pid);\r
+ break;\r
+ case ACCEPTED_LONG_RANGE_LINK:\r
+ nLink = n;\r
+ symphony.longRangeLinksOutgoing.add(msg.getSourceNode());\r
+ break;\r
+ case REJECT_LONG_RANGE_LINK:\r
+ if (currentAttempts > 0) {\r
+ currentAttempts--;\r
+ sendLongRangeLinkRequest(symphony, node);\r
+ }\r
+ break;\r
+ case DISCONNECT_LONG_RANGE_LINK:\r
+ symphony.longRangeLinksIncoming.remove(msg.getSourceNode());\r
+ symphony.lookAheadMap.put(msg.getSourceNode(), null);\r
+ break;\r
+ case UNAVAILABLE_LONG_RANGE_LINK:\r
+ symphony.longRangeLinksOutgoing.remove(msg.getSourceNode());\r
+ symphony.lookAheadMap.put(msg.getSourceNode(), null);\r
+ break;\r
+ case LEAVE:\r
+ Tuple<Node, BootstrapStatus> foundedTuple = null;\r
+\r
+ // Verify if the node that is leaving is a left neighbour\r
+ for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {\r
+ if (leftTuple.x.equals(msg.getSourceNode())) {\r
+ collection = symphony.leftShortRangeLinks;\r
+ foundedTuple = leftTuple;\r
+ break;\r
+ }\r
+ }\r
+\r
+ // Verify if the node that is leaving is a right neighbour\r
+ if (collection == null) {\r
+ for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {\r
+ if (rightTuple.x.equals(msg.getSourceNode())) {\r
+ collection = symphony.rightShortRangeLinks;\r
+ foundedTuple = rightTuple;\r
+ break;\r
+ }\r
+ }\r
+ }\r
+\r
+ // if i've found the neighbour i remove it and i add to myself its neighbours\r
+ if (collection != null) {\r
+ collection.addAll((Collection<Tuple<Node, BootstrapStatus>>) msg.getBody());\r
+ collection.remove(foundedTuple);\r
+ fixNeighbours(node, collection);\r
+\r
+ // Update status and ready to send an alert in case i'm out of the ring\r
+ checkBootstrapStatusAndAlert(node);\r
+ }\r
+ break;\r
+ case KEEP_ALIVE:\r
+ Set<Double>[] lookAheadSetArray = new LinkedHashSet[2];\r
+\r
+ /*\r
+ * Check if the contacting node is doing lookAhead and in case of affirmative answer\r
+ * i provide to it the long range link identifiers (according to my routing mode)\r
+ */\r
+ if ((Boolean) msg.getBody()) {\r
+ int i = 0;\r
+ Iterable[] iterableArray;\r
+ if (symphony.bidirectionalRouting) {\r
+ iterableArray = new Iterable[]{symphony.longRangeLinksOutgoing, symphony.longRangeLinksIncoming};\r
+ } else {\r
+ iterableArray = new Iterable[]{symphony.longRangeLinksOutgoing};\r
+ }\r
+\r
+ for (Iterable<Node> iterable : iterableArray) {\r
+ lookAheadSetArray[i] = new LinkedHashSet<Double>();\r
+ Set<Double> lookAheadSet = lookAheadSetArray[i];\r
+ Iterator<Node> it = iterable.iterator();\r
+ while (it.hasNext()) {\r
+ Node longLinkNode = it.next();\r
+ lookAheadSet.add(((SymphonyProtocol) longLinkNode.getProtocol(symphonyID)).getIdentifier());\r
+ }\r
+ i++;\r
+ }\r
+ }\r
+\r
+ transport.send(node, msg.getSourceNode(), new Message(lookAheadSetArray, node, MessageType.KEEP_ALIVE_RESPONSE), pid);\r
+ break;\r
+ case KEEP_ALIVE_RESPONSE:\r
+ // Reset the counter to 0\r
+ keepAliveMap.put(msg.getSourceNode(), 0);\r
+\r
+ if (symphony.lookAhead) {\r
+ symphony.lookAheadMap.put(msg.getSourceNode(), (Set<Double>[]) msg.getBody());\r
+ }\r
+\r
+ break;\r
+ }\r
+ }\r
+\r
+ /**\r
+ *\r
+ * Update the status and communicate immediately to the neighbours if the node is gone out from\r
+ * the ring (and before it was inside)\r
+ *\r
+ * @param node\r
+ */\r
+ private void checkBootstrapStatusAndAlert(Node node) {\r
+ SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
+ BootstrapStatus beforeStatus = symphony.loggedIntoNetwork;\r
+\r
+ checkBootstrapStatus(node);\r
+\r
+ // Instead of waiting that the update happens periodically i do it now because i'm out of the ring and before i wasn't\r
+ if (symphony.loggedIntoNetwork != beforeStatus && !symphony.isBootstrapped()) {\r
+ updateBootstrapStatusNeighbours(node, true);\r
+ }\r
+ }\r
+\r
+ private void fixNeighbours(Node node, Collection<Tuple<Node, BootstrapStatus>> neighbours) {\r
+\r
+ SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
+\r
+ // Remove duplicates, remove that ones that are in an obsolete status\r
+ Collection<Tuple<Node, BootstrapStatus>> removedNeighbours = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();\r
+ for (Tuple<Node, BootstrapStatus> tuple : neighbours) {\r
+\r
+ // Remove myself from the neighbours list\r
+ if (tuple.x.equals(node)) {\r
+ removedNeighbours.add(tuple);\r
+ continue;\r
+ }\r
+\r
+ EnumSet<BootstrapStatus> status = EnumSet.allOf(BootstrapStatus.class);\r
+ status.remove(tuple.y);\r
+\r
+ for (BootstrapStatus opposite : status) {\r
+ Tuple<Node, BootstrapStatus> oppositeNeighbour = new Tuple<Node, BootstrapStatus>(tuple.x, opposite);\r
+ if (neighbours.contains(oppositeNeighbour)) {\r
+ if (tuple.y != BootstrapStatus.ONLINE) {\r
+ removedNeighbours.add(new Tuple<Node, BootstrapStatus>(tuple.x, BootstrapStatus.OFFLINE));\r
+ if (opposite == BootstrapStatus.ONLINE) {\r
+ removedNeighbours.add(new Tuple<Node, BootstrapStatus>(tuple.x, BootstrapStatus.ONLINE_AND_ALL_NEIGHBOURS_OFFLINE));\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+ neighbours.removeAll(removedNeighbours);\r
+\r
+ /*\r
+ *\r
+ * I count the neighbours that are in the ONLINE status but before i remove the ones that\r
+ * are gone in timeout during the keep-alive procedure because can be someone that is old\r
+ * but not remove from the exchanging views (UPDATE_NEIGHBOURS) procedure and are not\r
+ * effectively online. To do anyway the possibility to the node to join again i decrease its\r
+ * timeout value. This only if the node is ONLINE and so i'm really interested that it is ok\r
+ * for the routing.\r
+ *\r
+ */\r
+ int onlineNeighbours = 0;\r
+ for (Tuple<Node, BootstrapStatus> tuple : neighbours) {\r
+\r
+ Integer value = keepAliveMap.get(tuple.x);\r
+ if (value != null && value >= nTimeout && tuple.y == BootstrapStatus.ONLINE) {\r
+ keepAliveMap.put(tuple.x, value - 1);\r
+ removedNeighbours.add(tuple);\r
+ } else {\r
+\r
+ if (tuple.y == BootstrapStatus.ONLINE) {\r
+ onlineNeighbours++;\r
+ }\r
+ }\r
+ }\r
+ neighbours.removeAll(removedNeighbours);\r
+\r
+ // Fix the neighbours number to the maximum allowed\r
+ SymphonyNodeComparator comparator = new SymphonyNodeComparator(symphonyID, node);\r
+ AdapterSymphonyNodeComparator adapterComparator = new AdapterSymphonyNodeComparator(comparator);\r
+ while (neighbours.size() > symphony.numberShortRangeLinksPerSide) {\r
+ Tuple<Node, BootstrapStatus> distantTuple = Collections.max(neighbours, adapterComparator);\r
+\r
+ // Mantain the link with the ring\r
+ if (distantTuple.y == BootstrapStatus.ONLINE) {\r
+ if (onlineNeighbours > 1) {\r
+ neighbours.remove(distantTuple);\r
+ onlineNeighbours--;\r
+ } else {\r
+ /*\r
+ * If will be only one neighbour that is online i save it and i'm going to\r
+ * eliminate another one (for sure it'll be not online)\r
+ *\r
+ */\r
+ Tuple<Node, BootstrapStatus> backupOnlineNeighbour = distantTuple;\r
+ neighbours.remove(backupOnlineNeighbour);\r
+ distantTuple = Collections.max(neighbours, adapterComparator);\r
+ neighbours.add(backupOnlineNeighbour);\r
+ neighbours.remove(distantTuple);\r
+ }\r
+\r
+ } else {\r
+ neighbours.remove(distantTuple);\r
+ }\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public Object clone() {\r
+ SymphonyNetworkManager dolly = new SymphonyNetworkManager(prefix);\r
+ return dolly;\r
+ }\r
+\r
+ public void nextCycle(Node node, int protocolID) {\r
+\r
+ if (node.isUp()) {\r
+\r
+ // Update the estimated network size\r
+ updateN(node);\r
+\r
+ // Update the estimated K\r
+ updateK(node);\r
+\r
+ // Update the bootstrap status of my neighbours that were joining the ring\r
+ updateBootstrapStatusNeighbours(node, false);\r
+\r
+ // Refresh the neighbours views\r
+ refreshNeighbours(node);\r
+\r
+ // I send and check the connection status of the neighbours\r
+ keepAlive(node);\r
+\r
+ // Update the bootstrap status\r
+ checkBootstrapStatus(node);\r
+\r
+ // If it's active i check the Relinking criteria\r
+ if (relinkingProtocolActivated) {\r
+ reLinkingProtocol(node);\r
+ }\r
+\r
+ // Update the long range links (conservative)\r
+ updateLongRangeLinks(node);\r
+ }\r
+ }\r
+\r
+ /**\r
+ *\r
+ * @param allNeighbours true, communicate/receive the status update from all the neighbours.\r
+ * false, communicate/receive the status update only from the neighbours that are NOT ONLINE\r
+ *\r
+ */\r
+ private void updateBootstrapStatusNeighbours(Node node, boolean allNeighbours) {\r
+ SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
+ Transport transport = (Transport) node.getProtocol(transportID);\r
+\r
+ Collection<Tuple<Node, BootstrapStatus>> collection = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();\r
+ collection.addAll(symphony.leftShortRangeLinks);\r
+ collection.addAll(symphony.rightShortRangeLinks);\r
+\r
+ for (Tuple<Node, BootstrapStatus> neighbourTuple : collection) {\r
+ if (allNeighbours || neighbourTuple.y != BootstrapStatus.ONLINE) {\r
+ Message msg = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_STATUS);\r
+ transport.send(node, neighbourTuple.x, msg, pid);\r
+ }\r
+ }\r
+ }\r
+\r
+ private void updateN(Node node) {\r
+ NetworkSizeEstimatorProtocolInterface networkEstimator = (NetworkSizeEstimatorProtocolInterface) node.getProtocol(networkEstimatorID);\r
+ n = networkEstimator.getNetworkSize(node);\r
+ if (n <= 0) {\r
+ n = DEFAULT_N;\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Update the K value with the current expectation of the network size\r
+ */\r
+ private void updateK(Node node) {\r
+\r
+ SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
+ if (!symphony.fixedLongRangeLinks) {\r
+ k = (int) Math.ceil(Math.log(n) / Math.log(2));\r
+\r
+ if (k <= 0) {\r
+ k = DEFAULT_K;\r
+ }\r
+ } else {\r
+ k = symphony.numberFixedLongRangeLinks;\r
+ }\r
+ }\r
+\r
+ private void refreshNeighbours(Node node) {\r
+ SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
+ Transport transport = (Transport) node.getProtocol(transportID);\r
+\r
+ for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {\r
+ Node leftNode = leftTuple.x;\r
+ Message updateNeighbourMsg = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_NEIGHBOURS);\r
+ transport.send(node, leftNode, updateNeighbourMsg, pid);\r
+ }\r
+\r
+ for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {\r
+ Node rightNode = rightTuple.x;\r
+ Message updateNeighbourMsg = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_NEIGHBOURS);\r
+ transport.send(node, rightNode, updateNeighbourMsg, pid);\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Method to update the (connection) status of the node. Perform the update to the "up" so:\r
+ * OFFLINE -> ONLINE_AND_ALL_NEIGHBOURS_OFFLINE -> ONLINE\r
+ *\r
+ * and to the "down" only: ONLINE -> ONLINE_AND_ALL_NEIGHBOURS_OFFLINE\r
+ *\r
+ * @param node\r
+ */\r
+ private void checkBootstrapStatus(Node node) {\r
+ SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
+\r
+ if (symphony.loggedIntoNetwork != BootstrapStatus.OFFLINE) {\r
+\r
+ symphony.loggedIntoNetwork = BootstrapStatus.ONLINE_AND_ALL_NEIGHBOURS_OFFLINE;\r
+\r
+ // Check if i'm inside the ring and i'm able to do routing\r
+ if (!symphony.leftShortRangeLinks.isEmpty() && !symphony.rightShortRangeLinks.isEmpty()) {\r
+\r
+ boolean leftOk = false;\r
+ for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {\r
+ if (leftTuple.y == BootstrapStatus.ONLINE) {\r
+ leftOk = true;\r
+ break;\r
+ }\r
+ }\r
+\r
+ if (leftOk) {\r
+ for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {\r
+ if (rightTuple.y == BootstrapStatus.ONLINE) {\r
+ symphony.loggedIntoNetwork = BootstrapStatus.ONLINE;\r
+ break;\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Remove the possible wrong entries from the lookAhead table\r
+ */\r
+ private void fixLookAheadMap(Node node) {\r
+ SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
+ for (Tuple<Node, BootstrapStatus> tuple : symphony.leftShortRangeLinks) {\r
+ symphony.lookAheadMap.put(tuple.x, null);\r
+ }\r
+ for (Tuple<Node, BootstrapStatus> tuple : symphony.rightShortRangeLinks) {\r
+ symphony.lookAheadMap.put(tuple.x, null);\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Sent keep-alive messages to verify if the links still online\r
+ *\r
+ * if enable the lookAhead mode i require the neighbours list from my neighbours (1-lookAhead).\r
+ *\r
+ * Note: I don't reuse the UPDATE_STATUS messages because i want to mantain separate the\r
+ * semantic and have more clear source code\r
+ */\r
+ private void keepAlive(Node node) {\r
+\r
+ SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
+ Transport transport = (Transport) node.getProtocol(transportID);\r
+\r
+ // Send and check for the long range links (both incoming and outgoing)\r
+ for (Iterable<Node> iterable : new Iterable[]{symphony.longRangeLinksOutgoing, symphony.longRangeLinksIncoming}) {\r
+ Iterator<Node> longLinksIterator = iterable.iterator();\r
+ while (longLinksIterator.hasNext()) {\r
+ Node longLinkNode = longLinksIterator.next();\r
+ Integer value = keepAliveMap.get(longLinkNode);\r
+ if (value == null) {\r
+ value = 0;\r
+ }\r
+\r
+ /*\r
+ * Verify if i reached the sufficient time of sending and not receiving an answer\r
+ * and so i can consider that node as disconnected\r
+ */\r
+ if (value >= nTimeout) {\r
+ symphony.lookAheadMap.put(longLinkNode, null); // Do it anyway if it's enabled the lookAhead or not\r
+ longLinksIterator.remove();\r
+ } else {\r
+ keepAliveMap.put(longLinkNode, value + 1);\r
+\r
+ Message keepAliveMsg = new Message(symphony.lookAhead, node, MessageType.KEEP_ALIVE);\r
+ transport.send(node, longLinkNode, keepAliveMsg, pid);\r
+ }\r
+ }\r
+ }\r
+\r
+ // Send and check for the short links\r
+ for (Iterable<Tuple<Node, BootstrapStatus>> iterable : new Iterable[]{symphony.rightShortRangeLinks, symphony.leftShortRangeLinks}) {\r
+ Iterator<Tuple<Node, BootstrapStatus>> shortLinksIterator = iterable.iterator();\r
+ while (shortLinksIterator.hasNext()) {\r
+ Node shortLinkNode = shortLinksIterator.next().x;\r
+ Integer value = keepAliveMap.get(shortLinkNode);\r
+ if (value == null) {\r
+ value = 0;\r
+ }\r
+\r
+ // the same as above\r
+ if (value >= nTimeout) {\r
+ shortLinksIterator.remove();\r
+ } else {\r
+ keepAliveMap.put(shortLinkNode, value + 1);\r
+\r
+ // LookAhead is not to be performed to the short links!\r
+ Message keepAliveMsg = new Message(false, node, MessageType.KEEP_ALIVE);\r
+ transport.send(node, shortLinkNode, keepAliveMsg, pid);\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Implement the Re-Linking criteria of the Long Range Links. It does the complete refresh. The\r
+ * repopulation is done through the 'updateLongRangeLinks' method.\r
+ */\r
+ private void reLinkingProtocol(Node node) {\r
+ // I do the check only if i succeed at least one time to create a long range link\r
+ if (nLink > 0) {\r
+ double criterionValue = n / nLink;\r
+\r
+ if (!(criterionValue >= relinkingLowerBound && criterionValue <= relinkingUpperBound)) {\r
+\r
+ /*\r
+ * Not explicitly precised in the paper: if i haven't created a new link i update\r
+ * anyway nLink because can happen a special case that i will not be able to create\r
+ * links because the reLinkingProtocol procedure is "faster".\r
+ */\r
+ nLink = n;\r
+\r
+ SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
+ Transport transport = (Transport) node.getProtocol(transportID);\r
+\r
+ // Communicate to the all Outgoing Long Range Links that they aren't anymore\r
+ for (Node longRangeLinkOutgoingNode : symphony.longRangeLinksOutgoing) {\r
+ Message disconnectMsg = new Message(null, node, MessageType.DISCONNECT_LONG_RANGE_LINK);\r
+ transport.send(node, longRangeLinkOutgoingNode, disconnectMsg, pid);\r
+ }\r
+\r
+ symphony.longRangeLinksOutgoing.clear();\r
+ }\r
+ }\r
+ }\r
+\r
+ public int getK() {\r
+ return k;\r
+ }\r
+\r
+ public int getN() {\r
+ return n;\r
+ }\r
+}\r