Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
kill another out of date script
[simgrid.git] / contrib / psg / src / example / symphony / SymphonyProtocol.java
1 package example.symphony;\r
2 \r
3 import java.lang.ref.SoftReference;\r
4 import java.util.*;\r
5 import java.util.logging.Level;\r
6 import java.util.logging.Logger;\r
7 \r
8 import example.symphony.Message.MessageType;\r
9 import peersim.config.Configuration;\r
10 import peersim.core.CommonState;\r
11 import peersim.core.Node;\r
12 import peersim.edsim.EDProtocol;\r
13 import peersim.transport.Transport;\r
14 \r
15 /**\r
16  *\r
17  * @author Andrea Esposito <and1989@gmail.com>\r
18  */\r
19 public class SymphonyProtocol implements EDProtocol {\r
20 \r
21     private static final String PAR_SHORT_LINK = "shortlink";\r
22     private static final String PAR_LONG_LINK = "longlink";\r
23     private static final String PAR_TRANSP = "transport";\r
24     private static final String PAR_ROUTING = "routing";\r
25     private static final String PAR_LOOKAHEAD = "lookahead";\r
26     private static Set<Double> allIdentifier = new HashSet<Double>();\r
27     private final String prefix;\r
28     private static int pid;\r
29     private final int transportID;\r
30     private final double identifier;\r
31     public final int sequentialIdentifier;\r
32     private static int sequentialCounter = 0;\r
33     public final int numberShortRangeLinksPerSide;\r
34     public final boolean bidirectionalRouting;\r
35     public final boolean lookAhead;\r
36     public final boolean fixedLongRangeLinks;\r
37     public final int numberFixedLongRangeLinks;\r
38     public LinkedHashSet<Node> longRangeLinksOutgoing;\r
39     public LinkedHashSet<Node> longRangeLinksIncoming;\r
40     public LinkedHashSet<Tuple<Node, BootstrapStatus>> rightShortRangeLinks;\r
41     public LinkedHashSet<Tuple<Node, BootstrapStatus>> leftShortRangeLinks;\r
42     /**\r
43      * Array Contract: at position 0 -> OutgoingLongRangeLinks, 1 -> IncomingLongRangeLinks\r
44      */\r
45     public final LinkedHashMap<Node, Set<Double>[]> lookAheadMap;\r
46     private HashMap<Double, Handler> mapHandler;\r
47     /**\r
48      * IDs Set to verify if there are cycles\r
49      */\r
50     private Set<Long> messageHistoryID;\r
51     /**\r
52      *\r
53      * Tuple chronology that contains: <received message, the possible answer message>\r
54      *\r
55      * I use SoftReference as a trade off between memory usage and accurancy\r
56      */\r
57     private Set<SoftReference<Tuple<Message, Message>>> messageHistory;\r
58     private static boolean firstPrintConfig = true;\r
59 \r
60     public enum BootstrapStatus {\r
61 \r
62         NEW, OFFLINE, ONLINE_AND_ALL_NEIGHBOURS_OFFLINE, ONLINE\r
63     }\r
64     public BootstrapStatus loggedIntoNetwork;\r
65 \r
66     public SymphonyProtocol(String prefix) {\r
67 \r
68         this.prefix = prefix;\r
69         pid = Configuration.lookupPid(prefix.replaceAll("protocol.", ""));\r
70         transportID = Configuration.getPid(prefix + "." + PAR_TRANSP);\r
71         numberShortRangeLinksPerSide = Configuration.getInt(prefix + "." + PAR_SHORT_LINK, 2) / 2;\r
72         bidirectionalRouting = !Configuration.getString(prefix + "." + PAR_ROUTING, "bidirectional").toLowerCase().equals("unidirectional");\r
73         lookAhead = !Configuration.getString(prefix + "." + PAR_LOOKAHEAD, "on").toLowerCase().equals("off");\r
74         numberFixedLongRangeLinks = Configuration.getInt(prefix + "." + PAR_LONG_LINK, -1);\r
75         fixedLongRangeLinks = numberFixedLongRangeLinks >= 0;\r
76 \r
77         longRangeLinksOutgoing = new LinkedHashSet<Node>();\r
78         longRangeLinksIncoming = new LinkedHashSet<Node>();\r
79         rightShortRangeLinks = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();\r
80         leftShortRangeLinks = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();\r
81         lookAheadMap = new LinkedHashMap<Node, Set<Double>[]>();\r
82 \r
83         identifier = generateUniqueIdentifier();\r
84         sequentialIdentifier = sequentialCounter++;\r
85 \r
86         mapHandler = new HashMap<Double, Handler>();\r
87 \r
88         messageHistoryID = new HashSet<Long>();\r
89         messageHistory = new LinkedHashSet<SoftReference<Tuple<Message, Message>>>();\r
90         loggedIntoNetwork = BootstrapStatus.NEW;\r
91 \r
92         printConfig();\r
93     }\r
94 \r
95     private void printConfig() {\r
96 \r
97         if (firstPrintConfig) {\r
98             firstPrintConfig = false;\r
99             System.out.println(SymphonyProtocol.class.getSimpleName() + " Configuration:");\r
100             System.out.println("- Number of short range links per side: " + numberShortRangeLinksPerSide);\r
101             System.out.println("- Number of long range links: " + (fixedLongRangeLinks ? numberFixedLongRangeLinks : "log(n)"));\r
102             System.out.println("- Routing mode: " + (bidirectionalRouting ? "Bidirectional" : "Unidirectional"));\r
103             System.out.println("- LookAhead status: " + (lookAhead ? "ON" : "OFF"));\r
104             System.out.println("-------------------------------\n");\r
105         }\r
106     }\r
107 \r
108     /**\r
109      *\r
110      * Method to identify the next node that has to be contacted. It's going to be used the mode\r
111      * that is described into the configuration file\r
112      */\r
113     public Node getCandidateForRouting(double identifierToRoute) throws RoutingException {\r
114         if (bidirectionalRouting) {\r
115             return getCandidateForBidirectionalRoute(identifierToRoute);\r
116         } else {\r
117             return getCandidateForUnidirectionalRoute(identifierToRoute);\r
118         }\r
119     }\r
120 \r
121     /**\r
122      *\r
123      * Method to individuate the next node that as to be contacted through Unidirectional Routing\r
124      * mode\r
125      */\r
126     public Node getCandidateForUnidirectionalRoute(double identifierToRoute) throws RoutingException {\r
127 \r
128         LinkedHashSet<Node> allLinks = new LinkedHashSet<Node>();\r
129         Node manager = putShortRangeLinksIntoContainerForRouting(allLinks, identifierToRoute);\r
130 \r
131         if (manager != null) {\r
132             return manager;\r
133         }\r
134 \r
135         allLinks.addAll(longRangeLinksOutgoing);\r
136 \r
137         return findClosestNode(identifierToRoute, allLinks, true);\r
138     }\r
139 \r
140     /**\r
141      * Method to individuate the next node that as to be contacted through Bidirectional Routing\r
142      * mode\r
143      */\r
144     public Node getCandidateForBidirectionalRoute(double identifierToRoute) throws RoutingException {\r
145 \r
146         LinkedHashSet<Node> allLinks = new LinkedHashSet<Node>();\r
147         Node manager = putShortRangeLinksIntoContainerForRouting(allLinks, identifierToRoute);\r
148 \r
149         if (manager != null) {\r
150             return manager;\r
151         }\r
152 \r
153         allLinks.addAll(longRangeLinksOutgoing);\r
154         allLinks.addAll(longRangeLinksIncoming);\r
155 \r
156         return findClosestNode(identifierToRoute, allLinks, false);\r
157     }\r
158 \r
159     /**\r
160      * @return Null if it is NOT found the manager. Node if it is found.\r
161      */\r
162     private Node putShortRangeLinksIntoContainerForRouting(Set<Node> container, double identifierToRoute) {\r
163         for (Tuple<Node, BootstrapStatus> rightTuple : rightShortRangeLinks) {\r
164             if (rightTuple.y == BootstrapStatus.ONLINE) {\r
165                 container.add(rightTuple.x);\r
166             }\r
167         }\r
168 \r
169         if (!container.isEmpty()) {\r
170 \r
171             // Special case: i verify if the neighbour at my right (ONLINE) is the manager\r
172             SymphonyNodeComparator comparator = new SymphonyNodeComparator(pid, identifier);\r
173             Node nearRightNeighbour = Collections.min(container, comparator);\r
174             if (nearRightNeighbour != null) {\r
175                 SymphonyProtocol symphony = (SymphonyProtocol) nearRightNeighbour.getProtocol(pid);\r
176                 if (!isLeftNeighbour(identifier, identifierToRoute) && isLeftNeighbour(symphony.getIdentifier(), identifierToRoute)) {\r
177                     return nearRightNeighbour;\r
178                 }\r
179             }\r
180         }\r
181 \r
182         for (Tuple<Node, BootstrapStatus> leftTuple : leftShortRangeLinks) {\r
183             if (leftTuple.y == BootstrapStatus.ONLINE) {\r
184                 container.add(leftTuple.x);\r
185             }\r
186         }\r
187 \r
188         return null;\r
189     }\r
190 \r
191     /**\r
192      *\r
193      * Individuates effectively the next candidate for the routing. Checks if the lookahead is\r
194      * activated and in case of affirmative answer it's going to use that information.\r
195      *\r
196      * @param identifierToRoute Identifier to reach\r
197      * @param container Candidate Nodes Container\r
198      * @param clockwise true, does unidirectional routing. false, does bidirectional routing.\r
199      * @return The nearest node to reach identifierToRoute\r
200      * @throws RoutingException Throw in case no candidate is found\r
201      */\r
202     public Node findClosestNode(final double identifierToRoute, final Iterable<Node> container, final boolean clockwise) throws RoutingException {\r
203         Node ret = null;\r
204         double min = Double.MAX_VALUE;\r
205 \r
206         for (Node node : container) {\r
207             SymphonyProtocol symphonyNodeContainer = (SymphonyProtocol) node.getProtocol(pid);\r
208             double realCandidateIdentifier = symphonyNodeContainer.getIdentifier();\r
209 \r
210             Set<Double> candidateIdentifierSet = new LinkedHashSet<Double>();\r
211             candidateIdentifierSet.add(realCandidateIdentifier);\r
212 \r
213             boolean lookAheadClockwise = true;\r
214 \r
215             /*\r
216              *\r
217              * If lookahead is activated add all the reachable identifiers. No checks are performed\r
218              * on the node type (short/long) because at maximum the map return null.\r
219              */\r
220             if (lookAhead) {\r
221                 Set<Double>[] lookAheadIdentifierSetArray = lookAheadMap.get(node);\r
222 \r
223                 if (lookAheadIdentifierSetArray != null) {\r
224                     Set<Double> lookAheadIdentifierSet = lookAheadIdentifierSetArray[0];\r
225 \r
226                     if (lookAheadIdentifierSet == null) {\r
227                         lookAheadIdentifierSet = new LinkedHashSet<Double>();\r
228                     }\r
229 \r
230                     /*\r
231                      *\r
232                      * If bidirectional routing is going to be performed so i put into account also\r
233                      * the Incoming Long Range Links of the current neighbour\r
234                      */\r
235                     if (bidirectionalRouting && lookAheadIdentifierSetArray[1] != null) {\r
236                         lookAheadIdentifierSet.addAll(lookAheadIdentifierSetArray[1]);\r
237                         lookAheadClockwise = false;\r
238                     }\r
239 \r
240                     if (!lookAheadIdentifierSet.isEmpty()) {\r
241                         candidateIdentifierSet.addAll(lookAheadIdentifierSet);\r
242                     }\r
243                 }\r
244             }\r
245 \r
246             for (Double candidateIdentifier : candidateIdentifierSet) {\r
247                 // if it is a my neighbour i use my routing mode instead if it is a looAhead one i use its routing mode\r
248                 boolean currentClockwise = candidateIdentifier.equals(realCandidateIdentifier) ? clockwise : lookAheadClockwise;\r
249 \r
250                 double distance = Math.abs(candidateIdentifier - identifierToRoute);\r
251                 distance = Math.min(distance, 1.0 - distance);\r
252 \r
253                 // if clockwise i have to exclude the case: candidateIdentifier - indentifierToRoute - identifier\r
254                 if (currentClockwise) {\r
255                     if (isLeftNeighbour(candidateIdentifier, identifierToRoute)) {\r
256 \r
257                         // Special case (0.9 - 0.1) the normal order is not more meanful to decide the side\r
258                         if (identifierToRoute >= candidateIdentifier) {\r
259                             distance = identifierToRoute - candidateIdentifier;\r
260                         } else {\r
261                             distance = (1.0 - candidateIdentifier) + identifierToRoute;\r
262                         }\r
263                     } else {\r
264                         distance = (1.0 - (candidateIdentifier - identifierToRoute)) % 1;\r
265                     }\r
266                 }\r
267 \r
268                 /*\r
269                  *\r
270                  * Priority to the node that i'm directly connected and only after i use the\r
271                  * lookAhead information\r
272                  */\r
273                 if (min >= Math.abs(distance)\r
274                         && (candidateIdentifier.equals(realCandidateIdentifier)\r
275                         || ret == null\r
276                         || min > Math.abs(distance))) {\r
277                     ret = node;\r
278                     min = Math.abs(distance);\r
279                 }\r
280             }\r
281         }\r
282 \r
283         if (ret == null) {\r
284             throw new RoutingException("Impossible do routing. [Hit: Neighbour links (maybe) not yet online.");\r
285         }\r
286 \r
287         return ret;\r
288     }\r
289 \r
290     /**\r
291      *\r
292      * @param neighbourNode Neighbour Node\r
293      * @return true if the node is a left neighbour (or itself), false if it is a right one\r
294      */\r
295     public static boolean isLeftNeighbour(Node rootNode, Node neighbourNode) {\r
296         SymphonyProtocol rootSymphony = (SymphonyProtocol) rootNode.getProtocol(pid);\r
297         SymphonyProtocol neighbourSymphony = (SymphonyProtocol) neighbourNode.getProtocol(pid);\r
298 \r
299         return isLeftNeighbour(rootSymphony.getIdentifier(), neighbourSymphony.getIdentifier());\r
300     }\r
301 \r
302     public static boolean isLeftNeighbour(double rootIdentifier, double neighbourIdentifier) {\r
303 \r
304         // I calculate putting the hypotesis that i have to translate/"normalize", after i'll check if it was useless\r
305         double traslateRootIdentifier = (rootIdentifier + 0.5) % 1;\r
306         double traslateNeighbourIdentifier = (neighbourIdentifier + 0.5) % 1;\r
307         double distance = traslateNeighbourIdentifier - traslateRootIdentifier;\r
308 \r
309         // I verify if the neighbourIdentifier is over half ring, if yes i don't need to do the translation/"normalization"\r
310         if ((neighbourIdentifier + 0.5) != traslateNeighbourIdentifier) {\r
311             distance = neighbourIdentifier - rootIdentifier;\r
312         }\r
313 \r
314         return distance >= 0 && distance <= 0.5;\r
315     }\r
316 \r
317     public void route(Node src, double key, Handler handler) throws RoutingException {\r
318 \r
319         mapHandler.put(key, handler);\r
320 \r
321         Message msg = new Message(key, src, MessageType.ROUTE);\r
322 \r
323         Node targetNode = src;\r
324 \r
325         if (!isManagerOf(key)) {\r
326             targetNode = getCandidateForRouting(key);\r
327             Transport transport = (Transport) src.getProtocol(transportID);\r
328             transport.send(src, targetNode, msg, pid);\r
329         }\r
330 \r
331         // Insert the message into the chronology\r
332         Tuple<Message, Message> historyTuple = new Tuple<Message, Message>();\r
333         try {\r
334             historyTuple.x = msg;\r
335             historyTuple.y = (Message) msg.clone();\r
336             historyTuple.y.setCurrentHop(targetNode);\r
337         } catch (CloneNotSupportedException ex) {\r
338             Logger.getLogger(SymphonyProtocol.class.getName()).log(Level.SEVERE, "Impossible to clonate the message!");\r
339             historyTuple.x = null;\r
340             historyTuple.y = msg;\r
341             msg.setCurrentHop(targetNode);\r
342         }\r
343         messageHistory.add(new SoftReference<Tuple<Message, Message>>(historyTuple));\r
344         messageHistoryID.add(msg.getID());\r
345 \r
346         /*\r
347          *\r
348          * If i am the manager (brutally through the reference), i don't do the loopback routing but\r
349          * i soddisfy immediately the request\r
350          */\r
351         if (targetNode == src) {\r
352 \r
353             // Uppdate the chronology\r
354             historyTuple.y = new Message(key, targetNode, MessageType.ROUTE_RESPONSE);\r
355 \r
356             Tuple<Node, Double> tuple = new Tuple<Node, Double>(src, key);\r
357             mapHandler.remove(key);\r
358             handler.handle(this, tuple);\r
359         }\r
360     }\r
361 \r
362     public void processEvent(Node node, int pid, Object event) {\r
363         Message msg = (Message) event;\r
364         msg.incrementHop(); // I increment the message Hop\r
365 \r
366         Tuple<Message, Message> historyTuple = new Tuple<Message, Message>();\r
367         try {\r
368             // I clone the message such a way to store into the chronology the hop sender's information\r
369             historyTuple.x = (Message) msg.clone();\r
370         } catch (CloneNotSupportedException ex) {\r
371             Logger.getLogger(SymphonyProtocol.class.getName()).log(Level.SEVERE, "Impossible to clonate the message!");\r
372             historyTuple.x = msg;\r
373         }\r
374 \r
375         messageHistory.add(new SoftReference<Tuple<Message, Message>>(historyTuple));\r
376 \r
377         Double key;\r
378         Transport transport;\r
379         Handler handler;\r
380 \r
381         // Individuate cycles\r
382         if (messageHistoryID.contains(msg.getID())) {\r
383             Message responseMsg = new Message(msg, node, MessageType.ROUTE_FAIL);\r
384 \r
385             historyTuple.y = responseMsg;\r
386 \r
387             transport = (Transport) node.getProtocol(transportID);\r
388             transport.send(node, msg.getSourceNode(), responseMsg, pid);\r
389             return;\r
390         }\r
391 \r
392         /*\r
393          * If i'm arrived till here means that i'm not into a cycle --> i insert the message ID into\r
394          * the chronology\r
395          */\r
396         messageHistoryID.add(msg.getID());\r
397 \r
398         switch (msg.getType()) {\r
399             case ROUTE:\r
400                 key = (Double) msg.getBody();\r
401                 Logger.getLogger(SymphonyProtocol.class.getName()).log(Level.FINEST, key + " " + identifier);\r
402                 if (isManagerOf(key)) {\r
403                     transport = (Transport) msg.getSourceNode().getProtocol(transportID);\r
404                     Message responseMsg = new Message(new Tuple<Node, Double>(node, key), node, MessageType.ROUTE_RESPONSE);\r
405                     historyTuple.y = responseMsg;\r
406                     transport.send(node, msg.getSourceNode(), responseMsg, pid);\r
407                 } else {\r
408                     try {\r
409                         Node targetNode = getCandidateForRouting(key);\r
410 \r
411                         try {\r
412                             // I clone the message such a way to store the info (into the chronology) of the hop receiver\r
413                             historyTuple.y = (Message) msg.clone();\r
414                             historyTuple.y.setCurrentHop(targetNode);\r
415                         } catch (CloneNotSupportedException ex) {\r
416                             Logger.getLogger(SymphonyProtocol.class.getName()).log(Level.SEVERE, "Impossible to clonate the message!");\r
417                             historyTuple.y = msg;\r
418                             msg.setCurrentHop(targetNode);\r
419                         }\r
420 \r
421                         transport = (Transport) node.getProtocol(transportID);\r
422                         transport.send(node, targetNode, msg, pid);\r
423                     } catch (RoutingException ex) {\r
424                         /*\r
425                          *\r
426                          * I send the same message to myself (it is going to queue into the event\r
427                          * queue and in this way i "earn" time (postpone) and i hope that the\r
428                          * network will be ok in the meanwhile)\r
429                          */\r
430                         historyTuple.y = msg;\r
431                         msg.setCurrentHop(node);\r
432                         transport = (Transport) node.getProtocol(transportID);\r
433                         transport.send(node, node, msg, pid);\r
434                     }\r
435                 }\r
436                 break;\r
437             case ROUTE_RESPONSE:\r
438                 Tuple<Node, Double> tuple = (Tuple<Node, Double>) msg.getBody();\r
439                 key = tuple.y;\r
440                 handler = mapHandler.get(key);\r
441                 mapHandler.remove(key);\r
442                 handler.handle(this, tuple);\r
443                 break;\r
444             case ROUTE_FAIL:\r
445                 Message requestMsg = (Message) msg.getBody();\r
446                 key = (Double) requestMsg.getBody();\r
447                 handler = mapHandler.get(key);\r
448                 mapHandler.remove(key);\r
449                 handler.handle(this, null);\r
450                 break;\r
451         }\r
452     }\r
453 \r
454     public boolean isManagerOf(double key) {\r
455 \r
456         if (key == identifier) {\r
457             return true;\r
458         }\r
459 \r
460         SymphonyNodeComparator comparator = new SymphonyNodeComparator(pid, identifier);\r
461         AdapterSymphonyNodeComparator adapterComparator = new AdapterSymphonyNodeComparator(comparator);\r
462 \r
463         Collection<Tuple<Node, BootstrapStatus>> leftShortRangeLinksCloned = (Collection<Tuple<Node, BootstrapStatus>>) leftShortRangeLinks.clone();\r
464         Node targetNode = null;\r
465 \r
466         while (targetNode == null && !leftShortRangeLinksCloned.isEmpty()) {\r
467             Tuple<Node, BootstrapStatus> nearTuple = Collections.min(leftShortRangeLinksCloned, adapterComparator);\r
468             if (nearTuple.y == BootstrapStatus.ONLINE) {\r
469                 targetNode = nearTuple.x;\r
470             } else {\r
471                 leftShortRangeLinksCloned.remove(nearTuple);\r
472             }\r
473         }\r
474 \r
475         // SPECIAL CASE: NO LEFT NEIGHBOURS. I became the Manager.\r
476         if (targetNode == null) {\r
477             return true;\r
478         }\r
479 \r
480         SymphonyProtocol symphony = (SymphonyProtocol) targetNode.getProtocol(pid);\r
481         // Check if it's the situation: right neighbour - key - me. So if i'm the manager or not.\r
482         boolean ret = isLeftNeighbour(identifier, key) && (!isLeftNeighbour(symphony.getIdentifier(), key) && symphony.getIdentifier() != key);\r
483 \r
484         return ret;\r
485     }\r
486 \r
487     public double getIdentifier() {\r
488         return identifier;\r
489     }\r
490 \r
491     public Tuple<Message, Message>[] getHistoryMessage() {\r
492         SoftReference<Tuple<Message, Message>>[] array = messageHistory.toArray(new SoftReference[0]);\r
493         LinkedList<Tuple<Message, Message>> list = new LinkedList<Tuple<Message, Message>>();\r
494         for (SoftReference<Tuple<Message, Message>> reference : array) {\r
495             Tuple<Message, Message> tuple = reference.get();\r
496             if (tuple != null) {\r
497                 list.add(tuple);\r
498             }\r
499         }\r
500         return list.toArray(new Tuple[0]);\r
501     }\r
502 \r
503     public void clearHistoryMessage() {\r
504         messageHistory.clear();\r
505     }\r
506 \r
507     private double generateUniqueIdentifier() {\r
508         boolean duplicated = true;\r
509         Double id = null;\r
510 \r
511         while (duplicated) {\r
512             id = CommonState.r.nextDouble();\r
513             duplicated = allIdentifier.contains(id);\r
514         }\r
515 \r
516         allIdentifier.add(id);\r
517 \r
518         return id;\r
519     }\r
520 \r
521     @Override\r
522     public Object clone() {\r
523         SymphonyProtocol dolly = new SymphonyProtocol(prefix);\r
524         return dolly;\r
525     }\r
526 \r
527     public boolean isBootstrapped() {\r
528         return loggedIntoNetwork == BootstrapStatus.ONLINE;\r
529     }\r
530 }\r