1 package example.symphony;
\r
3 import java.lang.ref.SoftReference;
\r
5 import java.util.logging.Level;
\r
6 import java.util.logging.Logger;
\r
8 import example.symphony.Message.MessageType;
\r
9 import peersim.config.Configuration;
\r
10 import peersim.core.CommonState;
\r
11 import peersim.core.Node;
\r
12 import peersim.edsim.EDProtocol;
\r
13 import peersim.transport.Transport;
\r
17 * @author Andrea Esposito <and1989@gmail.com>
\r
19 public class SymphonyProtocol implements EDProtocol {
\r
21 private static final String PAR_SHORT_LINK = "shortlink";
\r
22 private static final String PAR_LONG_LINK = "longlink";
\r
23 private static final String PAR_TRANSP = "transport";
\r
24 private static final String PAR_ROUTING = "routing";
\r
25 private static final String PAR_LOOKAHEAD = "lookahead";
\r
26 private static Set<Double> allIdentifier = new HashSet<Double>();
\r
27 private final String prefix;
\r
28 private static int pid;
\r
29 private final int transportID;
\r
30 private final double identifier;
\r
31 public final int sequentialIdentifier;
\r
32 private static int sequentialCounter = 0;
\r
33 public final int numberShortRangeLinksPerSide;
\r
34 public final boolean bidirectionalRouting;
\r
35 public final boolean lookAhead;
\r
36 public final boolean fixedLongRangeLinks;
\r
37 public final int numberFixedLongRangeLinks;
\r
38 public LinkedHashSet<Node> longRangeLinksOutgoing;
\r
39 public LinkedHashSet<Node> longRangeLinksIncoming;
\r
40 public LinkedHashSet<Tuple<Node, BootstrapStatus>> rightShortRangeLinks;
\r
41 public LinkedHashSet<Tuple<Node, BootstrapStatus>> leftShortRangeLinks;
\r
43 * Array Contract: at position 0 -> OutgoingLongRangeLinks, 1 -> IncomingLongRangeLinks
\r
45 public final LinkedHashMap<Node, Set<Double>[]> lookAheadMap;
\r
46 private HashMap<Double, Handler> mapHandler;
\r
48 * IDs Set to verify if there are cycles
\r
50 private Set<Long> messageHistoryID;
\r
53 * Tuple chronology that contains: <received message, the possible answer message>
\r
55 * I use SoftReference as a trade off between memory usage and accurancy
\r
57 private Set<SoftReference<Tuple<Message, Message>>> messageHistory;
\r
58 private static boolean firstPrintConfig = true;
\r
60 public enum BootstrapStatus {
\r
62 NEW, OFFLINE, ONLINE_AND_ALL_NEIGHBOURS_OFFLINE, ONLINE
\r
64 public BootstrapStatus loggedIntoNetwork;
\r
66 public SymphonyProtocol(String prefix) {
\r
68 this.prefix = prefix;
\r
69 pid = Configuration.lookupPid(prefix.replaceAll("protocol.", ""));
\r
70 transportID = Configuration.getPid(prefix + "." + PAR_TRANSP);
\r
71 numberShortRangeLinksPerSide = Configuration.getInt(prefix + "." + PAR_SHORT_LINK, 2) / 2;
\r
72 bidirectionalRouting = !Configuration.getString(prefix + "." + PAR_ROUTING, "bidirectional").toLowerCase().equals("unidirectional");
\r
73 lookAhead = !Configuration.getString(prefix + "." + PAR_LOOKAHEAD, "on").toLowerCase().equals("off");
\r
74 numberFixedLongRangeLinks = Configuration.getInt(prefix + "." + PAR_LONG_LINK, -1);
\r
75 fixedLongRangeLinks = numberFixedLongRangeLinks >= 0;
\r
77 longRangeLinksOutgoing = new LinkedHashSet<Node>();
\r
78 longRangeLinksIncoming = new LinkedHashSet<Node>();
\r
79 rightShortRangeLinks = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();
\r
80 leftShortRangeLinks = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();
\r
81 lookAheadMap = new LinkedHashMap<Node, Set<Double>[]>();
\r
83 identifier = generateUniqueIdentifier();
\r
84 sequentialIdentifier = sequentialCounter++;
\r
86 mapHandler = new HashMap<Double, Handler>();
\r
88 messageHistoryID = new HashSet<Long>();
\r
89 messageHistory = new LinkedHashSet<SoftReference<Tuple<Message, Message>>>();
\r
90 loggedIntoNetwork = BootstrapStatus.NEW;
\r
95 private void printConfig() {
\r
97 if (firstPrintConfig) {
\r
98 firstPrintConfig = false;
\r
99 System.out.println(SymphonyProtocol.class.getSimpleName() + " Configuration:");
\r
100 System.out.println("- Number of short range links per side: " + numberShortRangeLinksPerSide);
\r
101 System.out.println("- Number of long range links: " + (fixedLongRangeLinks ? numberFixedLongRangeLinks : "log(n)"));
\r
102 System.out.println("- Routing mode: " + (bidirectionalRouting ? "Bidirectional" : "Unidirectional"));
\r
103 System.out.println("- LookAhead status: " + (lookAhead ? "ON" : "OFF"));
\r
104 System.out.println("-------------------------------\n");
\r
110 * Method to identify the next node that has to be contacted. It's going to be used the mode
\r
111 * that is described into the configuration file
\r
113 public Node getCandidateForRouting(double identifierToRoute) throws RoutingException {
\r
114 if (bidirectionalRouting) {
\r
115 return getCandidateForBidirectionalRoute(identifierToRoute);
\r
117 return getCandidateForUnidirectionalRoute(identifierToRoute);
\r
123 * Method to individuate the next node that as to be contacted through Unidirectional Routing
\r
126 public Node getCandidateForUnidirectionalRoute(double identifierToRoute) throws RoutingException {
\r
128 LinkedHashSet<Node> allLinks = new LinkedHashSet<Node>();
\r
129 Node manager = putShortRangeLinksIntoContainerForRouting(allLinks, identifierToRoute);
\r
131 if (manager != null) {
\r
135 allLinks.addAll(longRangeLinksOutgoing);
\r
137 return findClosestNode(identifierToRoute, allLinks, true);
\r
141 * Method to individuate the next node that as to be contacted through Bidirectional Routing
\r
144 public Node getCandidateForBidirectionalRoute(double identifierToRoute) throws RoutingException {
\r
146 LinkedHashSet<Node> allLinks = new LinkedHashSet<Node>();
\r
147 Node manager = putShortRangeLinksIntoContainerForRouting(allLinks, identifierToRoute);
\r
149 if (manager != null) {
\r
153 allLinks.addAll(longRangeLinksOutgoing);
\r
154 allLinks.addAll(longRangeLinksIncoming);
\r
156 return findClosestNode(identifierToRoute, allLinks, false);
\r
160 * @return Null if it is NOT found the manager. Node if it is found.
\r
162 private Node putShortRangeLinksIntoContainerForRouting(Set<Node> container, double identifierToRoute) {
\r
163 for (Tuple<Node, BootstrapStatus> rightTuple : rightShortRangeLinks) {
\r
164 if (rightTuple.y == BootstrapStatus.ONLINE) {
\r
165 container.add(rightTuple.x);
\r
169 if (!container.isEmpty()) {
\r
171 // Special case: i verify if the neighbour at my right (ONLINE) is the manager
\r
172 SymphonyNodeComparator comparator = new SymphonyNodeComparator(pid, identifier);
\r
173 Node nearRightNeighbour = Collections.min(container, comparator);
\r
174 if (nearRightNeighbour != null) {
\r
175 SymphonyProtocol symphony = (SymphonyProtocol) nearRightNeighbour.getProtocol(pid);
\r
176 if (!isLeftNeighbour(identifier, identifierToRoute) && isLeftNeighbour(symphony.getIdentifier(), identifierToRoute)) {
\r
177 return nearRightNeighbour;
\r
182 for (Tuple<Node, BootstrapStatus> leftTuple : leftShortRangeLinks) {
\r
183 if (leftTuple.y == BootstrapStatus.ONLINE) {
\r
184 container.add(leftTuple.x);
\r
193 * Individuates effectively the next candidate for the routing. Checks if the lookahead is
\r
194 * activated and in case of affirmative answer it's going to use that information.
\r
196 * @param identifierToRoute Identifier to reach
\r
197 * @param container Candidate Nodes Container
\r
198 * @param clockwise true, does unidirectional routing. false, does bidirectional routing.
\r
199 * @return The nearest node to reach identifierToRoute
\r
200 * @throws RoutingException Throw in case no candidate is found
\r
202 public Node findClosestNode(final double identifierToRoute, final Iterable<Node> container, final boolean clockwise) throws RoutingException {
\r
204 double min = Double.MAX_VALUE;
\r
206 for (Node node : container) {
\r
207 SymphonyProtocol symphonyNodeContainer = (SymphonyProtocol) node.getProtocol(pid);
\r
208 double realCandidateIdentifier = symphonyNodeContainer.getIdentifier();
\r
210 Set<Double> candidateIdentifierSet = new LinkedHashSet<Double>();
\r
211 candidateIdentifierSet.add(realCandidateIdentifier);
\r
213 boolean lookAheadClockwise = true;
\r
217 * If lookahead is activated add all the reachable identifiers. No checks are performed
\r
218 * on the node type (short/long) because at maximum the map return null.
\r
221 Set<Double>[] lookAheadIdentifierSetArray = lookAheadMap.get(node);
\r
223 if (lookAheadIdentifierSetArray != null) {
\r
224 Set<Double> lookAheadIdentifierSet = lookAheadIdentifierSetArray[0];
\r
226 if (lookAheadIdentifierSet == null) {
\r
227 lookAheadIdentifierSet = new LinkedHashSet<Double>();
\r
232 * If bidirectional routing is going to be performed so i put into account also
\r
233 * the Incoming Long Range Links of the current neighbour
\r
235 if (bidirectionalRouting && lookAheadIdentifierSetArray[1] != null) {
\r
236 lookAheadIdentifierSet.addAll(lookAheadIdentifierSetArray[1]);
\r
237 lookAheadClockwise = false;
\r
240 if (!lookAheadIdentifierSet.isEmpty()) {
\r
241 candidateIdentifierSet.addAll(lookAheadIdentifierSet);
\r
246 for (Double candidateIdentifier : candidateIdentifierSet) {
\r
247 // if it is a my neighbour i use my routing mode instead if it is a looAhead one i use its routing mode
\r
248 boolean currentClockwise = candidateIdentifier.equals(realCandidateIdentifier) ? clockwise : lookAheadClockwise;
\r
250 double distance = Math.abs(candidateIdentifier - identifierToRoute);
\r
251 distance = Math.min(distance, 1.0 - distance);
\r
253 // if clockwise i have to exclude the case: candidateIdentifier - indentifierToRoute - identifier
\r
254 if (currentClockwise) {
\r
255 if (isLeftNeighbour(candidateIdentifier, identifierToRoute)) {
\r
257 // Special case (0.9 - 0.1) the normal order is not more meanful to decide the side
\r
258 if (identifierToRoute >= candidateIdentifier) {
\r
259 distance = identifierToRoute - candidateIdentifier;
\r
261 distance = (1.0 - candidateIdentifier) + identifierToRoute;
\r
264 distance = (1.0 - (candidateIdentifier - identifierToRoute)) % 1;
\r
270 * Priority to the node that i'm directly connected and only after i use the
\r
271 * lookAhead information
\r
273 if (min >= Math.abs(distance)
\r
274 && (candidateIdentifier.equals(realCandidateIdentifier)
\r
276 || min > Math.abs(distance))) {
\r
278 min = Math.abs(distance);
\r
284 throw new RoutingException("Impossible do routing. [Hit: Neighbour links (maybe) not yet online.");
\r
292 * @param neighbourNode Neighbour Node
\r
293 * @return true if the node is a left neighbour (or itself), false if it is a right one
\r
295 public static boolean isLeftNeighbour(Node rootNode, Node neighbourNode) {
\r
296 SymphonyProtocol rootSymphony = (SymphonyProtocol) rootNode.getProtocol(pid);
\r
297 SymphonyProtocol neighbourSymphony = (SymphonyProtocol) neighbourNode.getProtocol(pid);
\r
299 return isLeftNeighbour(rootSymphony.getIdentifier(), neighbourSymphony.getIdentifier());
\r
302 public static boolean isLeftNeighbour(double rootIdentifier, double neighbourIdentifier) {
\r
304 // I calculate putting the hypotesis that i have to translate/"normalize", after i'll check if it was useless
\r
305 double traslateRootIdentifier = (rootIdentifier + 0.5) % 1;
\r
306 double traslateNeighbourIdentifier = (neighbourIdentifier + 0.5) % 1;
\r
307 double distance = traslateNeighbourIdentifier - traslateRootIdentifier;
\r
309 // I verify if the neighbourIdentifier is over half ring, if yes i don't need to do the translation/"normalization"
\r
310 if ((neighbourIdentifier + 0.5) != traslateNeighbourIdentifier) {
\r
311 distance = neighbourIdentifier - rootIdentifier;
\r
314 return distance >= 0 && distance <= 0.5;
\r
317 public void route(Node src, double key, Handler handler) throws RoutingException {
\r
319 mapHandler.put(key, handler);
\r
321 Message msg = new Message(key, src, MessageType.ROUTE);
\r
323 Node targetNode = src;
\r
325 if (!isManagerOf(key)) {
\r
326 targetNode = getCandidateForRouting(key);
\r
327 Transport transport = (Transport) src.getProtocol(transportID);
\r
328 transport.send(src, targetNode, msg, pid);
\r
331 // Insert the message into the chronology
\r
332 Tuple<Message, Message> historyTuple = new Tuple<Message, Message>();
\r
334 historyTuple.x = msg;
\r
335 historyTuple.y = (Message) msg.clone();
\r
336 historyTuple.y.setCurrentHop(targetNode);
\r
337 } catch (CloneNotSupportedException ex) {
\r
338 Logger.getLogger(SymphonyProtocol.class.getName()).log(Level.SEVERE, "Impossible to clonate the message!");
\r
339 historyTuple.x = null;
\r
340 historyTuple.y = msg;
\r
341 msg.setCurrentHop(targetNode);
\r
343 messageHistory.add(new SoftReference<Tuple<Message, Message>>(historyTuple));
\r
344 messageHistoryID.add(msg.getID());
\r
348 * If i am the manager (brutally through the reference), i don't do the loopback routing but
\r
349 * i soddisfy immediately the request
\r
351 if (targetNode == src) {
\r
353 // Uppdate the chronology
\r
354 historyTuple.y = new Message(key, targetNode, MessageType.ROUTE_RESPONSE);
\r
356 Tuple<Node, Double> tuple = new Tuple<Node, Double>(src, key);
\r
357 mapHandler.remove(key);
\r
358 handler.handle(this, tuple);
\r
362 public void processEvent(Node node, int pid, Object event) {
\r
363 Message msg = (Message) event;
\r
364 msg.incrementHop(); // I increment the message Hop
\r
366 Tuple<Message, Message> historyTuple = new Tuple<Message, Message>();
\r
368 // I clone the message such a way to store into the chronology the hop sender's information
\r
369 historyTuple.x = (Message) msg.clone();
\r
370 } catch (CloneNotSupportedException ex) {
\r
371 Logger.getLogger(SymphonyProtocol.class.getName()).log(Level.SEVERE, "Impossible to clonate the message!");
\r
372 historyTuple.x = msg;
\r
375 messageHistory.add(new SoftReference<Tuple<Message, Message>>(historyTuple));
\r
378 Transport transport;
\r
381 // Individuate cycles
\r
382 if (messageHistoryID.contains(msg.getID())) {
\r
383 Message responseMsg = new Message(msg, node, MessageType.ROUTE_FAIL);
\r
385 historyTuple.y = responseMsg;
\r
387 transport = (Transport) node.getProtocol(transportID);
\r
388 transport.send(node, msg.getSourceNode(), responseMsg, pid);
\r
393 * If i'm arrived till here means that i'm not into a cycle --> i insert the message ID into
\r
396 messageHistoryID.add(msg.getID());
\r
398 switch (msg.getType()) {
\r
400 key = (Double) msg.getBody();
\r
401 Logger.getLogger(SymphonyProtocol.class.getName()).log(Level.FINEST, key + " " + identifier);
\r
402 if (isManagerOf(key)) {
\r
403 transport = (Transport) msg.getSourceNode().getProtocol(transportID);
\r
404 Message responseMsg = new Message(new Tuple<Node, Double>(node, key), node, MessageType.ROUTE_RESPONSE);
\r
405 historyTuple.y = responseMsg;
\r
406 transport.send(node, msg.getSourceNode(), responseMsg, pid);
\r
409 Node targetNode = getCandidateForRouting(key);
\r
412 // I clone the message such a way to store the info (into the chronology) of the hop receiver
\r
413 historyTuple.y = (Message) msg.clone();
\r
414 historyTuple.y.setCurrentHop(targetNode);
\r
415 } catch (CloneNotSupportedException ex) {
\r
416 Logger.getLogger(SymphonyProtocol.class.getName()).log(Level.SEVERE, "Impossible to clonate the message!");
\r
417 historyTuple.y = msg;
\r
418 msg.setCurrentHop(targetNode);
\r
421 transport = (Transport) node.getProtocol(transportID);
\r
422 transport.send(node, targetNode, msg, pid);
\r
423 } catch (RoutingException ex) {
\r
426 * I send the same message to myself (it is going to queue into the event
\r
427 * queue and in this way i "earn" time (postpone) and i hope that the
\r
428 * network will be ok in the meanwhile)
\r
430 historyTuple.y = msg;
\r
431 msg.setCurrentHop(node);
\r
432 transport = (Transport) node.getProtocol(transportID);
\r
433 transport.send(node, node, msg, pid);
\r
437 case ROUTE_RESPONSE:
\r
438 Tuple<Node, Double> tuple = (Tuple<Node, Double>) msg.getBody();
\r
440 handler = mapHandler.get(key);
\r
441 mapHandler.remove(key);
\r
442 handler.handle(this, tuple);
\r
445 Message requestMsg = (Message) msg.getBody();
\r
446 key = (Double) requestMsg.getBody();
\r
447 handler = mapHandler.get(key);
\r
448 mapHandler.remove(key);
\r
449 handler.handle(this, null);
\r
454 public boolean isManagerOf(double key) {
\r
456 if (key == identifier) {
\r
460 SymphonyNodeComparator comparator = new SymphonyNodeComparator(pid, identifier);
\r
461 AdapterSymphonyNodeComparator adapterComparator = new AdapterSymphonyNodeComparator(comparator);
\r
463 Collection<Tuple<Node, BootstrapStatus>> leftShortRangeLinksCloned = (Collection<Tuple<Node, BootstrapStatus>>) leftShortRangeLinks.clone();
\r
464 Node targetNode = null;
\r
466 while (targetNode == null && !leftShortRangeLinksCloned.isEmpty()) {
\r
467 Tuple<Node, BootstrapStatus> nearTuple = Collections.min(leftShortRangeLinksCloned, adapterComparator);
\r
468 if (nearTuple.y == BootstrapStatus.ONLINE) {
\r
469 targetNode = nearTuple.x;
\r
471 leftShortRangeLinksCloned.remove(nearTuple);
\r
475 // SPECIAL CASE: NO LEFT NEIGHBOURS. I became the Manager.
\r
476 if (targetNode == null) {
\r
480 SymphonyProtocol symphony = (SymphonyProtocol) targetNode.getProtocol(pid);
\r
481 // Check if it's the situation: right neighbour - key - me. So if i'm the manager or not.
\r
482 boolean ret = isLeftNeighbour(identifier, key) && (!isLeftNeighbour(symphony.getIdentifier(), key) && symphony.getIdentifier() != key);
\r
487 public double getIdentifier() {
\r
491 public Tuple<Message, Message>[] getHistoryMessage() {
\r
492 SoftReference<Tuple<Message, Message>>[] array = messageHistory.toArray(new SoftReference[0]);
\r
493 LinkedList<Tuple<Message, Message>> list = new LinkedList<Tuple<Message, Message>>();
\r
494 for (SoftReference<Tuple<Message, Message>> reference : array) {
\r
495 Tuple<Message, Message> tuple = reference.get();
\r
496 if (tuple != null) {
\r
500 return list.toArray(new Tuple[0]);
\r
503 public void clearHistoryMessage() {
\r
504 messageHistory.clear();
\r
507 private double generateUniqueIdentifier() {
\r
508 boolean duplicated = true;
\r
511 while (duplicated) {
\r
512 id = CommonState.r.nextDouble();
\r
513 duplicated = allIdentifier.contains(id);
\r
516 allIdentifier.add(id);
\r
522 public Object clone() {
\r
523 SymphonyProtocol dolly = new SymphonyProtocol(prefix);
\r
527 public boolean isBootstrapped() {
\r
528 return loggedIntoNetwork == BootstrapStatus.ONLINE;
\r