Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Bugfix in kademlia example
[simgrid.git] / examples / chord / Node.java
1 package chord;
2
3 import org.simgrid.msg.Comm;
4 import org.simgrid.msg.Host;
5 import org.simgrid.msg.Msg;
6 import org.simgrid.msg.MsgException;
7 import org.simgrid.msg.Process;
8 import org.simgrid.msg.Task;
9 import org.simgrid.msg.TimeoutException;
10 /**
11  * Node data
12  */
13 public class Node extends Process {
14         /**
15          * Node id
16          */
17         protected int id;
18         /**
19          * Node mailbox
20          */
21         protected String mailbox;
22         /**
23          * Predecessor id
24          */
25         protected int predId;
26         /**
27          * Predecessor mailbox
28          */
29         protected String predMailbox;
30         /**
31          * Index of the next finger to fix
32          */
33         protected int nextFingerToFix;
34         /**
35          * Current communication
36          */
37         protected Comm commReceive;
38         /**
39          * Last time I changed a finger or my predecessor
40          */
41         protected double lastChangeDate;
42         /**
43          * Node fingers
44          */
45         int fingers[];
46         /**
47          * Constructor
48          */
49         public Node(Host host, String name, String[] args) {
50                 super(host,name,args);
51         }
52         @Override
53         public void main(String[] args) throws MsgException {
54                 if (args.length != 2 && args.length != 4) {
55                         Msg.info("You need to provide 2 or 4 arguments.");
56                         return; 
57                 }
58                 double initTime = Msg.getClock();
59                 int i;
60                 boolean joinSuccess = false;
61                 double deadline;
62                 
63                 double nextStabilizeDate = initTime + Common.PERIODIC_STABILIZE_DELAY;
64                 double nextFixFingersDate = initTime + Common.PERIODIC_FIX_FINGERS_DELAY;
65                 double nextCheckPredecessorDate = initTime + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
66                 double nextLookupDate = initTime + Common.PERIODIC_LOOKUP_DELAY;
67                 
68                 id = Integer.valueOf(args[0]);
69                 mailbox = Integer.toString(id);
70
71                 fingers = new int[Common.NB_BITS];
72                 for (i = 0; i < Common.NB_BITS; i++) {
73                         fingers[i] = -1;
74                         setFinger(i,this.id);
75                 }
76                 
77                 //First node
78                 if (args.length == 2) {
79                         deadline = Integer.valueOf(args[1]);
80                         create();
81                         joinSuccess = true;
82                 }
83                 else {
84                         int knownId = Integer.valueOf(args[1]);
85                         deadline = Integer.valueOf(args[3]);
86                         //Msg.info("Hey! Let's join the system with the id " + id + ".");
87                         
88                         joinSuccess = join(knownId);
89                 }
90                 if (joinSuccess) {
91                         double currentClock = Msg.getClock();
92                         while (currentClock < (initTime + deadline) && currentClock < Common.MAX_SIMULATION_TIME) {
93                                 if (commReceive == null) {
94                                         commReceive = Task.irecv(this.mailbox);
95                                 }
96                                 try {
97                                         if (!commReceive.test()) {
98                                                 if (currentClock >= nextStabilizeDate) {
99                                                         stabilize();
100                                                         nextStabilizeDate = Msg.getClock() + Common.PERIODIC_STABILIZE_DELAY;
101                                                 }
102                                                 else if (currentClock >= nextFixFingersDate) {
103                                                         fixFingers();
104                                                         nextFixFingersDate = Msg.getClock() + Common.PERIODIC_FIX_FINGERS_DELAY;
105                                                 }
106                                                 else if (currentClock >= nextCheckPredecessorDate) {
107                                                         this.checkPredecessor();
108                                                         nextCheckPredecessorDate = Msg.getClock() + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
109                                                 }
110                                                 else if (currentClock >= nextLookupDate) {
111                                                         this.randomLookup();
112                                                         nextLookupDate = Msg.getClock() + Common.PERIODIC_LOOKUP_DELAY;
113                                                 }
114                                                 else {
115                                                         waitFor(5);
116                                                 }
117                                                 currentClock = Msg.getClock();
118                                         }
119                                         else {
120                                                 handleTask(commReceive.getTask());
121                                                 currentClock = Msg.getClock();
122                                                 commReceive = null;
123                                                 
124                                         }
125                                 }
126                                 catch (Exception e) {
127                                         currentClock = Msg.getClock();
128                                         commReceive = null;
129                                 }
130                                 
131                         }
132                         leave();
133                         if (commReceive != null) {
134                                 commReceive = null;
135                         }
136                 }
137                 else {
138                         Msg.info("I couldn't join the ring");
139                 }
140         }
141         void handleTask(Task task) {
142                 if (task instanceof FindSuccessorTask) {
143                         FindSuccessorTask fTask = (FindSuccessorTask)task;
144                         Msg.debug("Receiving a 'Find Successor' request from " + fTask.issuerHostName + " for id " + fTask.requestId);
145                         // is my successor the successor?
146                         if (isInInterval(fTask.requestId, this.id + 1, fingers[0])) {
147                                 //Msg.info("Send the request to " + fTask.answerTo + " with answer " + fingers[0]);
148                                 FindSuccessorAnswerTask answer = new FindSuccessorAnswerTask(host.getName(), mailbox, fingers[0]);
149                                 answer.dsend(fTask.answerTo);
150                         }
151                         else {
152                         // otherwise, forward the request to the closest preceding finger in my table
153                                 int closest = closestPrecedingNode(fTask.requestId);
154                                 //Msg.info("Forward the request to " + closest);
155                                 fTask.dsend(Integer.toString(closest));
156                         }
157                 }
158                 else if (task instanceof GetPredecessorTask) {
159                         GetPredecessorTask gTask = (GetPredecessorTask)(task);
160                         Msg.debug("Receiving a 'Get Predecessor' request from " + gTask.issuerHostName);
161                         GetPredecessorAnswerTask answer = new GetPredecessorAnswerTask(host.getName(), mailbox, predId);
162                         answer.dsend(gTask.answerTo);
163                 }
164                 else if (task instanceof NotifyTask) {
165                         NotifyTask nTask = (NotifyTask)task;
166                         notify(nTask.requestId);
167                 }
168                 else {
169                         Msg.debug("Ignoring unexpected task of type:" + task);
170                 }
171         }
172         /**
173          * @brief Makes the current node quit the system
174          */
175         void leave() {
176                 Msg.debug("Well Guys! I Think it's time for me to quit ;)");
177                 quitNotify(1); //Notify my successor
178                 quitNotify(-1); //Notify my predecessor.
179                 // TODO ...
180         }
181         /**
182          * @brief Notifies the successor or the predecessor of the current node
183          * of the departure
184          * @param to 1 to notify the successor, -1 to notify the predecessor
185          */
186         static void quitNotify( int to) {
187                 //TODO
188         }
189         /**
190       * @brief Initializes the current node as the first one of the system.
191          */
192         void create() {
193                 Msg.debug("Create a new Chord ring...");
194                 setPredecessor(-1);
195                 
196         }
197         /**
198          * Makes the current node join the ring, knowing the id of a node
199          * already in the ring 
200          */
201         boolean join(int knownId) {
202                 Msg.info("Joining the ring with id " + this.id + " knowing node " + knownId);
203                 setPredecessor(-1);
204                 int successorId = remoteFindSuccessor(knownId, this.id);
205                 if (successorId == -1) {
206                         Msg.info("Cannot join the ring.");
207                 }
208                 else {
209                         setFinger(0, successorId);
210                 }
211                 return successorId != -1;
212         }
213         
214         /**
215          * Sets the node predecessor
216          */
217         void setPredecessor(int predecessorId) {
218                 if (predecessorId != predId) {
219                         predId = predecessorId;
220                         if (predecessorId != -1) {
221                                 predMailbox = Integer.toString(predId);
222                         }
223                         lastChangeDate = Msg.getClock();
224                 }
225         }
226         /**
227          * @brief Asks another node its predecessor.
228          * @param askTo the node to ask to
229          * @return the id of its predecessor node, or -1 if the request failed
230          * (or if the node does not know its predecessor)
231          */
232         int remoteGetPredecessor(int askTo) {
233                 int predecessorId = -1;
234                 boolean stop = false;
235                 Msg.debug("Sending a 'Get Predecessor' request to " + askTo);
236                 String mailboxTo = Integer.toString(askTo);
237                 GetPredecessorTask sendTask = new GetPredecessorTask(host.getName(), this.mailbox);
238                 try {
239                         sendTask.send(mailboxTo, Common.TIMEOUT);                       
240                         try {
241                                 do {
242                                         if (commReceive == null) {
243                                                 commReceive = Task.irecv(this.mailbox);
244                                         }
245                                         commReceive.waitCompletion(Common.TIMEOUT);
246                                         Task taskReceived = commReceive.getTask();
247                                         if (taskReceived instanceof GetPredecessorAnswerTask) {
248                                                 predecessorId = ((GetPredecessorAnswerTask) taskReceived).answerId;
249                                                 stop = true;
250                                         }
251                                         else {
252                                                 handleTask(taskReceived);
253                                         }
254                                         commReceive = null;                                     
255                                 } while (!stop);
256                 
257                         }
258                         catch (MsgException e) {
259                                 commReceive = null;     
260                                 stop = true;
261                         }
262                 }
263                 catch (MsgException e) {
264                         Msg.debug("Failed to send the Get Predecessor request");
265                 }
266                 
267                 
268                 return predecessorId;
269         }
270         /**
271          * @brief Makes the current node find the successor node of an id.
272          * @param node the current node
273          * @param id the id to find
274          * @return the id of the successor node, or -1 if the request failed
275          */
276         int findSuccessor(int id) {
277                 if (isInInterval(id, this.id + 1, fingers[0])) {
278                         return fingers[0];
279                 }
280                 
281                 int closest = this.closestPrecedingNode(id);
282                 return remoteFindSuccessor(closest, id);
283         }
284         /**
285          * @brief Asks another node the successor node of an id.
286          */
287         int remoteFindSuccessor(int askTo, int id) {
288                 int successor = -1;
289                 boolean stop = false;
290                 String mailbox = Integer.toString(askTo);
291                 Task sendTask = new FindSuccessorTask(host.getName(), this.mailbox, id);
292                 Msg.debug("Sending a 'Find Successor' request to " + mailbox + " for id " + id);
293                 try {
294                         sendTask.send(mailbox, Common.TIMEOUT);
295                         do {
296                                 if (commReceive == null) {
297                                         commReceive = Task.irecv(this.mailbox);
298                                 }
299                                 try {
300                                         commReceive.waitCompletion(Common.TIMEOUT);
301                                         Task task = commReceive.getTask();
302                                         if (task instanceof FindSuccessorAnswerTask) {
303                                                 //TODO: Check if this this our answer.
304                                                 FindSuccessorAnswerTask fTask = (FindSuccessorAnswerTask) task;
305                                                 stop = true;
306                                                 successor = fTask.answerId;
307                                         }
308                                         else {
309                                                 handleTask(task);
310                                         }
311                                         commReceive = null;
312                                 }
313                                 catch (TimeoutException e) {
314                                         stop = true;
315                                         commReceive = null;
316                                 }
317                         } while (!stop);
318                 }
319                 catch (TimeoutException e) {
320                         Msg.debug("Failed to send the 'Find Successor' request");
321                 }
322                 catch (MsgException e) {
323                         Msg.debug("Failed to receive Find Successor");
324                 }
325                 
326                 return successor;
327
328         }
329         /**
330          * @brief This function is called periodically. It checks the immediate
331          * successor of the current node.
332          */
333         void stabilize() {
334                 Msg.debug("Stabilizing node");
335                 int candidateId;
336                 int successorId = fingers[0];
337                 if (successorId != this.id){
338                         candidateId = remoteGetPredecessor(successorId);
339                 }
340                 else {
341                         candidateId = predId;
342                 }
343                 //This node is a candidate to become my new successor
344                 if (candidateId != -1 && isInInterval(candidateId, this.id + 1, successorId - 1)) {
345                         setFinger(0, candidateId);
346                 }
347                 if (successorId != this.id) {
348                         remoteNotify(successorId, this.id);
349                 }
350                 
351         }
352         /**
353          * \brief Notifies the current node that its predecessor may have changed.
354          * \param candidate_id the possible new predecessor
355          */
356         void notify(int predecessorCandidateId) {
357                 if (predId == -1 || isInInterval(predecessorCandidateId, predId + 1, this.id - 1 )) {
358                         setPredecessor(predecessorCandidateId);
359                 }
360                 else {
361                         //Don't have to change the predecessor.
362                 }
363         }
364         /**
365          * \brief Notifies a remote node that its predecessor may have changed.
366          * \param notify_id id of the node to notify
367          * \param candidate_id the possible new predecessor
368          */     
369         void remoteNotify(int notifyId, int predecessorCandidateId) {
370                 Msg.debug("Sending a 'Notify' request to " + notifyId);
371                 Task sentTask = new NotifyTask(host.getName(), this.mailbox, predecessorCandidateId);
372                 sentTask.dsend(Integer.toString(notifyId));
373         }
374         /**
375          * \brief This function is called periodically.
376          * It refreshes the finger table of the current node.
377          */
378         void fixFingers() {
379                 Msg.debug("Fixing fingers");
380                 int i = this.nextFingerToFix;
381                 int id = this.findSuccessor(this.id + (int)Math.pow(2,i)); //FIXME: SLOW
382                 if (id != -1) {
383                         if (id != fingers[i]) {
384                                 setFinger(i, id);
385                         }
386                         nextFingerToFix = (i + 1) % Common.NB_BITS;
387                 }
388         }
389         /**
390          * \brief This function is called periodically.
391          * It checks whether the predecessor has failed
392          */
393         void checkPredecessor() {
394                 //TODO
395         }
396         /**
397          * \brief Performs a find successor request to a random id.
398          */
399         void randomLookup() {
400                 int id = 1337;
401                 //Msg.info("Making a lookup request for id " + id);
402                 findSuccessor(id);
403         }
404         
405         
406
407         /**
408          * @brief Returns the closest preceding finger of an id
409          * with respect to the finger table of the current node.
410          * @param id the id to find
411          * \return the closest preceding finger of that id
412          */
413         int closestPrecedingNode(int id) {
414                 int i;
415                 for (i = Common.NB_BITS - 1; i >= 0; i--) {
416                         if (isInInterval(fingers[i], this.id + 1, id - 1)) {
417                                 return fingers[i];
418                         }
419                 }               
420                 return this.id;
421         }
422         /**
423          * @brief Returns whether an id belongs to the interval [start, end].
424          *
425          * The parameters are noramlized to make sure they are between 0 and nb_keys - 1).
426          * 1 belongs to [62, 3]
427          * 1 does not belong to [3, 62]
428          * 63 belongs to [62, 3]
429          * 63 does not belong to [3, 62]
430          * 24 belongs to [21, 29]
431          * 24 does not belong to [29, 21]
432          *
433          * \param id id to check
434          * \param start lower bound
435          * \param end upper bound
436          * \return a non-zero value if id in in [start, end]
437          */
438         static boolean isInInterval(int id, int start, int end) {
439                 id = normalize(id);
440                 start = normalize(start);
441                 end = normalize(end);
442                 
443                 // make sure end >= start and id >= start
444                 if (end < start) {
445                         end += Common.NB_KEYS;
446                 }
447                 if (id < start) {
448                         id += Common.NB_KEYS;
449                 }
450                 return (id <= end);
451         
452         }
453         /**
454          * @brief Turns an id into an equivalent id in [0, nb_keys).
455          * @param id an id
456          * @return the corresponding normalized id
457          */
458         static int normalize(int id) {
459                 return id & (Common.NB_KEYS - 1);
460         }
461         /**
462          * \brief Sets a finger of the current node.
463          * \param finger_index index of the finger to set (0 to nb_bits - 1)
464          * \param id the id to set for this finger
465          */
466         void setFinger(int fingerIndex, int id) {
467                 if (id != fingers[fingerIndex]) {
468                         fingers[fingerIndex] = id;
469                         lastChangeDate = Msg.getClock();
470                 }
471         }
472 }