--- /dev/null
+package example.symphony;\r
+\r
+import java.lang.ref.SoftReference;\r
+import java.util.*;\r
+import java.util.logging.Level;\r
+import java.util.logging.Logger;\r
+\r
+import example.symphony.Message.MessageType;\r
+import peersim.config.Configuration;\r
+import peersim.core.CommonState;\r
+import peersim.core.Node;\r
+import peersim.edsim.EDProtocol;\r
+import peersim.transport.Transport;\r
+\r
+/**\r
+ * Event-driven PeerSim protocol that routes keys over a ring of {@code double} identifiers.\r
+ *\r
+ * <p>Every instance draws a unique identifier from {@code CommonState.r.nextDouble()} and\r
+ * treats it as a position on a ring of circumference 1 (distances wrap around at 1.0). A node\r
+ * keeps short range links on its left and right side plus outgoing/incoming long range links;\r
+ * routing forwards a {@link Message} to the linked node that is closest to the requested key,\r
+ * optionally taking into account the identifiers reachable through that neighbour (lookahead).\r
+ *\r
+ * <p>Configuration parameters read below the component prefix: {@code shortlink},\r
+ * {@code longlink}, {@code transport}, {@code routing} and {@code lookahead}.\r
+ *\r
+ * @author Andrea Esposito <and1989@gmail.com>\r
+ */\r
+public class SymphonyProtocol implements EDProtocol {\r
+\r
+    private static final String PAR_SHORT_LINK = "shortlink";\r
+    private static final String PAR_LONG_LINK = "longlink";\r
+    private static final String PAR_TRANSP = "transport";\r
+    private static final String PAR_ROUTING = "routing";\r
+    private static final String PAR_LOOKAHEAD = "lookahead";\r
+    private static final Logger LOGGER = Logger.getLogger(SymphonyProtocol.class.getName());\r
+    /**\r
+     * Every identifier handed out so far, shared by all instances to guarantee uniqueness.\r
+     */\r
+    private static final Set<Double> allIdentifier = new HashSet<Double>();\r
+    private final String prefix;\r
+    /**\r
+     * Protocol id of this protocol. It is static because the static helpers need it; it is\r
+     * (re)assigned by every constructor call.\r
+     */\r
+    private static int pid;\r
+    private final int transportID;\r
+    private final double identifier;\r
+    /**\r
+     * Creation-order index (starting at 0) taken from a class-wide counter.\r
+     */\r
+    public final int sequentialIdentifier;\r
+    private static int sequentialCounter = 0;\r
+    /**\r
+     * Configured {@code shortlink} value divided by two (integer division).\r
+     */\r
+    public final int numberShortRangeLinksPerSide;\r
+    public final boolean bidirectionalRouting;\r
+    public final boolean lookAhead;\r
+    /**\r
+     * True when {@code longlink} was configured with a non-negative value.\r
+     */\r
+    public final boolean fixedLongRangeLinks;\r
+    public final int numberFixedLongRangeLinks;\r
+    public LinkedHashSet<Node> longRangeLinksOutgoing;\r
+    public LinkedHashSet<Node> longRangeLinksIncoming;\r
+    public LinkedHashSet<Tuple<Node, BootstrapStatus>> rightShortRangeLinks;\r
+    public LinkedHashSet<Tuple<Node, BootstrapStatus>> leftShortRangeLinks;\r
+    /**\r
+     * Array Contract: at position 0 -> OutgoingLongRangeLinks, 1 -> IncomingLongRangeLinks\r
+     */\r
+    public final LinkedHashMap<Node, Set<Double>[]> lookAheadMap;\r
+    /**\r
+     * Pending handlers indexed by the key being routed. Only one handler per key can be pending:\r
+     * a second {@code route} call for the same key replaces the first handler.\r
+     */\r
+    private final HashMap<Double, Handler> mapHandler;\r
+    /**\r
+     * IDs of the messages already seen by this node, used to detect routing cycles.\r
+     */\r
+    private final Set<Long> messageHistoryID;\r
+    /**\r
+     * Chronology of tuples: &lt;received message, the possible answer message&gt;.\r
+     *\r
+     * SoftReference is used as a trade-off between memory usage and accuracy: entries may be\r
+     * reclaimed by the garbage collector.\r
+     */\r
+    private final Set<SoftReference<Tuple<Message, Message>>> messageHistory;\r
+    private static boolean firstPrintConfig = true;\r
+\r
+    /**\r
+     * Status of a node (or of a short range link entry). {@code NEW} is the value assigned by\r
+     * the constructor; only {@code ONLINE} entries are used for routing in this class. The\r
+     * remaining values are not interpreted here.\r
+     */\r
+    public enum BootstrapStatus {\r
+\r
+        NEW, OFFLINE, ONLINE_AND_ALL_NEIGHBOURS_OFFLINE, ONLINE\r
+    }\r
+    public BootstrapStatus loggedIntoNetwork;\r
+\r
+    /**\r
+     * Reads the configuration, draws a fresh unique identifier and prints the configuration\r
+     * summary the first time an instance is created.\r
+     *\r
+     * @param prefix configuration prefix of this protocol\r
+     */\r
+    public SymphonyProtocol(String prefix) {\r
+\r
+        this.prefix = prefix;\r
+        // Literal replacement: with replaceAll the '.' was a regex wildcard, so fragments such as\r
+        // "protocol2" inside the protocol name were stripped as well.\r
+        pid = Configuration.lookupPid(prefix.replace("protocol.", ""));\r
+        transportID = Configuration.getPid(prefix + "." + PAR_TRANSP);\r
+        numberShortRangeLinksPerSide = Configuration.getInt(prefix + "." + PAR_SHORT_LINK, 2) / 2;\r
+        // equalsIgnoreCase avoids the locale-sensitive toLowerCase() (e.g. Turkish dotless i)\r
+        bidirectionalRouting = !"unidirectional".equalsIgnoreCase(Configuration.getString(prefix + "." + PAR_ROUTING, "bidirectional"));\r
+        lookAhead = !"off".equalsIgnoreCase(Configuration.getString(prefix + "." + PAR_LOOKAHEAD, "on"));\r
+        numberFixedLongRangeLinks = Configuration.getInt(prefix + "." + PAR_LONG_LINK, -1);\r
+        fixedLongRangeLinks = numberFixedLongRangeLinks >= 0;\r
+\r
+        longRangeLinksOutgoing = new LinkedHashSet<Node>();\r
+        longRangeLinksIncoming = new LinkedHashSet<Node>();\r
+        rightShortRangeLinks = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();\r
+        leftShortRangeLinks = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();\r
+        lookAheadMap = new LinkedHashMap<Node, Set<Double>[]>();\r
+\r
+        identifier = generateUniqueIdentifier();\r
+        sequentialIdentifier = sequentialCounter++;\r
+\r
+        mapHandler = new HashMap<Double, Handler>();\r
+\r
+        messageHistoryID = new HashSet<Long>();\r
+        messageHistory = new LinkedHashSet<SoftReference<Tuple<Message, Message>>>();\r
+        loggedIntoNetwork = BootstrapStatus.NEW;\r
+\r
+        printConfig();\r
+    }\r
+\r
+    /**\r
+     * Prints the effective configuration to standard output, once per JVM.\r
+     */\r
+    private void printConfig() {\r
+\r
+        if (!firstPrintConfig) {\r
+            return;\r
+        }\r
+\r
+        firstPrintConfig = false;\r
+        System.out.println(SymphonyProtocol.class.getSimpleName() + " Configuration:");\r
+        System.out.println("- Number of short range links per side: " + numberShortRangeLinksPerSide);\r
+        System.out.println("- Number of long range links: " + (fixedLongRangeLinks ? numberFixedLongRangeLinks : "log(n)"));\r
+        System.out.println("- Routing mode: " + (bidirectionalRouting ? "Bidirectional" : "Unidirectional"));\r
+        System.out.println("- LookAhead status: " + (lookAhead ? "ON" : "OFF"));\r
+        System.out.println("-------------------------------\n");\r
+    }\r
+\r
+    /**\r
+     * Identifies the next node to contact, using the routing mode selected in the configuration.\r
+     *\r
+     * @param identifierToRoute identifier to reach\r
+     * @return the next hop\r
+     * @throws RoutingException if no candidate is available\r
+     */\r
+    public Node getCandidateForRouting(double identifierToRoute) throws RoutingException {\r
+        return bidirectionalRouting\r
+                ? getCandidateForBidirectionalRoute(identifierToRoute)\r
+                : getCandidateForUnidirectionalRoute(identifierToRoute);\r
+    }\r
+\r
+    /**\r
+     * Identifies the next node to contact in unidirectional mode: candidates are the ONLINE short\r
+     * range links plus the outgoing long range links.\r
+     *\r
+     * @param identifierToRoute identifier to reach\r
+     * @return the next hop\r
+     * @throws RoutingException if no candidate is available\r
+     */\r
+    public Node getCandidateForUnidirectionalRoute(double identifierToRoute) throws RoutingException {\r
+\r
+        LinkedHashSet<Node> allLinks = new LinkedHashSet<Node>();\r
+        Node manager = putShortRangeLinksIntoContainerForRouting(allLinks, identifierToRoute);\r
+\r
+        if (manager != null) {\r
+            return manager;\r
+        }\r
+\r
+        allLinks.addAll(longRangeLinksOutgoing);\r
+\r
+        return findClosestNode(identifierToRoute, allLinks, true);\r
+    }\r
+\r
+    /**\r
+     * Identifies the next node to contact in bidirectional mode: candidates are the ONLINE short\r
+     * range links plus both the outgoing and the incoming long range links.\r
+     *\r
+     * @param identifierToRoute identifier to reach\r
+     * @return the next hop\r
+     * @throws RoutingException if no candidate is available\r
+     */\r
+    public Node getCandidateForBidirectionalRoute(double identifierToRoute) throws RoutingException {\r
+\r
+        LinkedHashSet<Node> allLinks = new LinkedHashSet<Node>();\r
+        Node manager = putShortRangeLinksIntoContainerForRouting(allLinks, identifierToRoute);\r
+\r
+        if (manager != null) {\r
+            return manager;\r
+        }\r
+\r
+        allLinks.addAll(longRangeLinksOutgoing);\r
+        allLinks.addAll(longRangeLinksIncoming);\r
+\r
+        return findClosestNode(identifierToRoute, allLinks, false);\r
+    }\r
+\r
+    /**\r
+     * Adds the ONLINE short range links to {@code container} and checks whether the nearest\r
+     * right neighbour is the manager of the identifier.\r
+     *\r
+     * <p>When a manager is returned the left links have NOT been added to the container.\r
+     *\r
+     * @param container set that receives the ONLINE short range neighbours\r
+     * @param identifierToRoute identifier to reach\r
+     * @return Null if it is NOT found the manager. Node if it is found.\r
+     */\r
+    private Node putShortRangeLinksIntoContainerForRouting(Set<Node> container, double identifierToRoute) {\r
+        addOnlineNodes(rightShortRangeLinks, container);\r
+\r
+        if (!container.isEmpty()) {\r
+\r
+            // Special case: verify whether the nearest (ONLINE) right neighbour is the manager\r
+            SymphonyNodeComparator comparator = new SymphonyNodeComparator(pid, identifier);\r
+            Node nearRightNeighbour = Collections.min(container, comparator);\r
+            if (nearRightNeighbour != null) {\r
+                SymphonyProtocol symphony = (SymphonyProtocol) nearRightNeighbour.getProtocol(pid);\r
+                boolean keyNotOnMySide = !isLeftNeighbour(identifier, identifierToRoute);\r
+                if (keyNotOnMySide && isLeftNeighbour(symphony.getIdentifier(), identifierToRoute)) {\r
+                    return nearRightNeighbour;\r
+                }\r
+            }\r
+        }\r
+\r
+        addOnlineNodes(leftShortRangeLinks, container);\r
+\r
+        return null;\r
+    }\r
+\r
+    /**\r
+     * Copies into {@code container} the node of every link entry whose status is ONLINE.\r
+     */\r
+    private static void addOnlineNodes(Iterable<Tuple<Node, BootstrapStatus>> links, Set<Node> container) {\r
+        for (Tuple<Node, BootstrapStatus> link : links) {\r
+            if (link.y == BootstrapStatus.ONLINE) {\r
+                container.add(link.x);\r
+            }\r
+        }\r
+    }\r
+\r
+    /**\r
+     * Selects the next hop among the candidates. When lookahead is enabled the identifiers\r
+     * recorded in {@link #lookAheadMap} for a candidate are evaluated together with the\r
+     * candidate's own identifier.\r
+     *\r
+     * @param identifierToRoute Identifier to reach\r
+     * @param container Candidate Nodes Container\r
+     * @param clockwise true, does unidirectional routing. false, does bidirectional routing.\r
+     * @return The nearest node to reach identifierToRoute\r
+     * @throws RoutingException Throw in case no candidate is found\r
+     */\r
+    public Node findClosestNode(final double identifierToRoute, final Iterable<Node> container, final boolean clockwise) throws RoutingException {\r
+        Node ret = null;\r
+        double min = Double.MAX_VALUE;\r
+\r
+        for (Node node : container) {\r
+            SymphonyProtocol candidateProtocol = (SymphonyProtocol) node.getProtocol(pid);\r
+            double realCandidateIdentifier = candidateProtocol.getIdentifier();\r
+\r
+            // The candidate's own identifier is always evaluated first\r
+            Set<Double> candidateIdentifierSet = new LinkedHashSet<Double>();\r
+            candidateIdentifierSet.add(realCandidateIdentifier);\r
+\r
+            boolean lookAheadClockwise = true;\r
+\r
+            /*\r
+             * If lookahead is activated add all the reachable identifiers. No checks are performed\r
+             * on the node type (short/long) because at most the map returns null.\r
+             */\r
+            if (lookAhead) {\r
+                Set<Double>[] lookAheadSets = lookAheadMap.get(node);\r
+\r
+                if (lookAheadSets != null) {\r
+                    // Copy into the local set: the sets stored in lookAheadMap must stay untouched,\r
+                    // otherwise incoming identifiers leak permanently into the outgoing slot.\r
+                    if (lookAheadSets[0] != null) {\r
+                        candidateIdentifierSet.addAll(lookAheadSets[0]);\r
+                    }\r
+\r
+                    /*\r
+                     * With bidirectional routing the Incoming Long Range Links of the current\r
+                     * neighbour are taken into account as well\r
+                     */\r
+                    if (bidirectionalRouting && lookAheadSets[1] != null) {\r
+                        candidateIdentifierSet.addAll(lookAheadSets[1]);\r
+                        lookAheadClockwise = false;\r
+                    }\r
+                }\r
+            }\r
+\r
+            for (Double candidateIdentifier : candidateIdentifierSet) {\r
+                boolean directNeighbour = candidateIdentifier.equals(realCandidateIdentifier);\r
+\r
+                // For a direct neighbour use my routing mode, for a lookahead entry use its routing mode\r
+                boolean currentClockwise = directNeighbour ? clockwise : lookAheadClockwise;\r
+\r
+                // Shortest distance on the ring, whatever the direction\r
+                double distance = Math.abs(candidateIdentifier - identifierToRoute);\r
+                distance = Math.min(distance, 1.0 - distance);\r
+\r
+                // if clockwise the case candidateIdentifier - identifierToRoute - identifier has to be excluded\r
+                if (currentClockwise) {\r
+                    if (isLeftNeighbour(candidateIdentifier, identifierToRoute)) {\r
+\r
+                        // Special case (0.9 - 0.1): the plain ordering is not meaningful to decide the side\r
+                        if (identifierToRoute >= candidateIdentifier) {\r
+                            distance = identifierToRoute - candidateIdentifier;\r
+                        } else {\r
+                            distance = (1.0 - candidateIdentifier) + identifierToRoute;\r
+                        }\r
+                    } else {\r
+                        distance = (1.0 - (candidateIdentifier - identifierToRoute)) % 1;\r
+                    }\r
+                }\r
+\r
+                double absoluteDistance = Math.abs(distance);\r
+\r
+                /*\r
+                 * Priority to the directly connected node: on a tie a direct neighbour wins, a\r
+                 * lookahead entry must be strictly better\r
+                 */\r
+                if (min >= absoluteDistance && (directNeighbour || ret == null || min > absoluteDistance)) {\r
+                    ret = node;\r
+                    min = absoluteDistance;\r
+                }\r
+            }\r
+        }\r
+\r
+        if (ret == null) {\r
+            throw new RoutingException("Impossible do routing. [Hit: Neighbour links (maybe) not yet online.");\r
+        }\r
+\r
+        return ret;\r
+    }\r
+\r
+    /**\r
+     * Node-based variant of {@link #isLeftNeighbour(double, double)}.\r
+     *\r
+     * @param rootNode Reference Node\r
+     * @param neighbourNode Neighbour Node\r
+     * @return true if the node is a left neighbour (or itself), false if it is a right one\r
+     */\r
+    public static boolean isLeftNeighbour(Node rootNode, Node neighbourNode) {\r
+        double rootIdentifier = ((SymphonyProtocol) rootNode.getProtocol(pid)).getIdentifier();\r
+        double neighbourIdentifier = ((SymphonyProtocol) neighbourNode.getProtocol(pid)).getIdentifier();\r
+\r
+        return isLeftNeighbour(rootIdentifier, neighbourIdentifier);\r
+    }\r
+\r
+    /**\r
+     * Tells on which side of {@code rootIdentifier} the {@code neighbourIdentifier} lies.\r
+     *\r
+     * @param rootIdentifier reference identifier\r
+     * @param neighbourIdentifier identifier to classify\r
+     * @return true when the difference computed below (neighbour minus root, taken either on the\r
+     * raw identifiers or on the identifiers shifted by half a ring) is within [0, 0.5]\r
+     */\r
+    public static boolean isLeftNeighbour(double rootIdentifier, double neighbourIdentifier) {\r
+\r
+        double shiftedNeighbour = neighbourIdentifier + 0.5;\r
+        double traslateNeighbourIdentifier = shiftedNeighbour % 1;\r
+        double distance;\r
+\r
+        if (shiftedNeighbour != traslateNeighbourIdentifier) {\r
+            // The neighbour is past half ring (the shift wrapped around): no translation needed\r
+            distance = neighbourIdentifier - rootIdentifier;\r
+        } else {\r
+            // Compare the identifiers translated by half a ring\r
+            double traslateRootIdentifier = (rootIdentifier + 0.5) % 1;\r
+            distance = traslateNeighbourIdentifier - traslateRootIdentifier;\r
+        }\r
+\r
+        return distance >= 0 && distance <= 0.5;\r
+    }\r
+\r
+    /**\r
+     * Starts the routing of {@code key}. When this node is the manager of the key the handler is\r
+     * invoked immediately, otherwise a ROUTE message is sent to the best candidate and the\r
+     * handler is invoked when the answer is processed.\r
+     *\r
+     * @param src node that starts the routing\r
+     * @param key identifier to reach\r
+     * @param handler callback that receives the result\r
+     * @throws RoutingException if no candidate is available; in that case the handler\r
+     * registration is rolled back\r
+     */\r
+    public void route(Node src, double key, Handler handler) throws RoutingException {\r
+\r
+        Handler previousHandler = mapHandler.put(key, handler);\r
+\r
+        Message msg = new Message(key, src, MessageType.ROUTE);\r
+\r
+        Node targetNode = src;\r
+\r
+        if (!isManagerOf(key)) {\r
+            try {\r
+                targetNode = getCandidateForRouting(key);\r
+            } catch (RoutingException ex) {\r
+                // Nothing has been sent, so no answer will ever consume this handler: undo the\r
+                // registration instead of leaving a stale entry behind.\r
+                if (previousHandler != null) {\r
+                    mapHandler.put(key, previousHandler);\r
+                } else {\r
+                    mapHandler.remove(key);\r
+                }\r
+                throw ex;\r
+            }\r
+            Transport transport = (Transport) src.getProtocol(transportID);\r
+            transport.send(src, targetNode, msg, pid);\r
+        }\r
+\r
+        // Insert the message into the chronology\r
+        Tuple<Message, Message> historyTuple = new Tuple<Message, Message>();\r
+        try {\r
+            historyTuple.x = msg;\r
+            historyTuple.y = (Message) msg.clone();\r
+            historyTuple.y.setCurrentHop(targetNode);\r
+        } catch (CloneNotSupportedException ex) {\r
+            LOGGER.log(Level.SEVERE, "Impossible to clonate the message!", ex);\r
+            historyTuple.x = null;\r
+            historyTuple.y = msg;\r
+            msg.setCurrentHop(targetNode);\r
+        }\r
+        messageHistory.add(new SoftReference<Tuple<Message, Message>>(historyTuple));\r
+        messageHistoryID.add(msg.getID());\r
+\r
+        /*\r
+         * If this node is the manager (checked through the reference) no loopback routing is\r
+         * done: the request is satisfied immediately\r
+         */\r
+        if (targetNode == src) {\r
+\r
+            // Update the chronology\r
+            historyTuple.y = new Message(key, targetNode, MessageType.ROUTE_RESPONSE);\r
+\r
+            Tuple<Node, Double> tuple = new Tuple<Node, Double>(src, key);\r
+            mapHandler.remove(key);\r
+            handler.handle(this, tuple);\r
+        }\r
+    }\r
+\r
+    /**\r
+     * Processes a {@link Message} delivered to this node: records it in the chronology, answers\r
+     * ROUTE_FAIL when the message ID was already seen, otherwise forwards or answers ROUTE\r
+     * requests and dispatches ROUTE_RESPONSE / ROUTE_FAIL to the pending handler.\r
+     *\r
+     * @param node the local node\r
+     * @param pid protocol id used to address the outgoing messages\r
+     * @param event the received {@link Message}\r
+     */\r
+    public void processEvent(Node node, int pid, Object event) {\r
+        Message msg = (Message) event;\r
+        msg.incrementHop(); // Increment the message hop counter\r
+\r
+        Tuple<Message, Message> historyTuple = new Tuple<Message, Message>();\r
+        try {\r
+            // Clone the message so that the chronology keeps the information of the hop sender\r
+            historyTuple.x = (Message) msg.clone();\r
+        } catch (CloneNotSupportedException ex) {\r
+            LOGGER.log(Level.SEVERE, "Impossible to clonate the message!", ex);\r
+            historyTuple.x = msg;\r
+        }\r
+\r
+        messageHistory.add(new SoftReference<Tuple<Message, Message>>(historyTuple));\r
+\r
+        Transport transport = (Transport) node.getProtocol(transportID);\r
+\r
+        // Detect cycles\r
+        if (messageHistoryID.contains(msg.getID())) {\r
+            Message responseMsg = new Message(msg, node, MessageType.ROUTE_FAIL);\r
+\r
+            historyTuple.y = responseMsg;\r
+\r
+            transport.send(node, msg.getSourceNode(), responseMsg, pid);\r
+            return;\r
+        }\r
+\r
+        /*\r
+         * Arriving here means that there is no cycle --> the message ID is inserted into the\r
+         * chronology\r
+         */\r
+        messageHistoryID.add(msg.getID());\r
+\r
+        switch (msg.getType()) {\r
+            case ROUTE:\r
+                Double key = (Double) msg.getBody();\r
+                if (LOGGER.isLoggable(Level.FINEST)) {\r
+                    LOGGER.log(Level.FINEST, key + " " + identifier);\r
+                }\r
+                if (isManagerOf(key)) {\r
+                    // Sent through the local node's transport, like every other send of this method\r
+                    Message responseMsg = new Message(new Tuple<Node, Double>(node, key), node, MessageType.ROUTE_RESPONSE);\r
+                    historyTuple.y = responseMsg;\r
+                    transport.send(node, msg.getSourceNode(), responseMsg, pid);\r
+                } else {\r
+                    forwardRouteMessage(node, pid, msg, key, historyTuple, transport);\r
+                }\r
+                break;\r
+            case ROUTE_RESPONSE:\r
+                @SuppressWarnings("unchecked") // the body of a ROUTE_RESPONSE is built above as Tuple<Node, Double>\r
+                Tuple<Node, Double> tuple = (Tuple<Node, Double>) msg.getBody();\r
+                notifyHandler(tuple.y, tuple);\r
+                break;\r
+            case ROUTE_FAIL:\r
+                // The body of a ROUTE_FAIL is the request message that could not be routed\r
+                Message requestMsg = (Message) msg.getBody();\r
+                notifyHandler((Double) requestMsg.getBody(), null);\r
+                break;\r
+            default:\r
+                // Other message types are not handled by this protocol\r
+                break;\r
+        }\r
+    }\r
+\r
+    /**\r
+     * Forwards a ROUTE message to the best candidate, or re-queues it to the local node when no\r
+     * candidate is currently available.\r
+     */\r
+    private void forwardRouteMessage(Node node, int pid, Message msg, Double key, Tuple<Message, Message> historyTuple, Transport transport) {\r
+        try {\r
+            Node targetNode = getCandidateForRouting(key);\r
+\r
+            try {\r
+                // Clone the message so that the chronology keeps the information of the hop receiver\r
+                historyTuple.y = (Message) msg.clone();\r
+                historyTuple.y.setCurrentHop(targetNode);\r
+            } catch (CloneNotSupportedException ex) {\r
+                LOGGER.log(Level.SEVERE, "Impossible to clonate the message!", ex);\r
+                historyTuple.y = msg;\r
+                msg.setCurrentHop(targetNode);\r
+            }\r
+\r
+            transport.send(node, targetNode, msg, pid);\r
+        } catch (RoutingException ex) {\r
+            /*\r
+             * The same message is sent to myself: it goes back into the event queue, which\r
+             * postpones the routing in the hope that the network is ok in the meanwhile.\r
+             *\r
+             * NOTE(review): the ID of msg has already been recorded in messageHistoryID, so the\r
+             * re-delivered message is presumably answered with ROUTE_FAIL by the cycle check\r
+             * instead of being retried - confirm against Message.getID()/clone semantics.\r
+             */\r
+            historyTuple.y = msg;\r
+            msg.setCurrentHop(node);\r
+            transport.send(node, node, msg, pid);\r
+        }\r
+    }\r
+\r
+    /**\r
+     * Removes the handler registered for {@code key} and invokes it with {@code result}. A\r
+     * missing handler (e.g. an answer that arrives twice) is logged and ignored instead of\r
+     * failing with a NullPointerException.\r
+     */\r
+    private void notifyHandler(Double key, Tuple<Node, Double> result) {\r
+        Handler handler = mapHandler.remove(key);\r
+        if (handler == null) {\r
+            LOGGER.log(Level.WARNING, "No pending handler for key " + key + ": message ignored");\r
+            return;\r
+        }\r
+        handler.handle(this, result);\r
+    }\r
+\r
+    /**\r
+     * Tells whether this node is the manager of {@code key}.\r
+     *\r
+     * @param key identifier to check\r
+     * @return true if the key equals this node's identifier, if there is no ONLINE left\r
+     * neighbour, or if the key lies between the nearest ONLINE left neighbour and this node\r
+     */\r
+    public boolean isManagerOf(double key) {\r
+\r
+        if (key == identifier) {\r
+            return true;\r
+        }\r
+\r
+        // Single pass: keep only the ONLINE left links, then pick the nearest one\r
+        List<Tuple<Node, BootstrapStatus>> onlineLeftLinks = new ArrayList<Tuple<Node, BootstrapStatus>>(leftShortRangeLinks.size());\r
+        for (Tuple<Node, BootstrapStatus> leftTuple : leftShortRangeLinks) {\r
+            if (leftTuple.y == BootstrapStatus.ONLINE) {\r
+                onlineLeftLinks.add(leftTuple);\r
+            }\r
+        }\r
+\r
+        // SPECIAL CASE: NO LEFT NEIGHBOURS. I became the Manager.\r
+        if (onlineLeftLinks.isEmpty()) {\r
+            return true;\r
+        }\r
+\r
+        SymphonyNodeComparator comparator = new SymphonyNodeComparator(pid, identifier);\r
+        AdapterSymphonyNodeComparator adapterComparator = new AdapterSymphonyNodeComparator(comparator);\r
+        Node targetNode = Collections.min(onlineLeftLinks, adapterComparator).x;\r
+\r
+        SymphonyProtocol symphony = (SymphonyProtocol) targetNode.getProtocol(pid);\r
+        double leftIdentifier = symphony.getIdentifier();\r
+\r
+        // Check the situation: left neighbour - key - me. If it holds this node is the manager.\r
+        return isLeftNeighbour(identifier, key)\r
+                && !isLeftNeighbour(leftIdentifier, key)\r
+                && leftIdentifier != key;\r
+    }\r
+\r
+    /**\r
+     * @return the identifier of this node\r
+     */\r
+    public double getIdentifier() {\r
+        return identifier;\r
+    }\r
+\r
+    /**\r
+     * Returns the chronology entries that are still reachable, in insertion order. Entries whose\r
+     * SoftReference has been cleared are dropped from the chronology.\r
+     *\r
+     * @return the surviving &lt;received message, answer message&gt; tuples\r
+     */\r
+    @SuppressWarnings("unchecked") // generic array creation: Tuple[] viewed as Tuple<Message, Message>[]\r
+    public Tuple<Message, Message>[] getHistoryMessage() {\r
+        List<Tuple<Message, Message>> survivors = new ArrayList<Tuple<Message, Message>>(messageHistory.size());\r
+        Iterator<SoftReference<Tuple<Message, Message>>> references = messageHistory.iterator();\r
+        while (references.hasNext()) {\r
+            Tuple<Message, Message> tuple = references.next().get();\r
+            if (tuple != null) {\r
+                survivors.add(tuple);\r
+            } else {\r
+                // A cleared reference can never be resolved again: do not keep it around\r
+                references.remove();\r
+            }\r
+        }\r
+        return survivors.toArray(new Tuple[0]);\r
+    }\r
+\r
+    /**\r
+     * Empties the message chronology. The set of seen message IDs is left untouched.\r
+     */\r
+    public void clearHistoryMessage() {\r
+        messageHistory.clear();\r
+    }\r
+\r
+    /**\r
+     * Draws identifiers from {@code CommonState.r} until one that was never handed out is found.\r
+     *\r
+     * @return an identifier that no other instance has received\r
+     */\r
+    private double generateUniqueIdentifier() {\r
+        Double id;\r
+\r
+        // Set.add returns false when the value is already present, i.e. the identifier is duplicated\r
+        do {\r
+            id = CommonState.r.nextDouble();\r
+        } while (!allIdentifier.add(id));\r
+\r
+        return id;\r
+    }\r
+\r
+    /**\r
+     * Creates a brand new protocol instance from the same configuration prefix; the copy gets\r
+     * its own identifier and empty link sets.\r
+     */\r
+    @Override\r
+    public Object clone() {\r
+        return new SymphonyProtocol(prefix);\r
+    }\r
+\r
+    /**\r
+     * @return true if the status of this node is {@link BootstrapStatus#ONLINE}\r
+     */\r
+    public boolean isBootstrapped() {\r
+        return loggedIntoNetwork == BootstrapStatus.ONLINE;\r
+    }\r
+}\r