Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
peersimgrid release 1.0
[simgrid.git] / contrib / psg / src / example / symphony / SymphonyProtocol.java
diff --git a/contrib/psg/src/example/symphony/SymphonyProtocol.java b/contrib/psg/src/example/symphony/SymphonyProtocol.java
new file mode 100644 (file)
index 0000000..478f81e
--- /dev/null
@@ -0,0 +1,530 @@
+package example.symphony;\r
+\r
+import java.lang.ref.SoftReference;\r
+import java.util.*;\r
+import java.util.logging.Level;\r
+import java.util.logging.Logger;\r
+\r
+import example.symphony.Message.MessageType;\r
+import peersim.config.Configuration;\r
+import peersim.core.CommonState;\r
+import peersim.core.Node;\r
+import peersim.edsim.EDProtocol;\r
+import peersim.transport.Transport;\r
+\r
+/**\r
+ *\r
+ * @author Andrea Esposito <and1989@gmail.com>\r
+ */\r
+public class SymphonyProtocol implements EDProtocol {\r
+\r
+    private static final String PAR_SHORT_LINK = "shortlink";\r
+    private static final String PAR_LONG_LINK = "longlink";\r
+    private static final String PAR_TRANSP = "transport";\r
+    private static final String PAR_ROUTING = "routing";\r
+    private static final String PAR_LOOKAHEAD = "lookahead";\r
+    private static Set<Double> allIdentifier = new HashSet<Double>();\r
+    private final String prefix;\r
+    private static int pid;\r
+    private final int transportID;\r
+    private final double identifier;\r
+    public final int sequentialIdentifier;\r
+    private static int sequentialCounter = 0;\r
+    public final int numberShortRangeLinksPerSide;\r
+    public final boolean bidirectionalRouting;\r
+    public final boolean lookAhead;\r
+    public final boolean fixedLongRangeLinks;\r
+    public final int numberFixedLongRangeLinks;\r
+    public LinkedHashSet<Node> longRangeLinksOutgoing;\r
+    public LinkedHashSet<Node> longRangeLinksIncoming;\r
+    public LinkedHashSet<Tuple<Node, BootstrapStatus>> rightShortRangeLinks;\r
+    public LinkedHashSet<Tuple<Node, BootstrapStatus>> leftShortRangeLinks;\r
+    /**\r
+     * Array Contract: at position 0 -> OutgoingLongRangeLinks, 1 -> IncomingLongRangeLinks\r
+     */\r
+    public final LinkedHashMap<Node, Set<Double>[]> lookAheadMap;\r
+    private HashMap<Double, Handler> mapHandler;\r
+    /**\r
+     * IDs Set to verify if there are cycles\r
+     */\r
+    private Set<Long> messageHistoryID;\r
+    /**\r
+     *\r
+     * Tuple chronology that contains: <received message, the possible answer message>\r
+     *\r
+     * I use SoftReference as a trade off between memory usage and accurancy\r
+     */\r
+    private Set<SoftReference<Tuple<Message, Message>>> messageHistory;\r
+    private static boolean firstPrintConfig = true;\r
+\r
+    public enum BootstrapStatus {\r
+\r
+        NEW, OFFLINE, ONLINE_AND_ALL_NEIGHBOURS_OFFLINE, ONLINE\r
+    }\r
+    public BootstrapStatus loggedIntoNetwork;\r
+\r
+    public SymphonyProtocol(String prefix) {\r
+\r
+        this.prefix = prefix;\r
+        pid = Configuration.lookupPid(prefix.replaceAll("protocol.", ""));\r
+        transportID = Configuration.getPid(prefix + "." + PAR_TRANSP);\r
+        numberShortRangeLinksPerSide = Configuration.getInt(prefix + "." + PAR_SHORT_LINK, 2) / 2;\r
+        bidirectionalRouting = !Configuration.getString(prefix + "." + PAR_ROUTING, "bidirectional").toLowerCase().equals("unidirectional");\r
+        lookAhead = !Configuration.getString(prefix + "." + PAR_LOOKAHEAD, "on").toLowerCase().equals("off");\r
+        numberFixedLongRangeLinks = Configuration.getInt(prefix + "." + PAR_LONG_LINK, -1);\r
+        fixedLongRangeLinks = numberFixedLongRangeLinks >= 0;\r
+\r
+        longRangeLinksOutgoing = new LinkedHashSet<Node>();\r
+        longRangeLinksIncoming = new LinkedHashSet<Node>();\r
+        rightShortRangeLinks = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();\r
+        leftShortRangeLinks = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();\r
+        lookAheadMap = new LinkedHashMap<Node, Set<Double>[]>();\r
+\r
+        identifier = generateUniqueIdentifier();\r
+        sequentialIdentifier = sequentialCounter++;\r
+\r
+        mapHandler = new HashMap<Double, Handler>();\r
+\r
+        messageHistoryID = new HashSet<Long>();\r
+        messageHistory = new LinkedHashSet<SoftReference<Tuple<Message, Message>>>();\r
+        loggedIntoNetwork = BootstrapStatus.NEW;\r
+\r
+        printConfig();\r
+    }\r
+\r
+    private void printConfig() {\r
+\r
+        if (firstPrintConfig) {\r
+            firstPrintConfig = false;\r
+            System.out.println(SymphonyProtocol.class.getSimpleName() + " Configuration:");\r
+            System.out.println("- Number of short range links per side: " + numberShortRangeLinksPerSide);\r
+            System.out.println("- Number of long range links: " + (fixedLongRangeLinks ? numberFixedLongRangeLinks : "log(n)"));\r
+            System.out.println("- Routing mode: " + (bidirectionalRouting ? "Bidirectional" : "Unidirectional"));\r
+            System.out.println("- LookAhead status: " + (lookAhead ? "ON" : "OFF"));\r
+            System.out.println("-------------------------------\n");\r
+        }\r
+    }\r
+\r
+    /**\r
+     *\r
+     * Method to identify the next node that has to be contacted. It's going to be used the mode\r
+     * that is described into the configuration file\r
+     */\r
+    public Node getCandidateForRouting(double identifierToRoute) throws RoutingException {\r
+        if (bidirectionalRouting) {\r
+            return getCandidateForBidirectionalRoute(identifierToRoute);\r
+        } else {\r
+            return getCandidateForUnidirectionalRoute(identifierToRoute);\r
+        }\r
+    }\r
+\r
+    /**\r
+     *\r
+     * Method to individuate the next node that as to be contacted through Unidirectional Routing\r
+     * mode\r
+     */\r
+    public Node getCandidateForUnidirectionalRoute(double identifierToRoute) throws RoutingException {\r
+\r
+        LinkedHashSet<Node> allLinks = new LinkedHashSet<Node>();\r
+        Node manager = putShortRangeLinksIntoContainerForRouting(allLinks, identifierToRoute);\r
+\r
+        if (manager != null) {\r
+            return manager;\r
+        }\r
+\r
+        allLinks.addAll(longRangeLinksOutgoing);\r
+\r
+        return findClosestNode(identifierToRoute, allLinks, true);\r
+    }\r
+\r
+    /**\r
+     * Method to individuate the next node that as to be contacted through Bidirectional Routing\r
+     * mode\r
+     */\r
+    public Node getCandidateForBidirectionalRoute(double identifierToRoute) throws RoutingException {\r
+\r
+        LinkedHashSet<Node> allLinks = new LinkedHashSet<Node>();\r
+        Node manager = putShortRangeLinksIntoContainerForRouting(allLinks, identifierToRoute);\r
+\r
+        if (manager != null) {\r
+            return manager;\r
+        }\r
+\r
+        allLinks.addAll(longRangeLinksOutgoing);\r
+        allLinks.addAll(longRangeLinksIncoming);\r
+\r
+        return findClosestNode(identifierToRoute, allLinks, false);\r
+    }\r
+\r
+    /**\r
+     * @return Null if it is NOT found the manager. Node if it is found.\r
+     */\r
+    private Node putShortRangeLinksIntoContainerForRouting(Set<Node> container, double identifierToRoute) {\r
+        for (Tuple<Node, BootstrapStatus> rightTuple : rightShortRangeLinks) {\r
+            if (rightTuple.y == BootstrapStatus.ONLINE) {\r
+                container.add(rightTuple.x);\r
+            }\r
+        }\r
+\r
+        if (!container.isEmpty()) {\r
+\r
+            // Special case: i verify if the neighbour at my right (ONLINE) is the manager\r
+            SymphonyNodeComparator comparator = new SymphonyNodeComparator(pid, identifier);\r
+            Node nearRightNeighbour = Collections.min(container, comparator);\r
+            if (nearRightNeighbour != null) {\r
+                SymphonyProtocol symphony = (SymphonyProtocol) nearRightNeighbour.getProtocol(pid);\r
+                if (!isLeftNeighbour(identifier, identifierToRoute) && isLeftNeighbour(symphony.getIdentifier(), identifierToRoute)) {\r
+                    return nearRightNeighbour;\r
+                }\r
+            }\r
+        }\r
+\r
+        for (Tuple<Node, BootstrapStatus> leftTuple : leftShortRangeLinks) {\r
+            if (leftTuple.y == BootstrapStatus.ONLINE) {\r
+                container.add(leftTuple.x);\r
+            }\r
+        }\r
+\r
+        return null;\r
+    }\r
+\r
+    /**\r
+     *\r
+     * Individuates effectively the next candidate for the routing. Checks if the lookahead is\r
+     * activated and in case of affirmative answer it's going to use that information.\r
+     *\r
+     * @param identifierToRoute Identifier to reach\r
+     * @param container Candidate Nodes Container\r
+     * @param clockwise true, does unidirectional routing. false, does bidirectional routing.\r
+     * @return The nearest node to reach identifierToRoute\r
+     * @throws RoutingException Throw in case no candidate is found\r
+     */\r
+    public Node findClosestNode(final double identifierToRoute, final Iterable<Node> container, final boolean clockwise) throws RoutingException {\r
+        Node ret = null;\r
+        double min = Double.MAX_VALUE;\r
+\r
+        for (Node node : container) {\r
+            SymphonyProtocol symphonyNodeContainer = (SymphonyProtocol) node.getProtocol(pid);\r
+            double realCandidateIdentifier = symphonyNodeContainer.getIdentifier();\r
+\r
+            Set<Double> candidateIdentifierSet = new LinkedHashSet<Double>();\r
+            candidateIdentifierSet.add(realCandidateIdentifier);\r
+\r
+            boolean lookAheadClockwise = true;\r
+\r
+            /*\r
+             *\r
+             * If lookahead is activated add all the reachable identifiers. No checks are performed\r
+             * on the node type (short/long) because at maximum the map return null.\r
+             */\r
+            if (lookAhead) {\r
+                Set<Double>[] lookAheadIdentifierSetArray = lookAheadMap.get(node);\r
+\r
+                if (lookAheadIdentifierSetArray != null) {\r
+                    Set<Double> lookAheadIdentifierSet = lookAheadIdentifierSetArray[0];\r
+\r
+                    if (lookAheadIdentifierSet == null) {\r
+                        lookAheadIdentifierSet = new LinkedHashSet<Double>();\r
+                    }\r
+\r
+                    /*\r
+                     *\r
+                     * If bidirectional routing is going to be performed so i put into account also\r
+                     * the Incoming Long Range Links of the current neighbour\r
+                     */\r
+                    if (bidirectionalRouting && lookAheadIdentifierSetArray[1] != null) {\r
+                        lookAheadIdentifierSet.addAll(lookAheadIdentifierSetArray[1]);\r
+                        lookAheadClockwise = false;\r
+                    }\r
+\r
+                    if (!lookAheadIdentifierSet.isEmpty()) {\r
+                        candidateIdentifierSet.addAll(lookAheadIdentifierSet);\r
+                    }\r
+                }\r
+            }\r
+\r
+            for (Double candidateIdentifier : candidateIdentifierSet) {\r
+                // if it is a my neighbour i use my routing mode instead if it is a looAhead one i use its routing mode\r
+                boolean currentClockwise = candidateIdentifier.equals(realCandidateIdentifier) ? clockwise : lookAheadClockwise;\r
+\r
+                double distance = Math.abs(candidateIdentifier - identifierToRoute);\r
+                distance = Math.min(distance, 1.0 - distance);\r
+\r
+                // if clockwise i have to exclude the case: candidateIdentifier - indentifierToRoute - identifier\r
+                if (currentClockwise) {\r
+                    if (isLeftNeighbour(candidateIdentifier, identifierToRoute)) {\r
+\r
+                        // Special case (0.9 - 0.1) the normal order is not more meanful to decide the side\r
+                        if (identifierToRoute >= candidateIdentifier) {\r
+                            distance = identifierToRoute - candidateIdentifier;\r
+                        } else {\r
+                            distance = (1.0 - candidateIdentifier) + identifierToRoute;\r
+                        }\r
+                    } else {\r
+                        distance = (1.0 - (candidateIdentifier - identifierToRoute)) % 1;\r
+                    }\r
+                }\r
+\r
+                /*\r
+                 *\r
+                 * Priority to the node that i'm directly connected and only after i use the\r
+                 * lookAhead information\r
+                 */\r
+                if (min >= Math.abs(distance)\r
+                        && (candidateIdentifier.equals(realCandidateIdentifier)\r
+                        || ret == null\r
+                        || min > Math.abs(distance))) {\r
+                    ret = node;\r
+                    min = Math.abs(distance);\r
+                }\r
+            }\r
+        }\r
+\r
+        if (ret == null) {\r
+            throw new RoutingException("Impossible do routing. [Hit: Neighbour links (maybe) not yet online.");\r
+        }\r
+\r
+        return ret;\r
+    }\r
+\r
+    /**\r
+     *\r
+     * @param neighbourNode Neighbour Node\r
+     * @return true if the node is a left neighbour (or itself), false if it is a right one\r
+     */\r
+    public static boolean isLeftNeighbour(Node rootNode, Node neighbourNode) {\r
+        SymphonyProtocol rootSymphony = (SymphonyProtocol) rootNode.getProtocol(pid);\r
+        SymphonyProtocol neighbourSymphony = (SymphonyProtocol) neighbourNode.getProtocol(pid);\r
+\r
+        return isLeftNeighbour(rootSymphony.getIdentifier(), neighbourSymphony.getIdentifier());\r
+    }\r
+\r
+    public static boolean isLeftNeighbour(double rootIdentifier, double neighbourIdentifier) {\r
+\r
+        // I calculate putting the hypotesis that i have to translate/"normalize", after i'll check if it was useless\r
+        double traslateRootIdentifier = (rootIdentifier + 0.5) % 1;\r
+        double traslateNeighbourIdentifier = (neighbourIdentifier + 0.5) % 1;\r
+        double distance = traslateNeighbourIdentifier - traslateRootIdentifier;\r
+\r
+        // I verify if the neighbourIdentifier is over half ring, if yes i don't need to do the translation/"normalization"\r
+        if ((neighbourIdentifier + 0.5) != traslateNeighbourIdentifier) {\r
+            distance = neighbourIdentifier - rootIdentifier;\r
+        }\r
+\r
+        return distance >= 0 && distance <= 0.5;\r
+    }\r
+\r
+    public void route(Node src, double key, Handler handler) throws RoutingException {\r
+\r
+        mapHandler.put(key, handler);\r
+\r
+        Message msg = new Message(key, src, MessageType.ROUTE);\r
+\r
+        Node targetNode = src;\r
+\r
+        if (!isManagerOf(key)) {\r
+            targetNode = getCandidateForRouting(key);\r
+            Transport transport = (Transport) src.getProtocol(transportID);\r
+            transport.send(src, targetNode, msg, pid);\r
+        }\r
+\r
+        // Insert the message into the chronology\r
+        Tuple<Message, Message> historyTuple = new Tuple<Message, Message>();\r
+        try {\r
+            historyTuple.x = msg;\r
+            historyTuple.y = (Message) msg.clone();\r
+            historyTuple.y.setCurrentHop(targetNode);\r
+        } catch (CloneNotSupportedException ex) {\r
+            Logger.getLogger(SymphonyProtocol.class.getName()).log(Level.SEVERE, "Impossible to clonate the message!");\r
+            historyTuple.x = null;\r
+            historyTuple.y = msg;\r
+            msg.setCurrentHop(targetNode);\r
+        }\r
+        messageHistory.add(new SoftReference<Tuple<Message, Message>>(historyTuple));\r
+        messageHistoryID.add(msg.getID());\r
+\r
+        /*\r
+         *\r
+         * If i am the manager (brutally through the reference), i don't do the loopback routing but\r
+         * i soddisfy immediately the request\r
+         */\r
+        if (targetNode == src) {\r
+\r
+            // Uppdate the chronology\r
+            historyTuple.y = new Message(key, targetNode, MessageType.ROUTE_RESPONSE);\r
+\r
+            Tuple<Node, Double> tuple = new Tuple<Node, Double>(src, key);\r
+            mapHandler.remove(key);\r
+            handler.handle(this, tuple);\r
+        }\r
+    }\r
+\r
+    public void processEvent(Node node, int pid, Object event) {\r
+        Message msg = (Message) event;\r
+        msg.incrementHop(); // I increment the message Hop\r
+\r
+        Tuple<Message, Message> historyTuple = new Tuple<Message, Message>();\r
+        try {\r
+            // I clone the message such a way to store into the chronology the hop sender's information\r
+            historyTuple.x = (Message) msg.clone();\r
+        } catch (CloneNotSupportedException ex) {\r
+            Logger.getLogger(SymphonyProtocol.class.getName()).log(Level.SEVERE, "Impossible to clonate the message!");\r
+            historyTuple.x = msg;\r
+        }\r
+\r
+        messageHistory.add(new SoftReference<Tuple<Message, Message>>(historyTuple));\r
+\r
+        Double key;\r
+        Transport transport;\r
+        Handler handler;\r
+\r
+        // Individuate cycles\r
+        if (messageHistoryID.contains(msg.getID())) {\r
+            Message responseMsg = new Message(msg, node, MessageType.ROUTE_FAIL);\r
+\r
+            historyTuple.y = responseMsg;\r
+\r
+            transport = (Transport) node.getProtocol(transportID);\r
+            transport.send(node, msg.getSourceNode(), responseMsg, pid);\r
+            return;\r
+        }\r
+\r
+        /*\r
+         * If i'm arrived till here means that i'm not into a cycle --> i insert the message ID into\r
+         * the chronology\r
+         */\r
+        messageHistoryID.add(msg.getID());\r
+\r
+        switch (msg.getType()) {\r
+            case ROUTE:\r
+                key = (Double) msg.getBody();\r
+                Logger.getLogger(SymphonyProtocol.class.getName()).log(Level.FINEST, key + " " + identifier);\r
+                if (isManagerOf(key)) {\r
+                    transport = (Transport) msg.getSourceNode().getProtocol(transportID);\r
+                    Message responseMsg = new Message(new Tuple<Node, Double>(node, key), node, MessageType.ROUTE_RESPONSE);\r
+                    historyTuple.y = responseMsg;\r
+                    transport.send(node, msg.getSourceNode(), responseMsg, pid);\r
+                } else {\r
+                    try {\r
+                        Node targetNode = getCandidateForRouting(key);\r
+\r
+                        try {\r
+                            // I clone the message such a way to store the info (into the chronology) of the hop receiver\r
+                            historyTuple.y = (Message) msg.clone();\r
+                            historyTuple.y.setCurrentHop(targetNode);\r
+                        } catch (CloneNotSupportedException ex) {\r
+                            Logger.getLogger(SymphonyProtocol.class.getName()).log(Level.SEVERE, "Impossible to clonate the message!");\r
+                            historyTuple.y = msg;\r
+                            msg.setCurrentHop(targetNode);\r
+                        }\r
+\r
+                        transport = (Transport) node.getProtocol(transportID);\r
+                        transport.send(node, targetNode, msg, pid);\r
+                    } catch (RoutingException ex) {\r
+                        /*\r
+                         *\r
+                         * I send the same message to myself (it is going to queue into the event\r
+                         * queue and in this way i "earn" time (postpone) and i hope that the\r
+                         * network will be ok in the meanwhile)\r
+                         */\r
+                        historyTuple.y = msg;\r
+                        msg.setCurrentHop(node);\r
+                        transport = (Transport) node.getProtocol(transportID);\r
+                        transport.send(node, node, msg, pid);\r
+                    }\r
+                }\r
+                break;\r
+            case ROUTE_RESPONSE:\r
+                Tuple<Node, Double> tuple = (Tuple<Node, Double>) msg.getBody();\r
+                key = tuple.y;\r
+                handler = mapHandler.get(key);\r
+                mapHandler.remove(key);\r
+                handler.handle(this, tuple);\r
+                break;\r
+            case ROUTE_FAIL:\r
+                Message requestMsg = (Message) msg.getBody();\r
+                key = (Double) requestMsg.getBody();\r
+                handler = mapHandler.get(key);\r
+                mapHandler.remove(key);\r
+                handler.handle(this, null);\r
+                break;\r
+        }\r
+    }\r
+\r
+    public boolean isManagerOf(double key) {\r
+\r
+        if (key == identifier) {\r
+            return true;\r
+        }\r
+\r
+        SymphonyNodeComparator comparator = new SymphonyNodeComparator(pid, identifier);\r
+        AdapterSymphonyNodeComparator adapterComparator = new AdapterSymphonyNodeComparator(comparator);\r
+\r
+        Collection<Tuple<Node, BootstrapStatus>> leftShortRangeLinksCloned = (Collection<Tuple<Node, BootstrapStatus>>) leftShortRangeLinks.clone();\r
+        Node targetNode = null;\r
+\r
+        while (targetNode == null && !leftShortRangeLinksCloned.isEmpty()) {\r
+            Tuple<Node, BootstrapStatus> nearTuple = Collections.min(leftShortRangeLinksCloned, adapterComparator);\r
+            if (nearTuple.y == BootstrapStatus.ONLINE) {\r
+                targetNode = nearTuple.x;\r
+            } else {\r
+                leftShortRangeLinksCloned.remove(nearTuple);\r
+            }\r
+        }\r
+\r
+        // SPECIAL CASE: NO LEFT NEIGHBOURS. I became the Manager.\r
+        if (targetNode == null) {\r
+            return true;\r
+        }\r
+\r
+        SymphonyProtocol symphony = (SymphonyProtocol) targetNode.getProtocol(pid);\r
+        // Check if it's the situation: right neighbour - key - me. So if i'm the manager or not.\r
+        boolean ret = isLeftNeighbour(identifier, key) && (!isLeftNeighbour(symphony.getIdentifier(), key) && symphony.getIdentifier() != key);\r
+\r
+        return ret;\r
+    }\r
+\r
+    public double getIdentifier() {\r
+        return identifier;\r
+    }\r
+\r
+    public Tuple<Message, Message>[] getHistoryMessage() {\r
+        SoftReference<Tuple<Message, Message>>[] array = messageHistory.toArray(new SoftReference[0]);\r
+        LinkedList<Tuple<Message, Message>> list = new LinkedList<Tuple<Message, Message>>();\r
+        for (SoftReference<Tuple<Message, Message>> reference : array) {\r
+            Tuple<Message, Message> tuple = reference.get();\r
+            if (tuple != null) {\r
+                list.add(tuple);\r
+            }\r
+        }\r
+        return list.toArray(new Tuple[0]);\r
+    }\r
+\r
+    public void clearHistoryMessage() {\r
+        messageHistory.clear();\r
+    }\r
+\r
+    private double generateUniqueIdentifier() {\r
+        boolean duplicated = true;\r
+        Double id = null;\r
+\r
+        while (duplicated) {\r
+            id = CommonState.r.nextDouble();\r
+            duplicated = allIdentifier.contains(id);\r
+        }\r
+\r
+        allIdentifier.add(id);\r
+\r
+        return id;\r
+    }\r
+\r
+    @Override\r
+    public Object clone() {\r
+        SymphonyProtocol dolly = new SymphonyProtocol(prefix);\r
+        return dolly;\r
+    }\r
+\r
+    public boolean isBootstrapped() {\r
+        return loggedIntoNetwork == BootstrapStatus.ONLINE;\r
+    }\r
+}\r