1 package example.symphony;
\r
4 import java.util.logging.Level;
\r
5 import java.util.logging.Logger;
\r
7 import example.symphony.Message.MessageType;
\r
8 import example.symphony.SymphonyProtocol.BootstrapStatus;
\r
9 import peersim.cdsim.CDProtocol;
\r
10 import peersim.config.Configuration;
\r
11 import peersim.core.CommonState;
\r
12 import peersim.core.Fallible;
\r
13 import peersim.core.Node;
\r
14 import peersim.edsim.EDProtocol;
\r
15 import peersim.transport.Transport;
\r
19 * @author Andrea Esposito <and1989@gmail.com>
\r
21 public class SymphonyNetworkManager implements EDProtocol, CDProtocol {
\r
23 private static final String PAR_SYMPHONY = "symphony";
\r
24 private static final String PAR_TRANSP = "transport";
\r
25 private static final String PAR_ATTEMPTS = "attempts";
\r
26 private static final String PAR_NETSIZE = "networkestimator";
\r
27 private static final String PAR_NUM_TIMEOUT = "nTimeout";
\r
28 private static final String PAR_RELINKING = "relinking";
\r
29 private static final String PAR_RELINKING_LOWER_BOUND = "relinkingLowerBound";
\r
30 private static final String PAR_RELINKING_UPPER_BOUND = "relinkingUpperBound";
\r
31 private static final int DEFAULT_K = 1;
\r
32 private static final int DEFAULT_N = 2;
\r
33 private static final double DEFAULT_RELINKING_LOWER_BOUND = 0.5;
\r
34 private static final double DEFAULT_RELINKING_UPPER_BOUND = 2.0;
\r
35 private final String prefix;
\r
36 private final int symphonyID;
\r
37 private final int transportID;
\r
38 private final int networkEstimatorID;
\r
39 private final int attempts;
\r
40 private final int pid;
\r
41 private final int nTimeout;
\r
42 private final HashMap<Node, Integer> keepAliveMap;
\r
43 private final boolean relinkingProtocolActivated;
\r
44 private final double relinkingUpperBound;
\r
45 private final double relinkingLowerBound;
\r
46 private int k = DEFAULT_K; // Number of Long Range Link
\r
47 private int n = DEFAULT_N; // Estimation Network size
\r
48 private static boolean firstPrintConfig = true;
\r
50 * Estimation Network size at which last long distance link was established, at the beginning -1
\r
51 * to indicate that we never had Long Range Links
\r
53 private int nLink = -1;
\r
54 private int currentAttempts;
\r
56 public SymphonyNetworkManager(String prefix) {
\r
58 this.prefix = prefix;
\r
59 pid = Configuration.lookupPid(prefix.replaceAll("protocol.", ""));
\r
60 symphonyID = Configuration.getPid(prefix + "." + PAR_SYMPHONY);
\r
61 transportID = Configuration.getPid(prefix + "." + PAR_TRANSP);
\r
62 networkEstimatorID = Configuration.getPid(prefix + "." + PAR_NETSIZE);
\r
63 attempts = Configuration.getInt(prefix + "." + PAR_ATTEMPTS);
\r
64 nTimeout = Configuration.getInt(prefix + "." + PAR_NUM_TIMEOUT, 10);
\r
65 relinkingProtocolActivated = !Configuration.getString(prefix + "." + PAR_RELINKING, "on").toLowerCase().equals("off");
\r
66 double relinkingLowerBoundAppo = Configuration.getDouble(prefix + "." + PAR_RELINKING_LOWER_BOUND, DEFAULT_RELINKING_LOWER_BOUND);
\r
67 double relinkingUpperBoundAppo = Configuration.getDouble(prefix + "." + PAR_RELINKING_UPPER_BOUND, DEFAULT_RELINKING_UPPER_BOUND);
\r
68 if (relinkingLowerBoundAppo > relinkingUpperBoundAppo) {
\r
69 relinkingLowerBound = DEFAULT_RELINKING_LOWER_BOUND;
\r
70 relinkingUpperBound = DEFAULT_RELINKING_UPPER_BOUND;
\r
72 relinkingLowerBound = relinkingLowerBoundAppo;
\r
73 relinkingUpperBound = relinkingUpperBoundAppo;
\r
76 keepAliveMap = new HashMap<Node, Integer>();
\r
81 private void printConfig() {
\r
83 if (firstPrintConfig) {
\r
84 firstPrintConfig = false;
\r
85 System.out.println(SymphonyNetworkManager.class.getSimpleName() + " Configuration:");
\r
86 System.out.println("- Attempts per LongRangeLinks: " + attempts);
\r
87 System.out.println("- Number of Timeout before a node is considered OFFLINE (through Keep-alive):" + nTimeout);
\r
88 System.out.println("- Relinking: " + (relinkingProtocolActivated ? "ON" : "OFF"));
\r
89 System.out.println("- Relinking Range: [" + relinkingLowerBound + ", " + relinkingUpperBound + "]");
\r
90 System.out.println("-------------------------------\n");
\r
94 public void join(final Node node, final Node bootstrapNode) throws RoutingException {
\r
95 final SymphonyProtocol bootstrapSymphony = (SymphonyProtocol) bootstrapNode.getProtocol(symphonyID);
\r
96 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
\r
99 * Search (through the bootstrap node) and contact the Manager Node of myself such a way to
\r
100 * be able to insert myself into the ring and create the short links
\r
103 bootstrapSymphony.route(bootstrapNode, symphony.getIdentifier(), new Handler() {
\r
105 public void handle(SymphonyProtocol src, Tuple<Node, Double> tuple) {
\r
106 if (tuple == null) {
\r
107 Logger.getLogger(SymphonyNetworkManager.class.getName()).log(Level.SEVERE, "FAIL ROUTE JOIN");
\r
108 node.setFailState(Fallible.DEAD);
\r
112 Node managerNode = tuple.x;
\r
114 Transport transport = (Transport) node.getProtocol(transportID);
\r
115 Message msg = new Message(node, node, MessageType.JOIN);
\r
116 transport.send(node, managerNode, msg, pid);
\r
120 // The Long Range Links are added after that i joined the ring (before i can't because i haven't got the nodes to do the routing)
\r
124 * Conservative Re-Linking (i reuse the ones already created: not all fresh)
\r
128 public void updateLongRangeLinks(Node node) {
\r
129 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
\r
130 Transport transport = (Transport) node.getProtocol(transportID);
\r
132 // if too much links i delete the farest ones
\r
133 while (symphony.longRangeLinksOutgoing.size() > k) {
\r
134 Node distantNode = Collections.max(symphony.longRangeLinksOutgoing, new SymphonyNodeComparator(symphonyID, node));
\r
135 symphony.longRangeLinksOutgoing.remove(distantNode);
\r
137 // Communicate to the outgoing node that it ins't anymore one of my long range links
\r
138 Message disconnectMsg = new Message(null, node, MessageType.DISCONNECT_LONG_RANGE_LINK);
\r
139 transport.send(node, distantNode, disconnectMsg, pid);
\r
142 // I can search Long Range Links only if i'm into the ring and i'm able to do routing
\r
143 if (symphony.isBootstrapped()) {
\r
144 // if only few i try again, untill attempts times, to add new ones
\r
145 int difference = k - symphony.longRangeLinksOutgoing.size();
\r
146 currentAttempts = attempts;
\r
147 for (int i = 0; i < difference; i++) {
\r
148 sendLongRangeLinkRequest(symphony, node);
\r
152 private static final int MAX_ANTILOOP_COUNT_MANAGER_MYSELF = 5;
\r
153 private int antiLoopManagerMySelf = 0;
\r
155 private void sendLongRangeLinkRequest(final SymphonyProtocol symphony, final Node node) {
\r
158 double distance = Math.exp((Math.log(n) / Math.log(2)) * (CommonState.r.nextDouble() - 1.0)); // Harmonic Distribution
\r
159 double targetIdentifier = (symphony.getIdentifier() + distance) % 1;
\r
162 symphony.route(node, targetIdentifier, new Handler() {
\r
164 public void handle(SymphonyProtocol src, Tuple<Node, Double> tuple) {
\r
166 if (tuple == null) {
\r
167 Logger.getLogger(SymphonyNetworkManager.class.getName()).log(Level.SEVERE, "FAIL ROUTE SENDLONGRANGELINKREQUEST");
\r
171 Collection<Node> allShortLinks = new LinkedList<Node>();
\r
172 for (Tuple<Node, BootstrapStatus> shortTuple : symphony.leftShortRangeLinks) {
\r
173 allShortLinks.add(shortTuple.x);
\r
175 for (Tuple<Node, BootstrapStatus> shortTuple : symphony.rightShortRangeLinks) {
\r
176 allShortLinks.add(shortTuple.x);
\r
181 * I'm myself one of my short links, special case... i try again without
\r
182 * reduce the attempts for a maximum of MAX_ANTILOOP_COUNT_MANAGER_MYSELF
\r
183 * times after that i start again to reduce the attempts
\r
185 if (tuple.x.equals(node) || allShortLinks.contains(tuple.x)) {
\r
187 if (antiLoopManagerMySelf < MAX_ANTILOOP_COUNT_MANAGER_MYSELF) {
\r
189 antiLoopManagerMySelf++;
\r
190 sendLongRangeLinkRequest(symphony, node);
\r
192 antiLoopManagerMySelf = 0;
\r
197 boolean alreadyAdded = symphony.longRangeLinksOutgoing.contains(tuple.x);
\r
200 * OPINABLE: DESCREASE ATTEMPTS ONLY FOR REJECT? If yes i have to manage
\r
201 * the possible loop (nodi exhaurited so already all added)
\r
203 if (alreadyAdded && currentAttempts > 0) {
\r
205 sendLongRangeLinkRequest(symphony, node);
\r
206 } else if (!alreadyAdded) {
\r
207 Message msg = new Message(null, node, MessageType.REQUEST_LONG_RANGE_LINK);
\r
208 Transport transport = (Transport) node.getProtocol(transportID);
\r
209 transport.send(node, tuple.x, msg, pid);
\r
215 } catch (RoutingException ex) {
\r
218 } while (!routingOk);
\r
221 public void leave(Node node) {
\r
222 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
\r
224 if (symphony.loggedIntoNetwork != BootstrapStatus.OFFLINE) {
\r
225 Transport transport = (Transport) node.getProtocol(transportID);
\r
227 symphony.loggedIntoNetwork = BootstrapStatus.OFFLINE;
\r
229 // Communicate that i'm leaving to the outgoing (that i point to) nodes
\r
230 for (Node outgoingNode : symphony.longRangeLinksOutgoing) {
\r
231 Message disconnectMsg = new Message(null, node, MessageType.DISCONNECT_LONG_RANGE_LINK);
\r
232 transport.send(node, outgoingNode, disconnectMsg, pid);
\r
235 // Communicate that i'm leaving to the incoming (that they point to me) nodes
\r
236 for (Node incomingNode : symphony.longRangeLinksIncoming) {
\r
237 Message unavailableMsg = new Message(null, node, MessageType.UNAVAILABLE_LONG_RANGE_LINK);
\r
238 transport.send(node, incomingNode, unavailableMsg, pid);
\r
241 // Communicate to my neighbours (short range links) that i'm leaving and i send to them the near neighbours
\r
242 for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {
\r
243 Message leaveMsg = new Message(symphony.rightShortRangeLinks.clone(), node, MessageType.LEAVE);
\r
244 transport.send(node, leftTuple.x, leaveMsg, pid);
\r
247 for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {
\r
248 Message leaveMsg = new Message(symphony.leftShortRangeLinks.clone(), node, MessageType.LEAVE);
\r
249 transport.send(node, rightTuple.x, leaveMsg, pid);
\r
252 node.setFailState(Fallible.DEAD);
\r
256 public void processEvent(Node node, int pid, Object event) {
\r
258 Message msg = (Message) event;
\r
260 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
\r
261 Transport transport = (Transport) node.getProtocol(transportID);
\r
263 Collection<Tuple<Node, BootstrapStatus>> collection = null;
\r
264 switch (msg.getType()) {
\r
266 // I send my current neighbours to the entering node
\r
267 collection = (Collection<Tuple<Node, BootstrapStatus>>) symphony.leftShortRangeLinks.clone();
\r
268 collection.addAll((Collection<Tuple<Node, BootstrapStatus>>) symphony.rightShortRangeLinks.clone());
\r
269 Message responseMsg = new Message(collection, node, MessageType.JOIN_RESPONSE);
\r
270 transport.send(node, msg.getSourceNode(), responseMsg, pid);
\r
273 * Update my neighbours list, adding the new one (for sure it is entering in the
\r
276 * Put to "ONLINE_AND_ALL_NEIGHBOURS_OFFLINE" because maybe the bootstrap phase is
\r
277 * not terminated yet (ashyncronous communication)
\r
279 symphony.leftShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(msg.getSourceNode(), BootstrapStatus.ONLINE_AND_ALL_NEIGHBOURS_OFFLINE));
\r
282 fixNeighbours(node, symphony.leftShortRangeLinks);
\r
284 case JOIN_RESPONSE:
\r
286 Collection<Tuple<Node, BootstrapStatus>> tupleCollection = (Collection<Tuple<Node, BootstrapStatus>>) msg.getBody();
\r
290 * My manager is a right neighbour. The manager is already inside the ring, boostrap
\r
293 symphony.rightShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(msg.getSourceNode(), BootstrapStatus.ONLINE));
\r
295 // Set my neighbours in the correct position
\r
296 for (Tuple<Node, BootstrapStatus> tuple : tupleCollection) {
\r
297 if (SymphonyProtocol.isLeftNeighbour(node, tuple.x)) {
\r
298 symphony.leftShortRangeLinks.add(tuple);
\r
300 symphony.rightShortRangeLinks.add(tuple);
\r
304 fixNeighbours(node, symphony.leftShortRangeLinks);
\r
305 fixNeighbours(node, symphony.rightShortRangeLinks);
\r
307 // Update bootstrap status
\r
308 checkBootstrapStatus(node);
\r
310 // I send the refresh command such a way to exchange the views
\r
311 refreshNeighbours(node);
\r
313 // Update Long Range Links, because it's at the beginning is the same as adding k
\r
314 updateLongRangeLinks(node);
\r
316 case UPDATE_NEIGHBOURS:
\r
318 Collection<Tuple<Node, BootstrapStatus>> collectionCloned = ((Collection<Tuple<Node, BootstrapStatus>>) symphony.leftShortRangeLinks.clone());
\r
319 collectionCloned.addAll(((Collection<Tuple<Node, BootstrapStatus>>) symphony.rightShortRangeLinks.clone()));
\r
321 // Send my neighbours such a way it can also update itself
\r
322 Message responseUpdateMsg = new Message(collectionCloned, node, MessageType.UPDATE_NEIGHBOURS_RESPONSE);
\r
323 transport.send(node, msg.getSourceNode(), responseUpdateMsg, pid);
\r
325 // Update my view with the new node
\r
326 Tuple<Node, BootstrapStatus> neighbourTuple = new Tuple<Node, BootstrapStatus>(msg.getSourceNode(), (BootstrapStatus) msg.getBody());
\r
327 if (SymphonyProtocol.isLeftNeighbour(node, msg.getSourceNode())) {
\r
328 collection = symphony.leftShortRangeLinks;
\r
330 collection = symphony.rightShortRangeLinks;
\r
332 collection.add(neighbourTuple);
\r
334 fixNeighbours(node, collection);
\r
335 fixLookAheadMap(node);
\r
337 case UPDATE_NEIGHBOURS_RESPONSE:
\r
339 Collection<Tuple<Node, BootstrapStatus>> responseCollection = (Collection<Tuple<Node, BootstrapStatus>>) msg.getBody();
\r
341 for (Tuple<Node, BootstrapStatus> neighbourResponseTuple : responseCollection) {
\r
342 if (SymphonyProtocol.isLeftNeighbour(node, neighbourResponseTuple.x)) {
\r
343 collection = symphony.leftShortRangeLinks;
\r
345 collection = symphony.rightShortRangeLinks;
\r
347 collection.add(neighbourResponseTuple);
\r
350 // Fix the neighbours number to the maximum allow and maybe remove myself from the list
\r
351 fixNeighbours(node, symphony.leftShortRangeLinks);
\r
352 fixNeighbours(node, symphony.rightShortRangeLinks);
\r
353 fixLookAheadMap(node);
\r
355 case UPDATE_STATUS:
\r
356 case UPDATE_STATUS_RESPONSE:
\r
358 Node updNode = msg.getSourceNode();
\r
359 BootstrapStatus updStatus = (BootstrapStatus) msg.getBody();
\r
361 // I search the neighbour and i update its status
\r
362 boolean founded = false;
\r
364 // Try to see if it is on the left
\r
365 for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {
\r
366 if (leftTuple.x.equals(updNode)) {
\r
367 symphony.leftShortRangeLinks.remove(leftTuple);
\r
368 symphony.leftShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(updNode, updStatus));
\r
375 // If it isn't on the left i try with the neighbours on the right
\r
377 for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {
\r
378 if (rightTuple.x.equals(updNode)) {
\r
379 symphony.rightShortRangeLinks.remove(rightTuple);
\r
380 symphony.rightShortRangeLinks.add(new Tuple<Node, BootstrapStatus>(updNode, updStatus));
\r
386 fixNeighbours(node, symphony.rightShortRangeLinks);
\r
388 fixNeighbours(node, symphony.leftShortRangeLinks);
\r
391 checkBootstrapStatusAndAlert(node);
\r
393 if (msg.getType() == MessageType.UPDATE_STATUS) {
\r
394 Message responseUpdStatus = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_STATUS_RESPONSE);
\r
395 transport.send(node, updNode, responseUpdStatus, pid);
\r
399 case REQUEST_LONG_RANGE_LINK:
\r
400 MessageType responseType = MessageType.REJECT_LONG_RANGE_LINK;
\r
401 if (symphony.longRangeLinksIncoming.size() < (2 * k)) {
\r
402 boolean added = symphony.longRangeLinksIncoming.add(msg.getSourceNode());
\r
404 responseType = MessageType.ACCEPTED_LONG_RANGE_LINK;
\r
407 Message responseLongLinkMsg = new Message(null, node, responseType);
\r
408 transport.send(node, msg.getSourceNode(), responseLongLinkMsg, pid);
\r
410 case ACCEPTED_LONG_RANGE_LINK:
\r
412 symphony.longRangeLinksOutgoing.add(msg.getSourceNode());
\r
414 case REJECT_LONG_RANGE_LINK:
\r
415 if (currentAttempts > 0) {
\r
417 sendLongRangeLinkRequest(symphony, node);
\r
420 case DISCONNECT_LONG_RANGE_LINK:
\r
421 symphony.longRangeLinksIncoming.remove(msg.getSourceNode());
\r
422 symphony.lookAheadMap.put(msg.getSourceNode(), null);
\r
424 case UNAVAILABLE_LONG_RANGE_LINK:
\r
425 symphony.longRangeLinksOutgoing.remove(msg.getSourceNode());
\r
426 symphony.lookAheadMap.put(msg.getSourceNode(), null);
\r
429 Tuple<Node, BootstrapStatus> foundedTuple = null;
\r
431 // Verify if the node that is leaving is a left neighbour
\r
432 for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {
\r
433 if (leftTuple.x.equals(msg.getSourceNode())) {
\r
434 collection = symphony.leftShortRangeLinks;
\r
435 foundedTuple = leftTuple;
\r
440 // Verify if the node that is leaving is a right neighbour
\r
441 if (collection == null) {
\r
442 for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {
\r
443 if (rightTuple.x.equals(msg.getSourceNode())) {
\r
444 collection = symphony.rightShortRangeLinks;
\r
445 foundedTuple = rightTuple;
\r
451 // if i've found the neighbour i remove it and i add to myself its neighbours
\r
452 if (collection != null) {
\r
453 collection.addAll((Collection<Tuple<Node, BootstrapStatus>>) msg.getBody());
\r
454 collection.remove(foundedTuple);
\r
455 fixNeighbours(node, collection);
\r
457 // Update status and ready to send an alert in case i'm out of the ring
\r
458 checkBootstrapStatusAndAlert(node);
\r
462 Set<Double>[] lookAheadSetArray = new LinkedHashSet[2];
\r
465 * Check if the contacting node is doing lookAhead and in case of affirmative answer
\r
466 * i provide to it the long range link identifiers (according to my routing mode)
\r
468 if ((Boolean) msg.getBody()) {
\r
470 Iterable[] iterableArray;
\r
471 if (symphony.bidirectionalRouting) {
\r
472 iterableArray = new Iterable[]{symphony.longRangeLinksOutgoing, symphony.longRangeLinksIncoming};
\r
474 iterableArray = new Iterable[]{symphony.longRangeLinksOutgoing};
\r
477 for (Iterable<Node> iterable : iterableArray) {
\r
478 lookAheadSetArray[i] = new LinkedHashSet<Double>();
\r
479 Set<Double> lookAheadSet = lookAheadSetArray[i];
\r
480 Iterator<Node> it = iterable.iterator();
\r
481 while (it.hasNext()) {
\r
482 Node longLinkNode = it.next();
\r
483 lookAheadSet.add(((SymphonyProtocol) longLinkNode.getProtocol(symphonyID)).getIdentifier());
\r
489 transport.send(node, msg.getSourceNode(), new Message(lookAheadSetArray, node, MessageType.KEEP_ALIVE_RESPONSE), pid);
\r
491 case KEEP_ALIVE_RESPONSE:
\r
492 // Reset the counter to 0
\r
493 keepAliveMap.put(msg.getSourceNode(), 0);
\r
495 if (symphony.lookAhead) {
\r
496 symphony.lookAheadMap.put(msg.getSourceNode(), (Set<Double>[]) msg.getBody());
\r
505 * Update the status and communicate immediately to the neighbours if the node is gone out from
\r
506 * the ring (and before it was inside)
\r
510 private void checkBootstrapStatusAndAlert(Node node) {
\r
511 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
\r
512 BootstrapStatus beforeStatus = symphony.loggedIntoNetwork;
\r
514 checkBootstrapStatus(node);
\r
516 // Instead of waiting that the update happens periodically i do it now because i'm out of the ring and before i wasn't
\r
517 if (symphony.loggedIntoNetwork != beforeStatus && !symphony.isBootstrapped()) {
\r
518 updateBootstrapStatusNeighbours(node, true);
\r
522 private void fixNeighbours(Node node, Collection<Tuple<Node, BootstrapStatus>> neighbours) {
\r
524 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
\r
526 // Remove duplicates, remove that ones that are in an obsolete status
\r
527 Collection<Tuple<Node, BootstrapStatus>> removedNeighbours = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();
\r
528 for (Tuple<Node, BootstrapStatus> tuple : neighbours) {
\r
530 // Remove myself from the neighbours list
\r
531 if (tuple.x.equals(node)) {
\r
532 removedNeighbours.add(tuple);
\r
536 EnumSet<BootstrapStatus> status = EnumSet.allOf(BootstrapStatus.class);
\r
537 status.remove(tuple.y);
\r
539 for (BootstrapStatus opposite : status) {
\r
540 Tuple<Node, BootstrapStatus> oppositeNeighbour = new Tuple<Node, BootstrapStatus>(tuple.x, opposite);
\r
541 if (neighbours.contains(oppositeNeighbour)) {
\r
542 if (tuple.y != BootstrapStatus.ONLINE) {
\r
543 removedNeighbours.add(new Tuple<Node, BootstrapStatus>(tuple.x, BootstrapStatus.OFFLINE));
\r
544 if (opposite == BootstrapStatus.ONLINE) {
\r
545 removedNeighbours.add(new Tuple<Node, BootstrapStatus>(tuple.x, BootstrapStatus.ONLINE_AND_ALL_NEIGHBOURS_OFFLINE));
\r
551 neighbours.removeAll(removedNeighbours);
\r
555 * I count the neighbours that are in the ONLINE status but before i remove the ones that
\r
556 * are gone in timeout during the keep-alive procedure because can be someone that is old
\r
557 * but not remove from the exchanging views (UPDATE_NEIGHBOURS) procedure and are not
\r
558 * effectively online. To do anyway the possibility to the node to join again i decrease its
\r
559 * timeout value. This only if the node is ONLINE and so i'm really interested that it is ok
\r
563 int onlineNeighbours = 0;
\r
564 for (Tuple<Node, BootstrapStatus> tuple : neighbours) {
\r
566 Integer value = keepAliveMap.get(tuple.x);
\r
567 if (value != null && value >= nTimeout && tuple.y == BootstrapStatus.ONLINE) {
\r
568 keepAliveMap.put(tuple.x, value - 1);
\r
569 removedNeighbours.add(tuple);
\r
572 if (tuple.y == BootstrapStatus.ONLINE) {
\r
573 onlineNeighbours++;
\r
577 neighbours.removeAll(removedNeighbours);
\r
579 // Fix the neighbours number to the maximum allowed
\r
580 SymphonyNodeComparator comparator = new SymphonyNodeComparator(symphonyID, node);
\r
581 AdapterSymphonyNodeComparator adapterComparator = new AdapterSymphonyNodeComparator(comparator);
\r
582 while (neighbours.size() > symphony.numberShortRangeLinksPerSide) {
\r
583 Tuple<Node, BootstrapStatus> distantTuple = Collections.max(neighbours, adapterComparator);
\r
585 // Mantain the link with the ring
\r
586 if (distantTuple.y == BootstrapStatus.ONLINE) {
\r
587 if (onlineNeighbours > 1) {
\r
588 neighbours.remove(distantTuple);
\r
589 onlineNeighbours--;
\r
592 * If will be only one neighbour that is online i save it and i'm going to
\r
593 * eliminate another one (for sure it'll be not online)
\r
596 Tuple<Node, BootstrapStatus> backupOnlineNeighbour = distantTuple;
\r
597 neighbours.remove(backupOnlineNeighbour);
\r
598 distantTuple = Collections.max(neighbours, adapterComparator);
\r
599 neighbours.add(backupOnlineNeighbour);
\r
600 neighbours.remove(distantTuple);
\r
604 neighbours.remove(distantTuple);
\r
610 public Object clone() {
\r
611 SymphonyNetworkManager dolly = new SymphonyNetworkManager(prefix);
\r
615 public void nextCycle(Node node, int protocolID) {
\r
619 // Update the estimated network size
\r
622 // Update the estimated K
\r
625 // Update the bootstrap status of my neighbours that were joining the ring
\r
626 updateBootstrapStatusNeighbours(node, false);
\r
628 // Refresh the neighbours views
\r
629 refreshNeighbours(node);
\r
631 // I send and check the connection status of the neighbours
\r
634 // Update the bootstrap status
\r
635 checkBootstrapStatus(node);
\r
637 // If it's active i check the Relinking criteria
\r
638 if (relinkingProtocolActivated) {
\r
639 reLinkingProtocol(node);
\r
642 // Update the long range links (conservative)
\r
643 updateLongRangeLinks(node);
\r
649 * @param allNeighbours true, communicate/receive the status update from all the neighbours.
\r
650 * false, communicate/receive the status update only from the neighbours that are NOT ONLINE
\r
653 private void updateBootstrapStatusNeighbours(Node node, boolean allNeighbours) {
\r
654 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
\r
655 Transport transport = (Transport) node.getProtocol(transportID);
\r
657 Collection<Tuple<Node, BootstrapStatus>> collection = new LinkedHashSet<Tuple<Node, BootstrapStatus>>();
\r
658 collection.addAll(symphony.leftShortRangeLinks);
\r
659 collection.addAll(symphony.rightShortRangeLinks);
\r
661 for (Tuple<Node, BootstrapStatus> neighbourTuple : collection) {
\r
662 if (allNeighbours || neighbourTuple.y != BootstrapStatus.ONLINE) {
\r
663 Message msg = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_STATUS);
\r
664 transport.send(node, neighbourTuple.x, msg, pid);
\r
669 private void updateN(Node node) {
\r
670 NetworkSizeEstimatorProtocolInterface networkEstimator = (NetworkSizeEstimatorProtocolInterface) node.getProtocol(networkEstimatorID);
\r
671 n = networkEstimator.getNetworkSize(node);
\r
678 * Update the K value with the current expectation of the network size
\r
680 private void updateK(Node node) {
\r
682 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
\r
683 if (!symphony.fixedLongRangeLinks) {
\r
684 k = (int) Math.ceil(Math.log(n) / Math.log(2));
\r
690 k = symphony.numberFixedLongRangeLinks;
\r
694 private void refreshNeighbours(Node node) {
\r
695 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
\r
696 Transport transport = (Transport) node.getProtocol(transportID);
\r
698 for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {
\r
699 Node leftNode = leftTuple.x;
\r
700 Message updateNeighbourMsg = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_NEIGHBOURS);
\r
701 transport.send(node, leftNode, updateNeighbourMsg, pid);
\r
704 for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {
\r
705 Node rightNode = rightTuple.x;
\r
706 Message updateNeighbourMsg = new Message(symphony.loggedIntoNetwork, node, MessageType.UPDATE_NEIGHBOURS);
\r
707 transport.send(node, rightNode, updateNeighbourMsg, pid);
\r
712 * Method to update the (connection) status of the node. Perform the update to the "up" so:
\r
713 * OFFLINE -> ONLINE_AND_ALL_NEIGHBOURS_OFFLINE -> ONLINE
\r
715 * and to the "down" only: ONLINE -> ONLINE_AND_ALL_NEIGHBOURS_OFFLINE
\r
719 private void checkBootstrapStatus(Node node) {
\r
720 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
\r
722 if (symphony.loggedIntoNetwork != BootstrapStatus.OFFLINE) {
\r
724 symphony.loggedIntoNetwork = BootstrapStatus.ONLINE_AND_ALL_NEIGHBOURS_OFFLINE;
\r
726 // Check if i'm inside the ring and i'm able to do routing
\r
727 if (!symphony.leftShortRangeLinks.isEmpty() && !symphony.rightShortRangeLinks.isEmpty()) {
\r
729 boolean leftOk = false;
\r
730 for (Tuple<Node, BootstrapStatus> leftTuple : symphony.leftShortRangeLinks) {
\r
731 if (leftTuple.y == BootstrapStatus.ONLINE) {
\r
738 for (Tuple<Node, BootstrapStatus> rightTuple : symphony.rightShortRangeLinks) {
\r
739 if (rightTuple.y == BootstrapStatus.ONLINE) {
\r
740 symphony.loggedIntoNetwork = BootstrapStatus.ONLINE;
\r
750 * Remove the possible wrong entries from the lookAhead table
\r
752 private void fixLookAheadMap(Node node) {
\r
753 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
\r
754 for (Tuple<Node, BootstrapStatus> tuple : symphony.leftShortRangeLinks) {
\r
755 symphony.lookAheadMap.put(tuple.x, null);
\r
757 for (Tuple<Node, BootstrapStatus> tuple : symphony.rightShortRangeLinks) {
\r
758 symphony.lookAheadMap.put(tuple.x, null);
\r
763 * Sent keep-alive messages to verify if the links still online
\r
765 * if enable the lookAhead mode i require the neighbours list from my neighbours (1-lookAhead).
\r
767 * Note: I don't reuse the UPDATE_STATUS messages because i want to mantain separate the
\r
768 * semantic and have more clear source code
\r
770 private void keepAlive(Node node) {
\r
772 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
\r
773 Transport transport = (Transport) node.getProtocol(transportID);
\r
775 // Send and check for the long range links (both incoming and outgoing)
\r
776 for (Iterable<Node> iterable : new Iterable[]{symphony.longRangeLinksOutgoing, symphony.longRangeLinksIncoming}) {
\r
777 Iterator<Node> longLinksIterator = iterable.iterator();
\r
778 while (longLinksIterator.hasNext()) {
\r
779 Node longLinkNode = longLinksIterator.next();
\r
780 Integer value = keepAliveMap.get(longLinkNode);
\r
781 if (value == null) {
\r
786 * Verify if i reached the sufficient time of sending and not receiving an answer
\r
787 * and so i can consider that node as disconnected
\r
789 if (value >= nTimeout) {
\r
790 symphony.lookAheadMap.put(longLinkNode, null); // Do it anyway if it's enabled the lookAhead or not
\r
791 longLinksIterator.remove();
\r
793 keepAliveMap.put(longLinkNode, value + 1);
\r
795 Message keepAliveMsg = new Message(symphony.lookAhead, node, MessageType.KEEP_ALIVE);
\r
796 transport.send(node, longLinkNode, keepAliveMsg, pid);
\r
801 // Send and check for the short links
\r
802 for (Iterable<Tuple<Node, BootstrapStatus>> iterable : new Iterable[]{symphony.rightShortRangeLinks, symphony.leftShortRangeLinks}) {
\r
803 Iterator<Tuple<Node, BootstrapStatus>> shortLinksIterator = iterable.iterator();
\r
804 while (shortLinksIterator.hasNext()) {
\r
805 Node shortLinkNode = shortLinksIterator.next().x;
\r
806 Integer value = keepAliveMap.get(shortLinkNode);
\r
807 if (value == null) {
\r
811 // the same as above
\r
812 if (value >= nTimeout) {
\r
813 shortLinksIterator.remove();
\r
815 keepAliveMap.put(shortLinkNode, value + 1);
\r
817 // LookAhead is not to be performed to the short links!
\r
818 Message keepAliveMsg = new Message(false, node, MessageType.KEEP_ALIVE);
\r
819 transport.send(node, shortLinkNode, keepAliveMsg, pid);
\r
826 * Implement the Re-Linking criteria of the Long Range Links. It does the complete refresh. The
\r
827 * repopulation is done through the 'updateLongRangeLinks' method.
\r
829 private void reLinkingProtocol(Node node) {
\r
830 // I do the check only if i succeed at least one time to create a long range link
\r
832 double criterionValue = n / nLink;
\r
834 if (!(criterionValue >= relinkingLowerBound && criterionValue <= relinkingUpperBound)) {
\r
837 * Not explicitly precised in the paper: if i haven't created a new link i update
\r
838 * anyway nLink because can happen a special case that i will not be able to create
\r
839 * links because the reLinkingProtocol procedure is "faster".
\r
843 SymphonyProtocol symphony = (SymphonyProtocol) node.getProtocol(symphonyID);
\r
844 Transport transport = (Transport) node.getProtocol(transportID);
\r
846 // Communicate to the all Outgoing Long Range Links that they aren't anymore
\r
847 for (Node longRangeLinkOutgoingNode : symphony.longRangeLinksOutgoing) {
\r
848 Message disconnectMsg = new Message(null, node, MessageType.DISCONNECT_LONG_RANGE_LINK);
\r
849 transport.send(node, longRangeLinkOutgoingNode, disconnectMsg, pid);
\r
852 symphony.longRangeLinksOutgoing.clear();
\r
857 public int getK() {
\r
861 public int getN() {
\r