Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge pull request #179 from Takishipp/signals
[simgrid.git] / examples / java / dht / chord / Node.java
1 /* Copyright (c) 2006-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 package dht.chord;
7
8 import org.simgrid.msg.Msg;
9 import org.simgrid.msg.Comm;
10 import org.simgrid.msg.Host;
11 import org.simgrid.msg.Task;
12 import org.simgrid.msg.Process;
13 import org.simgrid.msg.MsgException;
14 import org.simgrid.msg.TimeoutException;
15 public class Node extends Process {
16   protected int id;
17   protected String mailbox;
18   protected int predId;
19   protected String predMailbox;
20   protected int nextFingerToFix;
21   protected Comm commReceive;
22   ///Last time I changed a finger or my predecessor
23   protected double lastChangeDate;
24   private int[] fingers;
25
26   public Node(Host host, String name, String[] args) {
27     super(host,name,args);
28   }
29
30   @Override
31   public void main(String[] args) throws MsgException {
32     if (args.length != 2 && args.length != 4) {
33       Msg.info("You need to provide 2 or 4 arguments.");
34       return;
35     }
36     double initTime = Msg.getClock();
37     int i;
38     boolean joinSuccess;
39     double deadline;
40
41     double nextStabilizeDate = initTime + Common.PERIODIC_STABILIZE_DELAY;
42     double nextFixFingersDate = initTime + Common.PERIODIC_FIX_FINGERS_DELAY;
43     double nextCheckPredecessorDate = initTime + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
44     double nextLookupDate = initTime + Common.PERIODIC_LOOKUP_DELAY;
45
46     mailbox = args[0];
47     id = Integer.parseInt(args[0]);
48
49     fingers = new int[Common.NB_BITS];
50     for (i = 0; i < Common.NB_BITS; i++) {
51       fingers[i] = -1;
52       setFinger(i,this.id);
53     }
54
55     //First node
56     if (args.length == 2) {
57       deadline = Integer.parseInt(args[1]);
58       create();
59       joinSuccess = true;
60     } else {
61       int knownId = Integer.valueOf(args[1]);
62       deadline = Integer.valueOf(args[3]);
63       Msg.debug("Hey! Let's join the system with the id " + id + ".");
64
65       joinSuccess = join(knownId);
66     }
67     if (joinSuccess) {
68       double currentClock = Msg.getClock();
69       while (currentClock < (initTime + deadline) && currentClock < Common.MAX_SIMULATION_TIME) {
70         if (commReceive == null) {
71           commReceive = Task.irecv(this.mailbox);
72         }
73         try {
74           if (!commReceive.test()) {
75             if (currentClock >= nextStabilizeDate) {
76               stabilize();
77               nextStabilizeDate = Msg.getClock() + Common.PERIODIC_STABILIZE_DELAY;
78             } else if (currentClock >= nextFixFingersDate) {
79               fixFingers();
80               nextFixFingersDate = Msg.getClock() + Common.PERIODIC_FIX_FINGERS_DELAY;
81             } else if (currentClock >= nextCheckPredecessorDate) {
82               this.checkPredecessor();
83               nextCheckPredecessorDate = Msg.getClock() + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
84             } else if (currentClock >= nextLookupDate) {
85               this.randomLookup();
86               nextLookupDate = Msg.getClock() + Common.PERIODIC_LOOKUP_DELAY;
87             } else {
88               waitFor(5);
89             }
90             currentClock = Msg.getClock();
91           } else {
92             handleTask(commReceive.getTask());
93             currentClock = Msg.getClock();
94             commReceive = null;
95           }
96         }
97         catch (Exception e) {
98           currentClock = Msg.getClock();
99           commReceive = null;
100         }
101       }
102       leave();
103       if (commReceive != null) {
104         commReceive = null;
105       }
106     } else {
107       Msg.info("I couldn't join the ring");
108     }
109   }
110
111   private void handleTask(Task task) {
112     if (task instanceof FindSuccessorTask) {
113       FindSuccessorTask fTask = (FindSuccessorTask)task;
114       Msg.debug("Receiving a 'Find Successor' request from " + fTask.getIssuerHostName() + " for id " + 
115                 fTask.getRequestId());
116       // is my successor the successor?
117       if (isInInterval(fTask.getRequestId(), this.id + 1, fingers[0])) {
118         Msg.debug("Send the request to " + fTask.getAnswerTo() + " with answer " + fingers[0]);
119         FindSuccessorAnswerTask answer = new FindSuccessorAnswerTask(getHost().getName(), mailbox, fingers[0]);
120         answer.dsend(fTask.getAnswerTo());
121       } else {
122         // otherwise, forward the request to the closest preceding finger in my table
123         int closest = closestPrecedingNode(fTask.getRequestId());
124         Msg.debug("Forward the request to " + closest);
125         fTask.dsend(Integer.toString(closest));
126       }
127     } else if (task instanceof GetPredecessorTask) {
128       GetPredecessorTask gTask = (GetPredecessorTask)(task);
129       Msg.debug("Receiving a 'Get Predecessor' request from " + gTask.getIssuerHostName());
130       GetPredecessorAnswerTask answer = new GetPredecessorAnswerTask(getHost().getName(), mailbox, predId);
131       answer.dsend(gTask.getAnswerTo());
132     } else if (task instanceof NotifyTask) {
133       NotifyTask nTask = (NotifyTask)task;
134       notify(nTask.getRequestId());
135     } else {
136       Msg.debug("Ignoring unexpected task of type:" + task);
137     }
138   }
139
140   private void leave() {
141     Msg.debug("Well Guys! I Think it's time for me to quit ;)");
142     // TODO: Notify my successor and predecessor.
143   }
144
145   /** @brief Initializes the current node as the first one of the system  */
146   private void create() {
147     Msg.debug("Create a new Chord ring...");
148     setPredecessor(-1);
149   }
150
151   // Makes the current node join the ring, knowing the id of a node already in the ring 
152   private boolean join(int knownId) {
153     Msg.info("Joining the ring with id " + this.id + " knowing node " + knownId);
154     setPredecessor(-1);
155     int successorId = remoteFindSuccessor(knownId, this.id);
156     if (successorId == -1) {
157       Msg.info("Cannot join the ring.");
158     } else {
159       setFinger(0, successorId);
160     }
161     return successorId != -1;
162   }
163
164   private void setPredecessor(int predecessorId) {
165     if (predecessorId != predId) {
166       predId = predecessorId;
167       if (predecessorId != -1) {
168         predMailbox = Integer.toString(predId);
169       }
170       lastChangeDate = Msg.getClock();
171     }
172   }
173
174   /**
175    * @brief Asks another node its predecessor.
176    * @param askTo the node to ask to
177    * @return the id of its predecessor node, or -1 if the request failed(or if the node does not know its predecessor)
178    */
179   private int remoteGetPredecessor(int askTo) {
180     int predecessorId = -1;
181     boolean stop = false;
182     Msg.debug("Sending a 'Get Predecessor' request to " + askTo);
183     String mailboxTo = Integer.toString(askTo);
184     GetPredecessorTask sendTask = new GetPredecessorTask(getHost().getName(), this.mailbox);
185     try {
186       sendTask.send(mailboxTo, Common.TIMEOUT);
187       try {
188         do {
189           if (commReceive == null) {
190             commReceive = Task.irecv(this.mailbox);
191           }
192           commReceive.waitCompletion(Common.TIMEOUT);
193           Task taskReceived = commReceive.getTask();
194           if (taskReceived instanceof GetPredecessorAnswerTask) {
195             predecessorId = ((GetPredecessorAnswerTask) taskReceived).getAnswerId();
196             stop = true;
197           } else {
198             handleTask(taskReceived);
199           }
200           commReceive = null;
201         } while (!stop);
202       }
203       catch (MsgException e) {
204         commReceive = null;
205         stop = true;
206       }
207     }
208     catch (MsgException e) {
209       Msg.debug("Failed to send the Get Predecessor request");
210     }
211     return predecessorId;
212   }
213
214   /**
215    * @brief Makes the current node find the successor node of an id.
216    * @param node the current node
217    * @param id the id to find
218    * @return the id of the successor node, or -1 if the request failed
219    */
220   private int findSuccessor(int id) {
221     if (isInInterval(id, this.id + 1, fingers[0])) {
222       return fingers[0];
223     }
224
225     int closest = this.closestPrecedingNode(id);
226     return remoteFindSuccessor(closest, id);
227   }
228
229   // Asks another node the successor node of an id.
230   private int remoteFindSuccessor(int askTo, int id) {
231     int successor = -1;
232     boolean stop = false;
233     String askToMailbox = Integer.toString(askTo);
234     Task sendTask = new FindSuccessorTask(getHost().getName(), this.mailbox, id);
235     Msg.debug("Sending a 'Find Successor' request to " + askToMailbox + " for id " + id);
236     try {
237       sendTask.send(askToMailbox, Common.TIMEOUT);
238       do {
239         if (commReceive == null) {
240           commReceive = Task.irecv(this.mailbox);
241         }
242         try {
243           commReceive.waitCompletion(Common.TIMEOUT);
244           Task task = commReceive.getTask();
245           if (task instanceof FindSuccessorAnswerTask) {
246             //TODO: Check if this this our answer.
247             FindSuccessorAnswerTask fTask = (FindSuccessorAnswerTask) task;
248             stop = true;
249             successor = fTask.getAnswerId();
250           } else {
251             handleTask(task);
252           }
253           commReceive = null;
254         }
255         catch (TimeoutException e) {
256           stop = true;
257           commReceive = null;
258         }
259       } while (!stop);
260     }
261     catch (TimeoutException e) {
262       Msg.debug("Failed to send the 'Find Successor' request");
263     }
264     catch (MsgException e) {
265       Msg.debug("Failed to receive Find Successor");
266     }
267
268     return successor;
269   }
270
271   // This function is called periodically. It checks the immediate successor of the current node.
272   private void stabilize() {
273     Msg.debug("Stabilizing node");
274     int candidateId;
275     int successorId = fingers[0];
276     if (successorId != this.id){
277       candidateId = remoteGetPredecessor(successorId);
278     } else {
279       candidateId = predId;
280     }
281     //This node is a candidate to become my new successor
282     if (candidateId != -1 && isInInterval(candidateId, this.id + 1, successorId - 1)) {
283       setFinger(0, candidateId);
284     }
285     if (successorId != this.id) {
286       remoteNotify(successorId, this.id);
287     }
288   }
289
290   /**
291    * @brief Notifies the current node that its predecessor may have changed.
292    * @param candidate_id the possible new predecessor
293    */
294   private void notify(int predecessorCandidateId) {
295     if (predId == -1 || isInInterval(predecessorCandidateId, predId + 1, this.id - 1 )) {
296       setPredecessor(predecessorCandidateId);
297     }
298   }
299
300   /**
301    * @brief Notifies a remote node that its predecessor may have changed.
302    * @param notify_id id of the node to notify
303    * @param candidate_id the possible new predecessor
304    */
305   private void remoteNotify(int notifyId, int predecessorCandidateId) {
306     Msg.debug("Sending a 'Notify' request to " + notifyId);
307     Task sentTask = new NotifyTask(getHost().getName(), this.mailbox, predecessorCandidateId);
308     sentTask.dsend(Integer.toString(notifyId));
309   }
310
311   // This function is called periodically.
312   // It refreshes the finger table of the current node.
313   private void fixFingers() {
314     Msg.debug("Fixing fingers");
315     int i = this.nextFingerToFix;
316     int successorId = this.findSuccessor(this.id + (int)Math.pow(2,i)); //FIXME: SLOW
317     if (successorId != -1) {
318       if (successorId != fingers[i]) {
319         setFinger(i, successorId);
320       }
321       nextFingerToFix = (i + 1) % Common.NB_BITS;
322     }
323   }
324
325   // This function is called periodically.
326   // It checks whether the predecessor has failed
327   private void checkPredecessor() {
328     //TODO
329   }
330
331   // Performs a find successor request to a random id.
332   private void randomLookup() {
333     int dest = 1337;
334     //Msg.info("Making a lookup request for id " + dest);
335     findSuccessor(dest);
336   }
337
338   /**
339    * @brief Returns the closest preceding finger of an id with respect to the finger table of the current node.
340    * @param id the id to find
341    * @return the closest preceding finger of that id
342    */
343   private int closestPrecedingNode(int id) {
344     for (int i = Common.NB_BITS - 1; i >= 0; i--) {
345       if (isInInterval(fingers[i], this.id + 1, id - 1)) {
346         return fingers[i];
347       }
348     }
349     return this.id;
350   }
351
352   /**
353    * @brief Returns whether an id belongs to the interval [start, end].
354    *
355    * The parameters are noramlized to make sure they are between 0 and nb_keys - 1).
356    * 1 belongs to [62, 3]
357    * 1 does not belong to [3, 62]
358    * 63 belongs to [62, 3]
359    * 63 does not belong to [3, 62]
360    * 24 belongs to [21, 29]
361    * 24 does not belong to [29, 21]
362    *
363    * @param id id to check
364    * @param start lower bound
365    * @param end upper bound
366    * @return a non-zero value if id in in [start, end]
367    */
368   private static boolean isInInterval(int id, int start, int end) {
369     int normId = normalize(id);
370     int normStart = normalize(start);
371     int normEnd = normalize(end);
372
373     // make sure end >= start and id >= start
374     if (normEnd < normStart) {
375       normEnd += Common.NB_KEYS;
376     }
377     if (normId < normStart) {
378       normId += Common.NB_KEYS;
379     }
380     return (normId <= normEnd);
381   }
382
383   /**
384    * @brief Turns an id into an equivalent id in [0, nb_keys).
385    * @param id an id
386    * @return the corresponding normalized id
387    */
388   private static int normalize(int id) {
389     return id & (Common.NB_KEYS - 1);
390   }
391
392   /**
393    * @brief Sets a finger of the current node.
394    * @param finger_index index of the finger to set (0 to nb_bits - 1)
395    * @param id the id to set for this finger
396    */
397   private void setFinger(int fingerIndex, int id) {
398     if (id != fingers[fingerIndex]) {
399       fingers[fingerIndex] = id;
400       lastChangeDate = Msg.getClock();
401     }
402   }
403 }