Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill some memory leaks.
[simgrid.git] / examples / java / chord / Node.java
1 /*
2  * Copyright (c) 2006-2013. The SimGrid Team.
3  * All rights reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. 
7  */
8 package chord;
9
10 import org.simgrid.msg.Comm;
11 import org.simgrid.msg.Host;
12 import org.simgrid.msg.Msg;
13 import org.simgrid.msg.MsgException;
14 import org.simgrid.msg.Process;
15 import org.simgrid.msg.Task;
16 import org.simgrid.msg.TimeoutException;
17 /**
18  * Node data
19  */
20 public class Node extends Process {
21         /**
22          * Node id
23          */
24         protected int id;
25         /**
26          * Node mailbox
27          */
28         protected String mailbox;
29         /**
30          * Predecessor id
31          */
32         protected int predId;
33         /**
34          * Predecessor mailbox
35          */
36         protected String predMailbox;
37         /**
38          * Index of the next finger to fix
39          */
40         protected int nextFingerToFix;
41         /**
42          * Current communication
43          */
44         protected Comm commReceive;
45         /**
46          * Last time I changed a finger or my predecessor
47          */
48         protected double lastChangeDate;
49         /**
50          * Node fingers
51          */
52         int fingers[];
53         /**
54          * Constructor
55          */
56         public Node(Host host, String name, String[] args) {
57                 super(host,name,args);
58         }
59         @Override
60         public void main(String[] args) throws MsgException {
61                 if (args.length != 2 && args.length != 4) {
62                         Msg.info("You need to provide 2 or 4 arguments.");
63                         return; 
64                 }
65                 double initTime = Msg.getClock();
66                 int i;
67                 boolean joinSuccess = false;
68                 double deadline;
69                 
70                 double nextStabilizeDate = initTime + Common.PERIODIC_STABILIZE_DELAY;
71                 double nextFixFingersDate = initTime + Common.PERIODIC_FIX_FINGERS_DELAY;
72                 double nextCheckPredecessorDate = initTime + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
73                 double nextLookupDate = initTime + Common.PERIODIC_LOOKUP_DELAY;
74                 
75                 id = Integer.valueOf(args[0]);
76                 mailbox = Integer.toString(id);
77
78                 fingers = new int[Common.NB_BITS];
79                 for (i = 0; i < Common.NB_BITS; i++) {
80                         fingers[i] = -1;
81                         setFinger(i,this.id);
82                 }
83                 
84                 //First node
85                 if (args.length == 2) {
86                         deadline = Integer.valueOf(args[1]);
87                         create();
88                         joinSuccess = true;
89                 }
90                 else {
91                         int knownId = Integer.valueOf(args[1]);
92                         deadline = Integer.valueOf(args[3]);
93                         //Msg.info("Hey! Let's join the system with the id " + id + ".");
94                         
95                         joinSuccess = join(knownId);
96                 }
97                 if (joinSuccess) {
98                         double currentClock = Msg.getClock();
99                         while (currentClock < (initTime + deadline) && currentClock < Common.MAX_SIMULATION_TIME) {
100                                 if (commReceive == null) {
101                                         commReceive = Task.irecv(this.mailbox);
102                                 }
103                                 try {
104                                         if (!commReceive.test()) {
105                                                 if (currentClock >= nextStabilizeDate) {
106                                                         stabilize();
107                                                         nextStabilizeDate = Msg.getClock() + Common.PERIODIC_STABILIZE_DELAY;
108                                                 }
109                                                 else if (currentClock >= nextFixFingersDate) {
110                                                         fixFingers();
111                                                         nextFixFingersDate = Msg.getClock() + Common.PERIODIC_FIX_FINGERS_DELAY;
112                                                 }
113                                                 else if (currentClock >= nextCheckPredecessorDate) {
114                                                         this.checkPredecessor();
115                                                         nextCheckPredecessorDate = Msg.getClock() + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
116                                                 }
117                                                 else if (currentClock >= nextLookupDate) {
118                                                         this.randomLookup();
119                                                         nextLookupDate = Msg.getClock() + Common.PERIODIC_LOOKUP_DELAY;
120                                                 }
121                                                 else {
122                                                         waitFor(5);
123                                                 }
124                                                 currentClock = Msg.getClock();
125                                         }
126                                         else {
127                                                 handleTask(commReceive.getTask());
128                                                 currentClock = Msg.getClock();
129                                                 commReceive = null;
130                                                 
131                                         }
132                                 }
133                                 catch (Exception e) {
134                                         currentClock = Msg.getClock();
135                                         commReceive = null;
136                                 }
137                                 
138                         }
139                         leave();
140                         if (commReceive != null) {
141                                 commReceive = null;
142                         }
143                 }
144                 else {
145                         Msg.info("I couldn't join the ring");
146                 }
147         }
148         void handleTask(Task task) {
149                 if (task instanceof FindSuccessorTask) {
150                         FindSuccessorTask fTask = (FindSuccessorTask)task;
151                         Msg.debug("Receiving a 'Find Successor' request from " + fTask.issuerHostName + " for id " + fTask.requestId);
152                         // is my successor the successor?
153                         if (isInInterval(fTask.requestId, this.id + 1, fingers[0])) {
154                                 //Msg.info("Send the request to " + fTask.answerTo + " with answer " + fingers[0]);
155                                 FindSuccessorAnswerTask answer = new FindSuccessorAnswerTask(host.getName(), mailbox, fingers[0]);
156                                 answer.dsend(fTask.answerTo);
157                         }
158                         else {
159                         // otherwise, forward the request to the closest preceding finger in my table
160                                 int closest = closestPrecedingNode(fTask.requestId);
161                                 //Msg.info("Forward the request to " + closest);
162                                 fTask.dsend(Integer.toString(closest));
163                         }
164                 }
165                 else if (task instanceof GetPredecessorTask) {
166                         GetPredecessorTask gTask = (GetPredecessorTask)(task);
167                         Msg.debug("Receiving a 'Get Predecessor' request from " + gTask.issuerHostName);
168                         GetPredecessorAnswerTask answer = new GetPredecessorAnswerTask(host.getName(), mailbox, predId);
169                         answer.dsend(gTask.answerTo);
170                 }
171                 else if (task instanceof NotifyTask) {
172                         NotifyTask nTask = (NotifyTask)task;
173                         notify(nTask.requestId);
174                 }
175                 else {
176                         Msg.debug("Ignoring unexpected task of type:" + task);
177                 }
178         }
179         /**
180          * @brief Makes the current node quit the system
181          */
182         void leave() {
183                 Msg.debug("Well Guys! I Think it's time for me to quit ;)");
184                 quitNotify(1); //Notify my successor
185                 quitNotify(-1); //Notify my predecessor.
186                 // TODO ...
187         }
188         /**
189          * @brief Notifies the successor or the predecessor of the current node
190          * of the departure
191          * @param to 1 to notify the successor, -1 to notify the predecessor
192          */
193         static void quitNotify( int to) {
194                 //TODO
195         }
196         /**
197       * @brief Initializes the current node as the first one of the system.
198          */
199         void create() {
200                 Msg.debug("Create a new Chord ring...");
201                 setPredecessor(-1);
202                 
203         }
204         /**
205          * Makes the current node join the ring, knowing the id of a node
206          * already in the ring 
207          */
208         boolean join(int knownId) {
209                 Msg.info("Joining the ring with id " + this.id + " knowing node " + knownId);
210                 setPredecessor(-1);
211                 int successorId = remoteFindSuccessor(knownId, this.id);
212                 if (successorId == -1) {
213                         Msg.info("Cannot join the ring.");
214                 }
215                 else {
216                         setFinger(0, successorId);
217                 }
218                 return successorId != -1;
219         }
220         
221         /**
222          * Sets the node predecessor
223          */
224         void setPredecessor(int predecessorId) {
225                 if (predecessorId != predId) {
226                         predId = predecessorId;
227                         if (predecessorId != -1) {
228                                 predMailbox = Integer.toString(predId);
229                         }
230                         lastChangeDate = Msg.getClock();
231                 }
232         }
233         /**
234          * @brief Asks another node its predecessor.
235          * @param askTo the node to ask to
236          * @return the id of its predecessor node, or -1 if the request failed
237          * (or if the node does not know its predecessor)
238          */
239         int remoteGetPredecessor(int askTo) {
240                 int predecessorId = -1;
241                 boolean stop = false;
242                 Msg.debug("Sending a 'Get Predecessor' request to " + askTo);
243                 String mailboxTo = Integer.toString(askTo);
244                 GetPredecessorTask sendTask = new GetPredecessorTask(host.getName(), this.mailbox);
245                 try {
246                         sendTask.send(mailboxTo, Common.TIMEOUT);                       
247                         try {
248                                 do {
249                                         if (commReceive == null) {
250                                                 commReceive = Task.irecv(this.mailbox);
251                                         }
252                                         commReceive.waitCompletion(Common.TIMEOUT);
253                                         Task taskReceived = commReceive.getTask();
254                                         if (taskReceived instanceof GetPredecessorAnswerTask) {
255                                                 predecessorId = ((GetPredecessorAnswerTask) taskReceived).answerId;
256                                                 stop = true;
257                                         }
258                                         else {
259                                                 handleTask(taskReceived);
260                                         }
261                                         commReceive = null;                                     
262                                 } while (!stop);
263                 
264                         }
265                         catch (MsgException e) {
266                                 commReceive = null;     
267                                 stop = true;
268                         }
269                 }
270                 catch (MsgException e) {
271                         Msg.debug("Failed to send the Get Predecessor request");
272                 }
273                 
274                 
275                 return predecessorId;
276         }
277         /**
278          * @brief Makes the current node find the successor node of an id.
279          * @param node the current node
280          * @param id the id to find
281          * @return the id of the successor node, or -1 if the request failed
282          */
283         int findSuccessor(int id) {
284                 if (isInInterval(id, this.id + 1, fingers[0])) {
285                         return fingers[0];
286                 }
287                 
288                 int closest = this.closestPrecedingNode(id);
289                 return remoteFindSuccessor(closest, id);
290         }
291         /**
292          * @brief Asks another node the successor node of an id.
293          */
294         int remoteFindSuccessor(int askTo, int id) {
295                 int successor = -1;
296                 boolean stop = false;
297                 String mailbox = Integer.toString(askTo);
298                 Task sendTask = new FindSuccessorTask(host.getName(), this.mailbox, id);
299                 Msg.debug("Sending a 'Find Successor' request to " + mailbox + " for id " + id);
300                 try {
301                         sendTask.send(mailbox, Common.TIMEOUT);
302                         do {
303                                 if (commReceive == null) {
304                                         commReceive = Task.irecv(this.mailbox);
305                                 }
306                                 try {
307                                         commReceive.waitCompletion(Common.TIMEOUT);
308                                         Task task = commReceive.getTask();
309                                         if (task instanceof FindSuccessorAnswerTask) {
310                                                 //TODO: Check if this this our answer.
311                                                 FindSuccessorAnswerTask fTask = (FindSuccessorAnswerTask) task;
312                                                 stop = true;
313                                                 successor = fTask.answerId;
314                                         }
315                                         else {
316                                                 handleTask(task);
317                                         }
318                                         commReceive = null;
319                                 }
320                                 catch (TimeoutException e) {
321                                         stop = true;
322                                         commReceive = null;
323                                 }
324                         } while (!stop);
325                 }
326                 catch (TimeoutException e) {
327                         Msg.debug("Failed to send the 'Find Successor' request");
328                 }
329                 catch (MsgException e) {
330                         Msg.debug("Failed to receive Find Successor");
331                 }
332                 
333                 return successor;
334
335         }
336         /**
337          * @brief This function is called periodically. It checks the immediate
338          * successor of the current node.
339          */
340         void stabilize() {
341                 Msg.debug("Stabilizing node");
342                 int candidateId;
343                 int successorId = fingers[0];
344                 if (successorId != this.id){
345                         candidateId = remoteGetPredecessor(successorId);
346                 }
347                 else {
348                         candidateId = predId;
349                 }
350                 //This node is a candidate to become my new successor
351                 if (candidateId != -1 && isInInterval(candidateId, this.id + 1, successorId - 1)) {
352                         setFinger(0, candidateId);
353                 }
354                 if (successorId != this.id) {
355                         remoteNotify(successorId, this.id);
356                 }
357                 
358         }
359         /**
360          * \brief Notifies the current node that its predecessor may have changed.
361          * \param candidate_id the possible new predecessor
362          */
363         void notify(int predecessorCandidateId) {
364                 if (predId == -1 || isInInterval(predecessorCandidateId, predId + 1, this.id - 1 )) {
365                         setPredecessor(predecessorCandidateId);
366                 }
367                 else {
368                         //Don't have to change the predecessor.
369                 }
370         }
371         /**
372          * \brief Notifies a remote node that its predecessor may have changed.
373          * \param notify_id id of the node to notify
374          * \param candidate_id the possible new predecessor
375          */     
376         void remoteNotify(int notifyId, int predecessorCandidateId) {
377                 Msg.debug("Sending a 'Notify' request to " + notifyId);
378                 Task sentTask = new NotifyTask(host.getName(), this.mailbox, predecessorCandidateId);
379                 sentTask.dsend(Integer.toString(notifyId));
380         }
381         /**
382          * \brief This function is called periodically.
383          * It refreshes the finger table of the current node.
384          */
385         void fixFingers() {
386                 Msg.debug("Fixing fingers");
387                 int i = this.nextFingerToFix;
388                 int id = this.findSuccessor(this.id + (int)Math.pow(2,i)); //FIXME: SLOW
389                 if (id != -1) {
390                         if (id != fingers[i]) {
391                                 setFinger(i, id);
392                         }
393                         nextFingerToFix = (i + 1) % Common.NB_BITS;
394                 }
395         }
396         /**
397          * \brief This function is called periodically.
398          * It checks whether the predecessor has failed
399          */
400         void checkPredecessor() {
401                 //TODO
402         }
403         /**
404          * \brief Performs a find successor request to a random id.
405          */
406         void randomLookup() {
407                 int id = 1337;
408                 //Msg.info("Making a lookup request for id " + id);
409                 findSuccessor(id);
410         }
411         
412         
413
414         /**
415          * @brief Returns the closest preceding finger of an id
416          * with respect to the finger table of the current node.
417          * @param id the id to find
418          * \return the closest preceding finger of that id
419          */
420         int closestPrecedingNode(int id) {
421                 int i;
422                 for (i = Common.NB_BITS - 1; i >= 0; i--) {
423                         if (isInInterval(fingers[i], this.id + 1, id - 1)) {
424                                 return fingers[i];
425                         }
426                 }               
427                 return this.id;
428         }
429         /**
430          * @brief Returns whether an id belongs to the interval [start, end].
431          *
432          * The parameters are noramlized to make sure they are between 0 and nb_keys - 1).
433          * 1 belongs to [62, 3]
434          * 1 does not belong to [3, 62]
435          * 63 belongs to [62, 3]
436          * 63 does not belong to [3, 62]
437          * 24 belongs to [21, 29]
438          * 24 does not belong to [29, 21]
439          *
440          * \param id id to check
441          * \param start lower bound
442          * \param end upper bound
443          * \return a non-zero value if id in in [start, end]
444          */
445         static boolean isInInterval(int id, int start, int end) {
446                 id = normalize(id);
447                 start = normalize(start);
448                 end = normalize(end);
449                 
450                 // make sure end >= start and id >= start
451                 if (end < start) {
452                         end += Common.NB_KEYS;
453                 }
454                 if (id < start) {
455                         id += Common.NB_KEYS;
456                 }
457                 return (id <= end);
458         
459         }
460         /**
461          * @brief Turns an id into an equivalent id in [0, nb_keys).
462          * @param id an id
463          * @return the corresponding normalized id
464          */
465         static int normalize(int id) {
466                 return id & (Common.NB_KEYS - 1);
467         }
468         /**
469          * \brief Sets a finger of the current node.
470          * \param finger_index index of the finger to set (0 to nb_bits - 1)
471          * \param id the id to set for this finger
472          */
473         void setFinger(int fingerIndex, int id) {
474                 if (id != fingers[fingerIndex]) {
475                         fingers[fingerIndex] = id;
476                         lastChangeDate = Msg.getClock();
477                 }
478         }
479 }