Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
2cb52d8899c2b5e3b04488a319716b4de1873ac9
[simgrid.git] / examples / java / dht / chord / Node.java
1 /* Copyright (c) 2006-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 package dht.chord;
8
9 import org.simgrid.msg.Msg;
10 import org.simgrid.msg.Comm;
11 import org.simgrid.msg.Host;
12 import org.simgrid.msg.Task;
13 import org.simgrid.msg.Process;
14 import org.simgrid.msg.MsgException;
15 import org.simgrid.msg.TimeoutException;
16 public class Node extends Process {
17   protected int id;
18   protected String mailbox;
19   protected int predId;
20   protected String predMailbox;
21   protected int nextFingerToFix;
22   protected Comm commReceive;
23   ///Last time I changed a finger or my predecessor
24   protected double lastChangeDate;
25   int[] fingers;
26
27   public Node(Host host, String name, String[] args) {
28     super(host,name,args);
29   }
30
31   @Override
32   public void main(String[] args) throws MsgException {
33     if (args.length != 2 && args.length != 4) {
34       Msg.info("You need to provide 2 or 4 arguments.");
35       return;
36     }
37     double initTime = Msg.getClock();
38     int i;
39     boolean joinSuccess;
40     double deadline;
41
42     double nextStabilizeDate = initTime + Common.PERIODIC_STABILIZE_DELAY;
43     double nextFixFingersDate = initTime + Common.PERIODIC_FIX_FINGERS_DELAY;
44     double nextCheckPredecessorDate = initTime + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
45     double nextLookupDate = initTime + Common.PERIODIC_LOOKUP_DELAY;
46
47     id = Integer.valueOf(args[0]);
48     mailbox = Integer.toString(id);
49
50     fingers = new int[Common.NB_BITS];
51     for (i = 0; i < Common.NB_BITS; i++) {
52       fingers[i] = -1;
53       setFinger(i,this.id);
54     }
55
56     //First node
57     if (args.length == 2) {
58       deadline = Integer.parseInt(args[1]);
59       create();
60       joinSuccess = true;
61     } else {
62       int knownId = Integer.valueOf(args[1]);
63       deadline = Integer.valueOf(args[3]);
64       //Msg.info("Hey! Let's join the system with the id " + id + ".");
65
66       joinSuccess = join(knownId);
67     }
68     if (joinSuccess) {
69       double currentClock = Msg.getClock();
70       while (currentClock < (initTime + deadline) && currentClock < Common.MAX_SIMULATION_TIME) {
71         if (commReceive == null) {
72           commReceive = Task.irecv(this.mailbox);
73         }
74         try {
75           if (!commReceive.test()) {
76             if (currentClock >= nextStabilizeDate) {
77               stabilize();
78               nextStabilizeDate = Msg.getClock() + Common.PERIODIC_STABILIZE_DELAY;
79             } else if (currentClock >= nextFixFingersDate) {
80               fixFingers();
81               nextFixFingersDate = Msg.getClock() + Common.PERIODIC_FIX_FINGERS_DELAY;
82             } else if (currentClock >= nextCheckPredecessorDate) {
83               this.checkPredecessor();
84               nextCheckPredecessorDate = Msg.getClock() + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
85             } else if (currentClock >= nextLookupDate) {
86               this.randomLookup();
87               nextLookupDate = Msg.getClock() + Common.PERIODIC_LOOKUP_DELAY;
88             } else {
89               waitFor(5);
90             }
91             currentClock = Msg.getClock();
92           } else {
93             handleTask(commReceive.getTask());
94             currentClock = Msg.getClock();
95             commReceive = null;
96           }
97         }
98         catch (Exception e) {
99           currentClock = Msg.getClock();
100           commReceive = null;
101         }
102       }
103       leave();
104       if (commReceive != null) {
105         commReceive = null;
106       }
107     } else {
108       Msg.info("I couldn't join the ring");
109     }
110   }
111
112   void handleTask(Task task) {
113     if (task instanceof FindSuccessorTask) {
114       FindSuccessorTask fTask = (FindSuccessorTask)task;
115       Msg.debug("Receiving a 'Find Successor' request from " + fTask.getIssuerHostName() + " for id " + 
116                 fTask.getRequestId());
117       // is my successor the successor?
118       if (isInInterval(fTask.getRequestId(), this.id + 1, fingers[0])) {
119         Msg.debug("Send the request to " + fTask.getAnswerTo() + " with answer " + fingers[0]);
120         FindSuccessorAnswerTask answer = new FindSuccessorAnswerTask(getHost().getName(), mailbox, fingers[0]);
121         answer.dsend(fTask.getAnswerTo());
122       } else {
123         // otherwise, forward the request to the closest preceding finger in my table
124         int closest = closestPrecedingNode(fTask.getRequestId());
125         Msg.debug("Forward the request to " + closest);
126         fTask.dsend(Integer.toString(closest));
127       }
128     } else if (task instanceof GetPredecessorTask) {
129       GetPredecessorTask gTask = (GetPredecessorTask)(task);
130       Msg.debug("Receiving a 'Get Predecessor' request from " + gTask.getIssuerHostName());
131       GetPredecessorAnswerTask answer = new GetPredecessorAnswerTask(getHost().getName(), mailbox, predId);
132       answer.dsend(gTask.getAnswerTo());
133     } else if (task instanceof NotifyTask) {
134       NotifyTask nTask = (NotifyTask)task;
135       notify(nTask.getRequestId());
136     } else {
137       Msg.debug("Ignoring unexpected task of type:" + task);
138     }
139   }
140
141   void leave() {
142     Msg.debug("Well Guys! I Think it's time for me to quit ;)");
143     quitNotify(1); //Notify my successor
144     quitNotify(-1); //Notify my predecessor.
145   }
146
147   /**
148    * @brief Notifies the successor or the predecessor of the current node of the departure
149    * @param to 1 to notify the successor, -1 to notify the predecessor
150    */
151   static void quitNotify( int to) {
152     //TODO
153   }
154
155   /**
156    * @brief Initializes the current node as the first one of the system.
157    */
158   void create() {
159     Msg.debug("Create a new Chord ring...");
160     setPredecessor(-1);
161   }
162
163   // Makes the current node join the ring, knowing the id of a node already in the ring 
164   boolean join(int knownId) {
165     Msg.info("Joining the ring with id " + this.id + " knowing node " + knownId);
166     setPredecessor(-1);
167     int successorId = remoteFindSuccessor(knownId, this.id);
168     if (successorId == -1) {
169       Msg.info("Cannot join the ring.");
170     } else {
171       setFinger(0, successorId);
172     }
173     return successorId != -1;
174   }
175
176   void setPredecessor(int predecessorId) {
177     if (predecessorId != predId) {
178       predId = predecessorId;
179       if (predecessorId != -1) {
180         predMailbox = Integer.toString(predId);
181       }
182       lastChangeDate = Msg.getClock();
183     }
184   }
185
186   /**
187    * @brief Asks another node its predecessor.
188    * @param askTo the node to ask to
189    * @return the id of its predecessor node, or -1 if the request failed(or if the node does not know its predecessor)
190    */
191   int remoteGetPredecessor(int askTo) {
192     int predecessorId = -1;
193     boolean stop = false;
194     Msg.debug("Sending a 'Get Predecessor' request to " + askTo);
195     String mailboxTo = Integer.toString(askTo);
196     GetPredecessorTask sendTask = new GetPredecessorTask(getHost().getName(), this.mailbox);
197     try {
198       sendTask.send(mailboxTo, Common.TIMEOUT);
199       try {
200         do {
201           if (commReceive == null) {
202             commReceive = Task.irecv(this.mailbox);
203           }
204           commReceive.waitCompletion(Common.TIMEOUT);
205           Task taskReceived = commReceive.getTask();
206           if (taskReceived instanceof GetPredecessorAnswerTask) {
207             predecessorId = ((GetPredecessorAnswerTask) taskReceived).getAnswerId();
208             stop = true;
209           } else {
210             handleTask(taskReceived);
211           }
212           commReceive = null;
213         } while (!stop);
214       }
215       catch (MsgException e) {
216         commReceive = null;
217         stop = true;
218       }
219     }
220     catch (MsgException e) {
221       Msg.debug("Failed to send the Get Predecessor request");
222     }
223     return predecessorId;
224   }
225
226   /**
227    * @brief Makes the current node find the successor node of an id.
228    * @param node the current node
229    * @param id the id to find
230    * @return the id of the successor node, or -1 if the request failed
231    */
232   int findSuccessor(int id) {
233     if (isInInterval(id, this.id + 1, fingers[0])) {
234       return fingers[0];
235     }
236
237     int closest = this.closestPrecedingNode(id);
238     return remoteFindSuccessor(closest, id);
239   }
240
241   // Asks another node the successor node of an id.
242   int remoteFindSuccessor(int askTo, int id) {
243     int successor = -1;
244     boolean stop = false;
245     String askToMailbox = Integer.toString(askTo);
246     Task sendTask = new FindSuccessorTask(getHost().getName(), this.mailbox, id);
247     Msg.debug("Sending a 'Find Successor' request to " + askToMailbox + " for id " + id);
248     try {
249       sendTask.send(askToMailbox, Common.TIMEOUT);
250       do {
251         if (commReceive == null) {
252           commReceive = Task.irecv(this.mailbox);
253         }
254         try {
255           commReceive.waitCompletion(Common.TIMEOUT);
256           Task task = commReceive.getTask();
257           if (task instanceof FindSuccessorAnswerTask) {
258             //TODO: Check if this this our answer.
259             FindSuccessorAnswerTask fTask = (FindSuccessorAnswerTask) task;
260             stop = true;
261             successor = fTask.getAnswerId();
262           } else {
263             handleTask(task);
264           }
265           commReceive = null;
266         }
267         catch (TimeoutException e) {
268           stop = true;
269           commReceive = null;
270         }
271       } while (!stop);
272     }
273     catch (TimeoutException e) {
274       Msg.debug("Failed to send the 'Find Successor' request");
275     }
276     catch (MsgException e) {
277       Msg.debug("Failed to receive Find Successor");
278     }
279
280     return successor;
281   }
282
283   // This function is called periodically. It checks the immediate successor of the current node.
284   void stabilize() {
285     Msg.debug("Stabilizing node");
286     int candidateId;
287     int successorId = fingers[0];
288     if (successorId != this.id){
289       candidateId = remoteGetPredecessor(successorId);
290     } else {
291       candidateId = predId;
292     }
293     //This node is a candidate to become my new successor
294     if (candidateId != -1 && isInInterval(candidateId, this.id + 1, successorId - 1)) {
295       setFinger(0, candidateId);
296     }
297     if (successorId != this.id) {
298       remoteNotify(successorId, this.id);
299     }
300   }
301
302   /**
303    * @brief Notifies the current node that its predecessor may have changed.
304    * @param candidate_id the possible new predecessor
305    */
306   void notify(int predecessorCandidateId) {
307     if (predId == -1 || isInInterval(predecessorCandidateId, predId + 1, this.id - 1 )) {
308       setPredecessor(predecessorCandidateId);
309     }
310   }
311
312   /**
313    * @brief Notifies a remote node that its predecessor may have changed.
314    * @param notify_id id of the node to notify
315    * @param candidate_id the possible new predecessor
316    */
317   void remoteNotify(int notifyId, int predecessorCandidateId) {
318     Msg.debug("Sending a 'Notify' request to " + notifyId);
319     Task sentTask = new NotifyTask(getHost().getName(), this.mailbox, predecessorCandidateId);
320     sentTask.dsend(Integer.toString(notifyId));
321   }
322
323   // This function is called periodically.
324   // It refreshes the finger table of the current node.
325   void fixFingers() {
326     Msg.debug("Fixing fingers");
327     int i = this.nextFingerToFix;
328     int successorId = this.findSuccessor(this.id + (int)Math.pow(2,i)); //FIXME: SLOW
329     if (successorId != -1) {
330       if (successorId != fingers[i]) {
331         setFinger(i, successorId);
332       }
333       nextFingerToFix = (i + 1) % Common.NB_BITS;
334     }
335   }
336
337   // This function is called periodically.
338   // It checks whether the predecessor has failed
339   void checkPredecessor() {
340     //TODO
341   }
342
343   // Performs a find successor request to a random id.
344   void randomLookup() {
345     int id = 1337;
346     //Msg.info("Making a lookup request for id " + id);
347     findSuccessor(id);
348   }
349
350   /**
351    * @brief Returns the closest preceding finger of an id with respect to the finger table of the current node.
352    * @param id the id to find
353    * @return the closest preceding finger of that id
354    */
355   int closestPrecedingNode(int id) {
356     for (int i = Common.NB_BITS - 1; i >= 0; i--) {
357       if (isInInterval(fingers[i], this.id + 1, id - 1)) {
358         return fingers[i];
359       }
360     }
361     return this.id;
362   }
363
364   /**
365    * @brief Returns whether an id belongs to the interval [start, end].
366    *
367    * The parameters are noramlized to make sure they are between 0 and nb_keys - 1).
368    * 1 belongs to [62, 3]
369    * 1 does not belong to [3, 62]
370    * 63 belongs to [62, 3]
371    * 63 does not belong to [3, 62]
372    * 24 belongs to [21, 29]
373    * 24 does not belong to [29, 21]
374    *
375    * @param id id to check
376    * @param start lower bound
377    * @param end upper bound
378    * @return a non-zero value if id in in [start, end]
379    */
380   static boolean isInInterval(int id, int start, int end) {
381     int normId = normalize(id);
382     int normStart = normalize(start);
383     int normEnd = normalize(end);
384
385     // make sure end >= start and id >= start
386     if (normEnd < normStart) {
387       normEnd += Common.NB_KEYS;
388     }
389     if (normId < normStart) {
390       normId += Common.NB_KEYS;
391     }
392     return (normId <= normEnd);
393   }
394
395   /**
396    * @brief Turns an id into an equivalent id in [0, nb_keys).
397    * @param id an id
398    * @return the corresponding normalized id
399    */
400   static int normalize(int id) {
401     return id & (Common.NB_KEYS - 1);
402   }
403
404   /**
405    * @brief Sets a finger of the current node.
406    * @param finger_index index of the finger to set (0 to nb_bits - 1)
407    * @param id the id to set for this finger
408    */
409   void setFinger(int fingerIndex, int id) {
410     if (id != fingers[fingerIndex]) {
411       fingers[fingerIndex] = id;
412       lastChangeDate = Msg.getClock();
413     }
414   }
415 }