+++ /dev/null
-package example.symphony;\r
-\r
-import java.util.*;\r
-import java.util.logging.Level;\r
-import java.util.logging.Logger;\r
-\r
-import example.symphony.Message.MessageType;\r
-import example.symphony.SymphonyProtocol.BootstrapStatus;\r
-import peersim.cdsim.CDProtocol;\r
-import peersim.config.Configuration;\r
-import peersim.core.CommonState;\r
-import peersim.core.Fallible;\r
-import peersim.core.Node;\r
-import peersim.edsim.EDProtocol;\r
-import peersim.transport.Transport;\r
-\r
-/**\r
- *\r
- * @author Andrea Esposito <and1989@gmail.com>\r
- */\r
-public class SymphonyNetworkManager implements EDProtocol, CDProtocol {\r
-\r
- private static final String PAR_SYMPHONY = "symphony";\r
- private static final String PAR_TRANSP = "transport";\r
- private static final String PAR_ATTEMPTS = "attempts";\r
- private static final String PAR_NETSIZE = "networkestimator";\r
- private static final String PAR_NUM_TIMEOUT = "nTimeout";\r
- private static final String PAR_RELINKING = "relinking";\r
- private static final String PAR_RELINKING_LOWER_BOUND = "relinkingLowerBound";\r
- private static final String PAR_RELINKING_UPPER_BOUND = "relinkingUpperBound";\r
- private static final int DEFAULT_K = 1;\r
- private static final int DEFAULT_N = 2;\r
- private static final double DEFAULT_RELINKING_LOWER_BOUND = 0.5;\r
- private static final double DEFAULT_RELINKING_UPPER_BOUND = 2.0;\r
- private final String prefix;\r
- private final int symphonyID;\r
- private final int transportID;\r
- private final int networkEstimatorID;\r
- private final int attempts;\r
- private final int pid;\r
- private final int nTimeout;\r
- private final HashMap<Node, Integer> keepAliveMap;\r
- private final boolean relinkingProtocolActivated;\r
- private final double relinkingUpperBound;\r
- private final double relinkingLowerBound;\r
- private int k = DEFAULT_K; // Number of Long Range Link\r
- private int n = DEFAULT_N; // Estimation Network size\r
- private static boolean firstPrintConfig = true;\r
- /*\r
- * Estimation Network size at which last long distance link was established, at the beginning -1\r
- * to indicate that we never had Long Range Links\r
- */\r
- private int nLink = -1;\r
- private int currentAttempts;\r
-\r
- public SymphonyNetworkManager(String prefix) {\r
-\r
- this.prefix = prefix;\r
- pid = Configuration.lookupPid(prefix.replaceAll("protocol.", ""));\r
- symphonyID = Configuration.getPid(prefix + "." + PAR_SYMPHONY);\r
- transportID = Configuration.getPid(prefix + "." + PAR_TRANSP);\r
- networkEstimatorID = Configuration.getPid(prefix + "." + PAR_NETSIZE);\r
- attempts = Configuration.getInt(prefix + "." + PAR_ATTEMPTS);\r
- nTimeout = Configuration.getInt(prefix + "." + PAR_NUM_TIMEOUT, 10);\r
- relinkingProtocolActivated = !Configuration.getString(prefix + "." + PAR_RELINKING, "on").toLowerCase().equals("off");\r
- double relinkingLowerBoundAppo = Configuration.getDouble(prefix + "." + PAR_RELINKING_LOWER_BOUND, DEFAULT_RELINKING_LOWER_BOUND);\r
- double relinkingUpperBoundAppo = Configuration.getDouble(prefix + "." + PAR_RELINKING_UPPER_BOUND, DEFAULT_RELINKING_UPPER_BOUND);\r
- if (relinkingLowerBoundAppo > relinkingUpperBoundAppo) {\r
- relinkingLowerBound = DEFAULT_RELINKING_LOWER_BOUND;\r
- relinkingUpperBound = DEFAULT_RELINKING_UPPER_BOUND;\r
- } else {\r
- relinkingLowerBound = relinkingLowerBoundAppo;\r
- relinkingUpperBound = relinkingUpperBoundAppo;\r
- }\r
-\r
- keepAliveMap = new HashMap<Node, Integer>();\r
-\r
- printConfig();\r
- }\r
-\r
- private void printConfig() {\r
-\r
- if (firstPrintConfig) {\r
- firstPrintConfig = false;\r
- System.out.println(SymphonyNetworkManager.class.getSimpleName() + " Configuration:");\r
- System.out.println("- Attempts per LongRangeLinks: " + attempts);\r
- System.out.println("- Number of Timeout before a node is considered OFFLINE (through Keep-alive):" + nTimeout);\r
- System.out.println("- Relinking: " + (relinkingProtocolActivated ? "ON" : "OFF"));\r
- System.out.println("- Relinking Range: [" + relinkingLowerBound + ", " + relinkingUpperBound + "]");\r
- System.out.println("-------------------------------\n");\r
- }\r
- }\r
-\r
- public void join(final Node node, final Node bootstrapNode) throws RoutingException {\r
- final SymphonyProtocol bootstrapSymphony = (SymphonyProtocol) bootstrapNode.getProtocol(symphonyID);\r
- SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
-\r
- /*\r
- * Search (through the bootstrap node) and contact the Manager Node of myself such a way to\r
- * be able to insert myself into the ring and create the short links\r
- *\r
- */\r
- bootstrapSymphony.route(bootstrapNode, symphony.getIdentifier(), new Handler() {\r
-\r
- public void handle(SymphonyProtocol src, Tuple<Node, Double> tuple) {\r
- if (tuple == null) {\r
- Logger.getLogger(SymphonyNetworkManager.class.getName()).log(Level.SEVERE, "FAIL ROUTE JOIN");\r
- node.setFailState(Fallible.DEAD);\r
- return;\r
- }\r
-\r
- Node managerNode = tuple.x;\r
-\r
- Transport transport = (Transport) node.getProtocol(transportID);\r
- Message msg = new Message(node, node, MessageType.JOIN);\r
- transport.send(node, managerNode, msg, pid);\r
- }\r
- });\r
-\r
- // The Long Range Links are added after that i joined the ring (before i can't because i haven't got the nodes to do the routing)\r
- }\r
-\r
- /**\r
- * Conservative Re-Linking (i reuse the ones already created: not all fresh)\r
- *\r
- * @param node\r
- */\r
- public void updateLongRangeLinks(Node node) {\r
- SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
- Transport transport = (Transport) node.getProtocol(transportID);\r
-\r
- // if too much links i delete the farest ones\r
- while (symphony.longRangeLinksOutgoing.size() > k) {\r
- Node distantNode = Collections.max(symphony.longRangeLinksOutgoing, new SymphonyNodeComparator(symphonyID, node));\r
- symphony.longRangeLinksOutgoing.remove(distantNode);\r
-\r
- // Communicate to the outgoing node that it ins't anymore one of my long range links\r
- Message disconnectMsg = new Message(null, node, MessageType.DISCONNECT_LONG_RANGE_LINK);\r
- transport.send(node, distantNode, disconnectMsg, pid);\r
- }\r
-\r
- // I can search Long Range Links only if i'm into the ring and i'm able to do routing\r
- if (symphony.isBootstrapped()) {\r
- // if only few i try again, untill attempts times, to add new ones\r
- int difference = k - symphony.longRangeLinksOutgoing.size();\r
- currentAttempts = attempts;\r
- for (int i = 0; i < difference; i++) {\r
- sendLongRangeLinkRequest(symphony, node);\r
- }\r
- }\r
- }\r
- private static final int MAX_ANTILOOP_COUNT_MANAGER_MYSELF = 5;\r
- private int antiLoopManagerMySelf = 0;\r
-\r
- private void sendLongRangeLinkRequest(final SymphonyProtocol symphony, final Node node) {\r
- boolean routingOk;\r
- do {\r
- double distance = Math.exp((Math.log(n) / Math.log(2)) * (CommonState.r.nextDouble() - 1.0)); // Harmonic Distribution\r
- double targetIdentifier = (symphony.getIdentifier() + distance) % 1;\r
- try {\r
-\r
- symphony.route(node, targetIdentifier, new Handler() {\r
-\r
- public void handle(SymphonyProtocol src, Tuple<Node, Double> tuple) {\r
-\r
- if (tuple == null) {\r
- Logger.getLogger(SymphonyNetworkManager.class.getName()).log(Level.SEVERE, "FAIL ROUTE SENDLONGRANGELINKREQUEST");\r
- return;\r
- }\r
-\r
- Collection<Node> allShortLinks = new LinkedList<Node>();\r
- for (Tuple<Node, BootstrapStatus> shortTuple : symphony.leftShortRangeLinks) {\r
- allShortLinks.add(shortTuple.x);\r
- }\r
- for (Tuple<Node, BootstrapStatus> shortTuple : symphony.rightShortRangeLinks) {\r
- allShortLinks.add(shortTuple.x);\r
- }\r
-\r
- /*\r
- *\r
- * I'm myself one of my short links, special case... i try again without\r
- * reduce the attempts for a maximum of MAX_ANTILOOP_COUNT_MANAGER_MYSELF\r
- * times after that i start again to reduce the attempts\r
- */\r
- if (tuple.x.equals(node) || allShortLinks.contains(tuple.x)) {\r
-\r
- if (antiLoopManagerMySelf < MAX_ANTILOOP_COUNT_MANAGER_MYSELF) {\r
-\r
- antiLoopManagerMySelf++;\r
- sendLongRangeLinkRequest(symphony, node);\r
- } else {\r
- antiLoopManagerMySelf = 0;\r
- currentAttempts--;\r
- }\r
- } else {\r
-\r
- boolean alreadyAdded = symphony.longRangeLinksOutgoing.contains(tuple.x);\r
- /*\r
- *\r
- * OPINABLE: DESCREASE ATTEMPTS ONLY FOR REJECT? If yes i have to manage\r
- * the possible loop (nodi exhaurited so already all added)\r
- */\r
- if (alreadyAdded && currentAttempts > 0) {\r
- currentAttempts--;\r
- sendLongRangeLinkRequest(symphony, node);\r
- } else if (!alreadyAdded) {\r
- Message msg = new Message(null, node, MessageType.REQUEST_LONG_RANGE_LINK);\r
- Transport transport = (Transport) node.getProtocol(transportID);\r
- transport.send(node, tuple.x, msg, pid);\r
- }\r
- }\r
- }\r
- });\r
- routingOk = true;\r
- } catch (RoutingException ex) {\r
- routingOk = false;\r
- }\r
- } while (!routingOk);\r
- }\r
-\r
- public void leave(Node node) {\r
- SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
-\r
- if (symphony.loggedIntoNetwork != BootstrapStatus.OFFLINE) {\r
- Transport transport = (Transport) node.getProtocol(transportID);\r
-\r
- symphony.loggedIntoNetwork = BootstrapStatus.OFFLINE;\r
-\r
- // Communicate that i'm leaving to the outgoing (that i point to) nodes\r
- for (Node outgoingNode : symphony.longRangeLinksOutgoing) {\r
- Message disconnectMsg = new Message(null, node, MessageType.DISCONNECT_LONG_RANGE_LINK);\r
- transport.send(node, outgoingNode, disconnectMsg, pid);\r
- }\r
-\r
- // Communicate that i'm leaving to the incoming (that they point to me) nodes\r
- for (Node incomingNode : symphony.longRangeLinksIncoming) {\r
- Message unavailableMsg = new Message(null, node, MessageType.UNAVAILABLE_LONG_RANGE_LINK);\r
- transport.send(node, incomingNode, unavailableMsg, pid);\r
- }\r
-\r
- // Communicate to my neighbours (short range links) that i'm leaving and i send to them the near neighbours\r
- for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {\r
- Message leaveMsg = new Message(symphony.rightShortRangeLinks.clone(), node, MessageType.LEAVE);\r
- transport.send(node, leftTuple.x, leaveMsg, pid);\r
- }\r
-\r
- for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {\r
- Message leaveMsg = new Message(symphony.leftShortRangeLinks.clone(), node, MessageType.LEAVE);\r
- transport.send(node, rightTuple.x, leaveMsg, pid);\r
- }\r
-\r
- node.setFailState(Fallible.DEAD);\r
- }\r
- }\r
-\r
- public void processEvent(Node node, int pid, Object event) {\r
-\r
- Message msg = (Message) event;\r
-\r
- SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
- Transport transport = (Transport) node.getProtocol(transportID);\r
-\r
- Collection<Tuple<Node, BootstrapStatus>> collection = null;\r
- switch (msg.getType()) {\r
- case JOIN:\r
- // I send my current neighbours to the entering node\r
- collection = (Collection<Tuple<Node, BootstrapStatus>>) symphony.leftShortRangeLinks.clone();\r
- collection.addAll((Collection<Tuple<Node, BootstrapStatus>>) symphony.rightShortRangeLinks.clone());\r
- Message responseMsg = new Message(collection, node, MessageType.JOIN_RESPONSE);\r
- transport.send(node, msg.getSourceNode(), responseMsg, pid);\r
-\r
- /*\r
- * Update my neighbours list, adding the new one (for sure it is entering in the\r
- * left side)\r
- *\r
- * Put to "ONLINE_AND_ALL_NEIGHBOURS_OFFLINE" because maybe the bootstrap phase is\r
- * not terminated yet (ashyncronous communication)\r
- */\r
- symphony.leftShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(msg.getSourceNode(), BootstrapStatus.ONLINE_AND_ALL_NEIGHBOURS_OFFLINE));\r
-\r
-\r
- fixNeighbours(node, symphony.leftShortRangeLinks);\r
- break;\r
- case JOIN_RESPONSE:\r
-\r
- Collection<Tuple<Node, BootstrapStatus>> tupleCollection = (Collection<Tuple<Node, BootstrapStatus>>) msg.getBody();\r
-\r
- /*\r
- *\r
- * My manager is a right neighbour. The manager is already inside the ring, boostrap\r
- * obliviously ok\r
- */\r
- symphony.rightShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(msg.getSourceNode(), BootstrapStatus.ONLINE));\r
-\r
- // Set my neighbours in the correct position\r
- for (Tuple<Node, BootstrapStatus> tuple : tupleCollection) {\r
- if (SymphonyProtocol.isLeftNeighbour(node, tuple.x)) {\r
- symphony.leftShortRangeLinks.add(tuple);\r
- } else {\r
- symphony.rightShortRangeLinks.add(tuple);\r
- }\r
- }\r
-\r
- fixNeighbours(node, symphony.leftShortRangeLinks);\r
- fixNeighbours(node, symphony.rightShortRangeLinks);\r
-\r
- // Update bootstrap status\r
- checkBootstrapStatus(node);\r
-\r
- // I send the refresh command such a way to exchange the views\r
- refreshNeighbours(node);\r
-\r
- // Update Long Range Links, because it's at the beginning is the same as adding k\r
- updateLongRangeLinks(node);\r
- break;\r
- case UPDATE_NEIGHBOURS:\r
-\r
- Collection<Tuple<Node, BootstrapStatus>> collectionCloned = ((Collection<Tuple<Node, BootstrapStatus>>) symphony.leftShortRangeLinks.clone());\r
- collectionCloned.addAll(((Collection<Tuple<Node, BootstrapStatus>>) symphony.rightShortRangeLinks.clone()));\r
-\r
- // Send my neighbours such a way it can also update itself\r
- Message responseUpdateMsg = new Message(collectionCloned, node, MessageType.UPDATE_NEIGHBOURS_RESPONSE);\r
- transport.send(node, msg.getSourceNode(), responseUpdateMsg, pid);\r
-\r
- // Update my view with the new node\r
- Tuple<Node, BootstrapStatus> neighbourTuple = new Tuple<Node, BootstrapStatus>(msg.getSourceNode(), (BootstrapStatus) msg.getBody());\r
- if (SymphonyProtocol.isLeftNeighbour(node, msg.getSourceNode())) {\r
- collection = symphony.leftShortRangeLinks;\r
- } else {\r
- collection = symphony.rightShortRangeLinks;\r
- }\r
- collection.add(neighbourTuple);\r
-\r
- fixNeighbours(node, collection);\r
- fixLookAheadMap(node);\r
- break;\r
- case UPDATE_NEIGHBOURS_RESPONSE:\r
-\r
- Collection<Tuple<Node, BootstrapStatus>> responseCollection = (Collection<Tuple<Node, BootstrapStatus>>) msg.getBody();\r
-\r
- for (Tuple<Node, BootstrapStatus> neighbourResponseTuple : responseCollection) {\r
- if (SymphonyProtocol.isLeftNeighbour(node, neighbourResponseTuple.x)) {\r
- collection = symphony.leftShortRangeLinks;\r
- } else {\r
- collection = symphony.rightShortRangeLinks;\r
- }\r
- collection.add(neighbourResponseTuple);\r
- }\r
-\r
- // Fix the neighbours number to the maximum allow and maybe remove myself from the list\r
- fixNeighbours(node, symphony.leftShortRangeLinks);\r
- fixNeighbours(node, symphony.rightShortRangeLinks);\r
- fixLookAheadMap(node);\r
- break;\r
- case UPDATE_STATUS:\r
- case UPDATE_STATUS_RESPONSE:\r
-\r
- Node updNode = msg.getSourceNode();\r
- BootstrapStatus updStatus = (BootstrapStatus) msg.getBody();\r
-\r
- // I search the neighbour and i update its status\r
- boolean founded = false;\r
-\r
- // Try to see if it is on the left\r
- for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {\r
- if (leftTuple.x.equals(updNode)) {\r
- symphony.leftShortRangeLinks.remove(leftTuple);\r
- symphony.leftShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(updNode, updStatus));\r
-\r
- founded = true;\r
- break;\r
- }\r
- }\r
-\r
- // If it isn't on the left i try with the neighbours on the right\r
- if (!founded) {\r
- for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {\r
- if (rightTuple.x.equals(updNode)) {\r
- symphony.rightShortRangeLinks.remove(rightTuple);\r
- symphony.rightShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(updNode, updStatus));\r
-\r
- break;\r
- }\r
- }\r
-\r
- fixNeighbours(node, symphony.rightShortRangeLinks);\r
- } else {\r
- fixNeighbours(node, symphony.leftShortRangeLinks);\r
- }\r
-\r
- checkBootstrapStatusAndAlert(node);\r
-\r
- if (msg.getType() == MessageType.UPDATE_STATUS) {\r
- Message responseUpdStatus = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_STATUS_RESPONSE);\r
- transport.send(node, updNode, responseUpdStatus, pid);\r
- }\r
-\r
- break;\r
- case REQUEST_LONG_RANGE_LINK:\r
- MessageType responseType = MessageType.REJECT_LONG_RANGE_LINK;\r
- if (symphony.longRangeLinksIncoming.size() < (2 * k)) {\r
- boolean added = symphony.longRangeLinksIncoming.add(msg.getSourceNode());\r
- if (added) {\r
- responseType = MessageType.ACCEPTED_LONG_RANGE_LINK;\r
- }\r
- }\r
- Message responseLongLinkMsg = new Message(null, node, responseType);\r
- transport.send(node, msg.getSourceNode(), responseLongLinkMsg, pid);\r
- break;\r
- case ACCEPTED_LONG_RANGE_LINK:\r
- nLink = n;\r
- symphony.longRangeLinksOutgoing.add(msg.getSourceNode());\r
- break;\r
- case REJECT_LONG_RANGE_LINK:\r
- if (currentAttempts > 0) {\r
- currentAttempts--;\r
- sendLongRangeLinkRequest(symphony, node);\r
- }\r
- break;\r
- case DISCONNECT_LONG_RANGE_LINK:\r
- symphony.longRangeLinksIncoming.remove(msg.getSourceNode());\r
- symphony.lookAheadMap.put(msg.getSourceNode(), null);\r
- break;\r
- case UNAVAILABLE_LONG_RANGE_LINK:\r
- symphony.longRangeLinksOutgoing.remove(msg.getSourceNode());\r
- symphony.lookAheadMap.put(msg.getSourceNode(), null);\r
- break;\r
- case LEAVE:\r
- Tuple<Node, BootstrapStatus> foundedTuple = null;\r
-\r
- // Verify if the node that is leaving is a left neighbour\r
- for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {\r
- if (leftTuple.x.equals(msg.getSourceNode())) {\r
- collection = symphony.leftShortRangeLinks;\r
- foundedTuple = leftTuple;\r
- break;\r
- }\r
- }\r
-\r
- // Verify if the node that is leaving is a right neighbour\r
- if (collection == null) {\r
- for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {\r
- if (rightTuple.x.equals(msg.getSourceNode())) {\r
- collection = symphony.rightShortRangeLinks;\r
- foundedTuple = rightTuple;\r
- break;\r
- }\r
- }\r
- }\r
-\r
- // if i've found the neighbour i remove it and i add to myself its neighbours\r
- if (collection != null) {\r
- collection.addAll((Collection<Tuple<Node, BootstrapStatus>>) msg.getBody());\r
- collection.remove(foundedTuple);\r
- fixNeighbours(node, collection);\r
-\r
- // Update status and ready to send an alert in case i'm out of the ring\r
- checkBootstrapStatusAndAlert(node);\r
- }\r
- break;\r
- case KEEP_ALIVE:\r
- Set<Double>[] lookAheadSetArray = new LinkedHashSet[2];\r
-\r
- /*\r
- * Check if the contacting node is doing lookAhead and in case of affirmative answer\r
- * i provide to it the long range link identifiers (according to my routing mode)\r
- */\r
- if ((Boolean) msg.getBody()) {\r
- int i = 0;\r
- Iterable[] iterableArray;\r
- if (symphony.bidirectionalRouting) {\r
- iterableArray = new Iterable[]{symphony.longRangeLinksOutgoing, symphony.longRangeLinksIncoming};\r
- } else {\r
- iterableArray = new Iterable[]{symphony.longRangeLinksOutgoing};\r
- }\r
-\r
- for (Iterable<Node> iterable : iterableArray) {\r
- lookAheadSetArray[i] = new LinkedHashSet<Double>();\r
- Set<Double> lookAheadSet = lookAheadSetArray[i];\r
- Iterator<Node> it = iterable.iterator();\r
- while (it.hasNext()) {\r
- Node longLinkNode = it.next();\r
- lookAheadSet.add(((SymphonyProtocol) longLinkNode.getProtocol(symphonyID)).getIdentifier());\r
- }\r
- i++;\r
- }\r
- }\r
-\r
- transport.send(node, msg.getSourceNode(), new Message(lookAheadSetArray, node, MessageType.KEEP_ALIVE_RESPONSE), pid);\r
- break;\r
- case KEEP_ALIVE_RESPONSE:\r
- // Reset the counter to 0\r
- keepAliveMap.put(msg.getSourceNode(), 0);\r
-\r
- if (symphony.lookAhead) {\r
- symphony.lookAheadMap.put(msg.getSourceNode(), (Set<Double>[]) msg.getBody());\r
- }\r
-\r
- break;\r
- }\r
- }\r
-\r
- /**\r
- *\r
- * Update the status and communicate immediately to the neighbours if the node is gone out from\r
- * the ring (and before it was inside)\r
- *\r
- * @param node\r
- */\r
- private void checkBootstrapStatusAndAlert(Node node) {\r
- SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
- BootstrapStatus beforeStatus = symphony.loggedIntoNetwork;\r
-\r
- checkBootstrapStatus(node);\r
-\r
- // Instead of waiting that the update happens periodically i do it now because i'm out of the ring and before i wasn't\r
- if (symphony.loggedIntoNetwork != beforeStatus && !symphony.isBootstrapped()) {\r
- updateBootstrapStatusNeighbours(node, true);\r
- }\r
- }\r
-\r
- private void fixNeighbours(Node node, Collection<Tuple<Node, BootstrapStatus>> neighbours) {\r
-\r
- SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
-\r
- // Remove duplicates, remove that ones that are in an obsolete status\r
- Collection<Tuple<Node, BootstrapStatus>> removedNeighbours = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();\r
- for (Tuple<Node, BootstrapStatus> tuple : neighbours) {\r
-\r
- // Remove myself from the neighbours list\r
- if (tuple.x.equals(node)) {\r
- removedNeighbours.add(tuple);\r
- continue;\r
- }\r
-\r
- EnumSet<BootstrapStatus> status = EnumSet.allOf(BootstrapStatus.class);\r
- status.remove(tuple.y);\r
-\r
- for (BootstrapStatus opposite : status) {\r
- Tuple<Node, BootstrapStatus> oppositeNeighbour = new Tuple<Node, BootstrapStatus>(tuple.x, opposite);\r
- if (neighbours.contains(oppositeNeighbour)) {\r
- if (tuple.y != BootstrapStatus.ONLINE) {\r
- removedNeighbours.add(new Tuple<Node, BootstrapStatus>(tuple.x, BootstrapStatus.OFFLINE));\r
- if (opposite == BootstrapStatus.ONLINE) {\r
- removedNeighbours.add(new Tuple<Node, BootstrapStatus>(tuple.x, BootstrapStatus.ONLINE_AND_ALL_NEIGHBOURS_OFFLINE));\r
- }\r
- }\r
- }\r
- }\r
- }\r
- neighbours.removeAll(removedNeighbours);\r
-\r
- /*\r
- *\r
- * I count the neighbours that are in the ONLINE status but before i remove the ones that\r
- * are gone in timeout during the keep-alive procedure because can be someone that is old\r
- * but not remove from the exchanging views (UPDATE_NEIGHBOURS) procedure and are not\r
- * effectively online. To do anyway the possibility to the node to join again i decrease its\r
- * timeout value. This only if the node is ONLINE and so i'm really interested that it is ok\r
- * for the routing.\r
- *\r
- */\r
- int onlineNeighbours = 0;\r
- for (Tuple<Node, BootstrapStatus> tuple : neighbours) {\r
-\r
- Integer value = keepAliveMap.get(tuple.x);\r
- if (value != null && value >= nTimeout && tuple.y == BootstrapStatus.ONLINE) {\r
- keepAliveMap.put(tuple.x, value - 1);\r
- removedNeighbours.add(tuple);\r
- } else {\r
-\r
- if (tuple.y == BootstrapStatus.ONLINE) {\r
- onlineNeighbours++;\r
- }\r
- }\r
- }\r
- neighbours.removeAll(removedNeighbours);\r
-\r
- // Fix the neighbours number to the maximum allowed\r
- SymphonyNodeComparator comparator = new SymphonyNodeComparator(symphonyID, node);\r
- AdapterSymphonyNodeComparator adapterComparator = new AdapterSymphonyNodeComparator(comparator);\r
- while (neighbours.size() > symphony.numberShortRangeLinksPerSide) {\r
- Tuple<Node, BootstrapStatus> distantTuple = Collections.max(neighbours, adapterComparator);\r
-\r
- // Mantain the link with the ring\r
- if (distantTuple.y == BootstrapStatus.ONLINE) {\r
- if (onlineNeighbours > 1) {\r
- neighbours.remove(distantTuple);\r
- onlineNeighbours--;\r
- } else {\r
- /*\r
- * If will be only one neighbour that is online i save it and i'm going to\r
- * eliminate another one (for sure it'll be not online)\r
- *\r
- */\r
- Tuple<Node, BootstrapStatus> backupOnlineNeighbour = distantTuple;\r
- neighbours.remove(backupOnlineNeighbour);\r
- distantTuple = Collections.max(neighbours, adapterComparator);\r
- neighbours.add(backupOnlineNeighbour);\r
- neighbours.remove(distantTuple);\r
- }\r
-\r
- } else {\r
- neighbours.remove(distantTuple);\r
- }\r
- }\r
- }\r
-\r
- @Override\r
- public Object clone() {\r
- SymphonyNetworkManager dolly = new SymphonyNetworkManager(prefix);\r
- return dolly;\r
- }\r
-\r
- public void nextCycle(Node node, int protocolID) {\r
-\r
- if (node.isUp()) {\r
-\r
- // Update the estimated network size\r
- updateN(node);\r
-\r
- // Update the estimated K\r
- updateK(node);\r
-\r
- // Update the bootstrap status of my neighbours that were joining the ring\r
- updateBootstrapStatusNeighbours(node, false);\r
-\r
- // Refresh the neighbours views\r
- refreshNeighbours(node);\r
-\r
- // I send and check the connection status of the neighbours\r
- keepAlive(node);\r
-\r
- // Update the bootstrap status\r
- checkBootstrapStatus(node);\r
-\r
- // If it's active i check the Relinking criteria\r
- if (relinkingProtocolActivated) {\r
- reLinkingProtocol(node);\r
- }\r
-\r
- // Update the long range links (conservative)\r
- updateLongRangeLinks(node);\r
- }\r
- }\r
-\r
- /**\r
- *\r
- * @param allNeighbours true, communicate/receive the status update from all the neighbours.\r
- * false, communicate/receive the status update only from the neighbours that are NOT ONLINE\r
- *\r
- */\r
- private void updateBootstrapStatusNeighbours(Node node, boolean allNeighbours) {\r
- SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
- Transport transport = (Transport) node.getProtocol(transportID);\r
-\r
- Collection<Tuple<Node, BootstrapStatus>> collection = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();\r
- collection.addAll(symphony.leftShortRangeLinks);\r
- collection.addAll(symphony.rightShortRangeLinks);\r
-\r
- for (Tuple<Node, BootstrapStatus> neighbourTuple : collection) {\r
- if (allNeighbours || neighbourTuple.y != BootstrapStatus.ONLINE) {\r
- Message msg = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_STATUS);\r
- transport.send(node, neighbourTuple.x, msg, pid);\r
- }\r
- }\r
- }\r
-\r
- private void updateN(Node node) {\r
- NetworkSizeEstimatorProtocolInterface networkEstimator = (NetworkSizeEstimatorProtocolInterface) node.getProtocol(networkEstimatorID);\r
- n = networkEstimator.getNetworkSize(node);\r
- if (n <= 0) {\r
- n = DEFAULT_N;\r
- }\r
- }\r
-\r
- /**\r
- * Update the K value with the current expectation of the network size\r
- */\r
- private void updateK(Node node) {\r
-\r
- SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
- if (!symphony.fixedLongRangeLinks) {\r
- k = (int) Math.ceil(Math.log(n) / Math.log(2));\r
-\r
- if (k <= 0) {\r
- k = DEFAULT_K;\r
- }\r
- } else {\r
- k = symphony.numberFixedLongRangeLinks;\r
- }\r
- }\r
-\r
- private void refreshNeighbours(Node node) {\r
- SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
- Transport transport = (Transport) node.getProtocol(transportID);\r
-\r
- for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {\r
- Node leftNode = leftTuple.x;\r
- Message updateNeighbourMsg = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_NEIGHBOURS);\r
- transport.send(node, leftNode, updateNeighbourMsg, pid);\r
- }\r
-\r
- for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {\r
- Node rightNode = rightTuple.x;\r
- Message updateNeighbourMsg = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_NEIGHBOURS);\r
- transport.send(node, rightNode, updateNeighbourMsg, pid);\r
- }\r
- }\r
-\r
- /**\r
- * Method to update the (connection) status of the node. Perform the update to the "up" so:\r
- * OFFLINE -> ONLINE_AND_ALL_NEIGHBOURS_OFFLINE -> ONLINE\r
- *\r
- * and to the "down" only: ONLINE -> ONLINE_AND_ALL_NEIGHBOURS_OFFLINE\r
- *\r
- * @param node\r
- */\r
- private void checkBootstrapStatus(Node node) {\r
- SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
-\r
- if (symphony.loggedIntoNetwork != BootstrapStatus.OFFLINE) {\r
-\r
- symphony.loggedIntoNetwork = BootstrapStatus.ONLINE_AND_ALL_NEIGHBOURS_OFFLINE;\r
-\r
- // Check if i'm inside the ring and i'm able to do routing\r
- if (!symphony.leftShortRangeLinks.isEmpty() && !symphony.rightShortRangeLinks.isEmpty()) {\r
-\r
- boolean leftOk = false;\r
- for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {\r
- if (leftTuple.y == BootstrapStatus.ONLINE) {\r
- leftOk = true;\r
- break;\r
- }\r
- }\r
-\r
- if (leftOk) {\r
- for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {\r
- if (rightTuple.y == BootstrapStatus.ONLINE) {\r
- symphony.loggedIntoNetwork = BootstrapStatus.ONLINE;\r
- break;\r
- }\r
- }\r
- }\r
- }\r
- }\r
- }\r
-\r
- /**\r
- * Remove the possible wrong entries from the lookAhead table\r
- */\r
- private void fixLookAheadMap(Node node) {\r
- SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
- for (Tuple<Node, BootstrapStatus> tuple : symphony.leftShortRangeLinks) {\r
- symphony.lookAheadMap.put(tuple.x, null);\r
- }\r
- for (Tuple<Node, BootstrapStatus> tuple : symphony.rightShortRangeLinks) {\r
- symphony.lookAheadMap.put(tuple.x, null);\r
- }\r
- }\r
-\r
- /**\r
- * Sent keep-alive messages to verify if the links still online\r
- *\r
- * if enable the lookAhead mode i require the neighbours list from my neighbours (1-lookAhead).\r
- *\r
- * Note: I don't reuse the UPDATE_STATUS messages because i want to mantain separate the\r
- * semantic and have more clear source code\r
- */\r
- private void keepAlive(Node node) {\r
-\r
- SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
- Transport transport = (Transport) node.getProtocol(transportID);\r
-\r
- // Send and check for the long range links (both incoming and outgoing)\r
- for (Iterable<Node> iterable : new Iterable[]{symphony.longRangeLinksOutgoing, symphony.longRangeLinksIncoming}) {\r
- Iterator<Node> longLinksIterator = iterable.iterator();\r
- while (longLinksIterator.hasNext()) {\r
- Node longLinkNode = longLinksIterator.next();\r
- Integer value = keepAliveMap.get(longLinkNode);\r
- if (value == null) {\r
- value = 0;\r
- }\r
-\r
- /*\r
- * Verify if i reached the sufficient time of sending and not receiving an answer\r
- * and so i can consider that node as disconnected\r
- */\r
- if (value >= nTimeout) {\r
- symphony.lookAheadMap.put(longLinkNode, null); // Do it anyway if it's enabled the lookAhead or not\r
- longLinksIterator.remove();\r
- } else {\r
- keepAliveMap.put(longLinkNode, value + 1);\r
-\r
- Message keepAliveMsg = new Message(symphony.lookAhead, node, MessageType.KEEP_ALIVE);\r
- transport.send(node, longLinkNode, keepAliveMsg, pid);\r
- }\r
- }\r
- }\r
-\r
- // Send and check for the short links\r
- for (Iterable<Tuple<Node, BootstrapStatus>> iterable : new Iterable[]{symphony.rightShortRangeLinks, symphony.leftShortRangeLinks}) {\r
- Iterator<Tuple<Node, BootstrapStatus>> shortLinksIterator = iterable.iterator();\r
- while (shortLinksIterator.hasNext()) {\r
- Node shortLinkNode = shortLinksIterator.next().x;\r
- Integer value = keepAliveMap.get(shortLinkNode);\r
- if (value == null) {\r
- value = 0;\r
- }\r
-\r
- // the same as above\r
- if (value >= nTimeout) {\r
- shortLinksIterator.remove();\r
- } else {\r
- keepAliveMap.put(shortLinkNode, value + 1);\r
-\r
- // LookAhead is not to be performed to the short links!\r
- Message keepAliveMsg = new Message(false, node, MessageType.KEEP_ALIVE);\r
- transport.send(node, shortLinkNode, keepAliveMsg, pid);\r
- }\r
- }\r
- }\r
- }\r
-\r
- /**\r
- * Implement the Re-Linking criteria of the Long Range Links. It does the complete refresh. The\r
- * repopulation is done through the 'updateLongRangeLinks' method.\r
- */\r
- private void reLinkingProtocol(Node node) {\r
- // I do the check only if i succeed at least one time to create a long range link\r
- if (nLink > 0) {\r
- double criterionValue = n / nLink;\r
-\r
- if (!(criterionValue >= relinkingLowerBound && criterionValue <= relinkingUpperBound)) {\r
-\r
- /*\r
- * Not explicitly precised in the paper: if i haven't created a new link i update\r
- * anyway nLink because can happen a special case that i will not be able to create\r
- * links because the reLinkingProtocol procedure is "faster".\r
- */\r
- nLink = n;\r
-\r
- SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);\r
- Transport transport = (Transport) node.getProtocol(transportID);\r
-\r
- // Communicate to the all Outgoing Long Range Links that they aren't anymore\r
- for (Node longRangeLinkOutgoingNode : symphony.longRangeLinksOutgoing) {\r
- Message disconnectMsg = new Message(null, node, MessageType.DISCONNECT_LONG_RANGE_LINK);\r
- transport.send(node, longRangeLinkOutgoingNode, disconnectMsg, pid);\r
- }\r
-\r
- symphony.longRangeLinksOutgoing.clear();\r
- }\r
- }\r
- }\r
-\r
- public int getK() {\r
- return k;\r
- }\r
-\r
- public int getN() {\r
- return n;\r
- }\r
-}\r