Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add chord example for java
[simgrid.git] / examples / chord / Node.java
1 package chord;
2
3 import org.simgrid.msg.Comm;
4 import org.simgrid.msg.Host;
5 import org.simgrid.msg.Msg;
6 import org.simgrid.msg.MsgException;
7 import org.simgrid.msg.Process;
8 import org.simgrid.msg.Task;
9 import org.simgrid.msg.TimeoutException;
10 /**
11  * Node data
12  */
13 public class Node extends Process {
14         /**
15          * Node id
16          */
17         protected int id;
18         /**
19          * Node mailbox
20          */
21         protected String mailbox;
22         /**
23          * Predecessor id
24          */
25         protected int predId;
26         /**
27          * Predecessor mailbox
28          */
29         protected String predMailbox;
30         /**
31          * Index of the next finger to fix
32          */
33         protected int nextFingerToFix;
34         /**
35          * Current communication
36          */
37         protected Comm commReceive;
38         /**
39          * Last time I changed a finger or my predecessor
40          */
41         protected double lastChangeDate;
42         /**
43          * Node fingers
44          */
45         int fingers[];
46         /**
47          * Constructor
48          */
49         public Node(Host host, String name, String[] args) {
50                 super(host,name,args);
51         }
52         @Override
53         public void main(String[] args) throws MsgException {
54                 if (args.length != 2 && args.length != 4) {
55                         Msg.info("You need to provide 2 or 4 arguments.");
56                         return; 
57                 }
58                 double initTime = Msg.getClock();
59                 int i;
60                 boolean joinSuccess = false;
61                 double deadline;
62                 
63                 double nextStabilizeDate = initTime + Common.PERIODIC_STABILIZE_DELAY;
64                 double nextFixFingersDate = initTime + Common.PERIODIC_FIX_FINGERS_DELAY;
65                 double nextCheckPredecessorDate = initTime + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
66                 double nextLookupDate = initTime + Common.PERIODIC_LOOKUP_DELAY;
67                 
68                 id = Integer.valueOf(args[0]);
69                 mailbox = Integer.toString(id);
70
71                 fingers = new int[Common.NB_BITS];
72                 for (i = 0; i < Common.NB_BITS; i++) {
73                         fingers[i] = -1;
74                         setFinger(i,this.id);
75                 }
76                 
77                 //First node
78                 if (args.length == 2) {
79                         deadline = Integer.valueOf(args[1]);
80                         create();
81                         joinSuccess = true;
82                 }
83                 else {
84                         int knownId = Integer.valueOf(args[1]);
85                         deadline = Integer.valueOf(args[3]);
86                         //Msg.info("Hey! Let's join the system with the id " + id + ".");
87                         
88                         joinSuccess = join(knownId);
89                 }
90                 if (joinSuccess) {
91                         double currentClock = Msg.getClock();
92                         while (currentClock < (initTime + deadline) && currentClock < Common.MAX_SIMULATION_TIME) {
93                                 if (commReceive == null) {
94                                         commReceive = Task.irecv(this.mailbox);
95                                 }
96                                 try {
97                                         if (!commReceive.test()) {
98                                                 if (currentClock >= nextStabilizeDate) {
99                                                         stabilize();
100                                                         nextStabilizeDate = Msg.getClock() + Common.PERIODIC_STABILIZE_DELAY;
101                                                 }
102                                                 else if (currentClock >= nextFixFingersDate) {
103                                                         fixFingers();
104                                                         nextFixFingersDate = Msg.getClock() + Common.PERIODIC_FIX_FINGERS_DELAY;
105                                                 }
106                                                 else if (currentClock >= nextCheckPredecessorDate) {
107                                                         this.checkPredecessor();
108                                                         nextCheckPredecessorDate = Msg.getClock() + Common.PERIODIC_CHECK_PREDECESSOR_DELAY;
109                                                 }
110                                                 else if (currentClock >= nextLookupDate) {
111                                                         this.randomLookup();
112                                                         nextLookupDate = Msg.getClock() + Common.PERIODIC_LOOKUP_DELAY;
113                                                 }
114                                                 else {
115                                                         waitFor(5);
116                                                 }
117                                                 currentClock = Msg.getClock();
118                                         }
119                                         else {
120                                                 handleTask(commReceive.getTask());
121                                                 currentClock = Msg.getClock();
122                                                 commReceive = null;
123                                                 
124                                         }
125                                 }
126                                 catch (Exception e) {
127                                         
128                                 }
129                                 
130                         }
131                         leave();
132                         if (commReceive != null) {
133                                 commReceive = null;
134                         }
135                 }
136                 else {
137                         Msg.info("I couldn't join the ring");
138                 }
139         }
140         void handleTask(Task task) {
141                 if (task instanceof FindSuccessorTask) {
142                         FindSuccessorTask fTask = (FindSuccessorTask)task;
143                         Msg.debug("Receiving a 'Find Successor' request from " + fTask.issuerHostName + " for id " + fTask.requestId);
144                         // is my successor the successor?
145                         if (isInInterval(fTask.requestId, this.id + 1, fingers[0])) {
146                                 //Msg.info("Send the request to " + fTask.answerTo + " with answer " + fingers[0]);
147                                 FindSuccessorAnswerTask answer = new FindSuccessorAnswerTask(host.getName(), mailbox, fingers[0]);
148                                 answer.dsend(fTask.answerTo);
149                         }
150                         else {
151                         // otherwise, forward the request to the closest preceding finger in my table
152                                 int closest = closestPrecedingNode(fTask.requestId);
153                                 //Msg.info("Forward the request to " + closest);
154                                 fTask.dsend(Integer.toString(closest));
155                         }
156                 }
157                 else if (task instanceof GetPredecessorTask) {
158                         GetPredecessorTask gTask = (GetPredecessorTask)(task);
159                         Msg.debug("Receiving a 'Get Predecessor' request from " + gTask.issuerHostName);
160                         GetPredecessorAnswerTask answer = new GetPredecessorAnswerTask(host.getName(), mailbox, predId);
161                         answer.dsend(gTask.answerTo);
162                 }
163                 else if (task instanceof NotifyTask) {
164                         NotifyTask nTask = (NotifyTask)task;
165                         notify(nTask.requestId);
166                 }
167                 else {
168                         Msg.debug("Ignoring unexpected task of type:" + task);
169                 }
170         }
171         /**
172          * @brief Makes the current node quit the system
173          */
174         void leave() {
175                 Msg.debug("Well Guys! I Think it's time for me to quit ;)");
176                 quitNotify(1); //Notify my successor
177                 quitNotify(-1); //Notify my predecessor.
178                 // TODO ...
179         }
180         /**
181          * @brief Notifies the successor or the predecessor of the current node
182          * of the departure
183          * @param to 1 to notify the successor, -1 to notify the predecessor
184          */
185         static void quitNotify( int to) {
186                 //TODO
187         }
188         /**
189       * @brief Initializes the current node as the first one of the system.
190          */
191         void create() {
192                 Msg.debug("Create a new Chord ring...");
193                 setPredecessor(-1);
194                 
195         }
196         /**
197          * Makes the current node join the ring, knowing the id of a node
198          * already in the ring 
199          */
200         boolean join(int knownId) {
201                 Msg.info("Joining the ring with id " + this.id + " knowing node " + knownId);
202                 setPredecessor(-1);
203                 int successorId = remoteFindSuccessor(knownId, this.id);
204                 if (successorId == -1) {
205                         Msg.info("Cannot join the ring.");
206                 }
207                 else {
208                         setFinger(0, successorId);
209                 }
210                 return successorId != -1;
211         }
212         
213         /**
214          * Sets the node predecessor
215          */
216         void setPredecessor(int predecessorId) {
217                 if (predecessorId != predId) {
218                         predId = predecessorId;
219                         if (predecessorId != -1) {
220                                 predMailbox = Integer.toString(predId);
221                         }
222                         lastChangeDate = Msg.getClock();
223                 }
224         }
225         /**
226          * @brief Asks another node its predecessor.
227          * @param askTo the node to ask to
228          * @return the id of its predecessor node, or -1 if the request failed
229          * (or if the node does not know its predecessor)
230          */
231         int remoteGetPredecessor(int askTo) {
232                 int predecessorId = -1;
233                 boolean stop = false;
234                 Msg.debug("Sending a 'Get Predecessor' request to " + askTo);
235                 String mailboxTo = Integer.toString(askTo);
236                 GetPredecessorTask sendTask = new GetPredecessorTask(host.getName(), this.mailbox);
237                 try {
238                         sendTask.send(mailboxTo, Common.TIMEOUT);                       
239                         try {
240                                 do {
241                                         if (commReceive == null) {
242                                                 commReceive = Task.irecv(this.mailbox);
243                                         }
244                                         commReceive.waitCompletion(Common.TIMEOUT);
245                                         Task taskReceived = commReceive.getTask();
246                                         if (taskReceived instanceof GetPredecessorAnswerTask) {
247                                                 predecessorId = ((GetPredecessorAnswerTask) taskReceived).answerId;
248                                                 stop = true;
249                                         }
250                                         else {
251                                                 handleTask(taskReceived);
252                                         }
253                                         commReceive = null;                                     
254                                 } while (!stop);
255                 
256                         }
257                         catch (MsgException e) {
258                                 commReceive = null;     
259                                 stop = true;
260                         }
261                 }
262                 catch (MsgException e) {
263                         Msg.debug("Failed to send the Get Predecessor request");
264                 }
265                 
266                 
267                 return predecessorId;
268         }
269         /**
270          * @brief Makes the current node find the successor node of an id.
271          * @param node the current node
272          * @param id the id to find
273          * @return the id of the successor node, or -1 if the request failed
274          */
275         int findSuccessor(int id) {
276                 if (isInInterval(id, this.id + 1, fingers[0])) {
277                         return fingers[0];
278                 }
279                 
280                 int closest = this.closestPrecedingNode(id);
281                 return remoteFindSuccessor(closest, id);
282         }
283         /**
284          * @brief Asks another node the successor node of an id.
285          */
286         int remoteFindSuccessor(int askTo, int id) {
287                 int successor = -1;
288                 boolean stop = false;
289                 String mailbox = Integer.toString(askTo);
290                 Task sendTask = new FindSuccessorTask(host.getName(), this.mailbox, id);
291                 Msg.debug("Sending a 'Find Successor' request to " + mailbox + " for id " + id);
292                 try {
293                         sendTask.send(mailbox, Common.TIMEOUT);
294                         do {
295                                 if (commReceive == null) {
296                                         commReceive = Task.irecv(this.mailbox);
297                                 }
298                                 try {
299                                         commReceive.waitCompletion(Common.TIMEOUT);
300                                         Task task = commReceive.getTask();
301                                         if (task instanceof FindSuccessorAnswerTask) {
302                                                 //TODO: Check if this this our answer.
303                                                 FindSuccessorAnswerTask fTask = (FindSuccessorAnswerTask) task;
304                                                 stop = true;
305                                                 successor = fTask.answerId;
306                                         }
307                                         else {
308                                                 handleTask(task);
309                                         }
310                                         commReceive = null;
311                                 }
312                                 catch (TimeoutException e) {
313                                         stop = true;
314                                         commReceive = null;
315                                 }
316                         } while (!stop);
317                 }
318                 catch (TimeoutException e) {
319                         Msg.debug("Failed to send the 'Find Successor' request");
320                 }
321                 catch (MsgException e) {
322                         Msg.debug("Failed to receive Find Successor");
323                 }
324                 
325                 return successor;
326
327         }
328         /**
329          * @brief This function is called periodically. It checks the immediate
330          * successor of the current node.
331          */
332         void stabilize() {
333                 Msg.debug("Stabilizing node");
334                 int candidateId;
335                 int successorId = fingers[0];
336                 if (successorId != this.id){
337                         candidateId = remoteGetPredecessor(successorId);
338                 }
339                 else {
340                         candidateId = predId;
341                 }
342                 //This node is a candidate to become my new successor
343                 if (candidateId != -1 && isInInterval(candidateId, this.id + 1, successorId - 1)) {
344                         setFinger(0, candidateId);
345                 }
346                 if (successorId != this.id) {
347                         remoteNotify(successorId, this.id);
348                 }
349                 
350         }
351         /**
352          * \brief Notifies the current node that its predecessor may have changed.
353          * \param candidate_id the possible new predecessor
354          */
355         void notify(int predecessorCandidateId) {
356                 if (predId == -1 || isInInterval(predecessorCandidateId, predId + 1, this.id - 1 )) {
357                         setPredecessor(predecessorCandidateId);
358                 }
359                 else {
360                         //Don't have to change the predecessor.
361                 }
362         }
363         /**
364          * \brief Notifies a remote node that its predecessor may have changed.
365          * \param notify_id id of the node to notify
366          * \param candidate_id the possible new predecessor
367          */     
368         void remoteNotify(int notifyId, int predecessorCandidateId) {
369                 Msg.debug("Sending a 'Notify' request to " + notifyId);
370                 Task sentTask = new NotifyTask(host.getName(), this.mailbox, predecessorCandidateId);
371                 sentTask.dsend(Integer.toString(notifyId));
372         }
373         /**
374          * \brief This function is called periodically.
375          * It refreshes the finger table of the current node.
376          */
377         void fixFingers() {
378                 Msg.debug("Fixing fingers");
379                 int i = this.nextFingerToFix;
380                 int id = this.findSuccessor(this.id + (int)Math.pow(2,i)); //FIXME: SLOW
381                 if (id != -1) {
382                         if (id != fingers[i]) {
383                                 setFinger(i, id);
384                         }
385                         nextFingerToFix = (i + 1) % Common.NB_BITS;
386                 }
387         }
388         /**
389          * \brief This function is called periodically.
390          * It checks whether the predecessor has failed
391          */
392         void checkPredecessor() {
393                 //TODO
394         }
395         /**
396          * \brief Performs a find successor request to a random id.
397          */
398         void randomLookup() {
399                 int id = 1337;
400                 //Msg.info("Making a lookup request for id " + id);
401                 findSuccessor(id);
402         }
403         
404         
405
406         /**
407          * @brief Returns the closest preceding finger of an id
408          * with respect to the finger table of the current node.
409          * @param id the id to find
410          * \return the closest preceding finger of that id
411          */
412         int closestPrecedingNode(int id) {
413                 int i;
414                 for (i = Common.NB_BITS - 1; i >= 0; i--) {
415                         if (isInInterval(fingers[i], this.id + 1, id - 1)) {
416                                 return fingers[i];
417                         }
418                 }               
419                 return this.id;
420         }
421         /**
422          * @brief Returns whether an id belongs to the interval [start, end].
423          *
424          * The parameters are noramlized to make sure they are between 0 and nb_keys - 1).
425          * 1 belongs to [62, 3]
426          * 1 does not belong to [3, 62]
427          * 63 belongs to [62, 3]
428          * 63 does not belong to [3, 62]
429          * 24 belongs to [21, 29]
430          * 24 does not belong to [29, 21]
431          *
432          * \param id id to check
433          * \param start lower bound
434          * \param end upper bound
435          * \return a non-zero value if id in in [start, end]
436          */
437         static boolean isInInterval(int id, int start, int end) {
438                 id = normalize(id);
439                 start = normalize(start);
440                 end = normalize(end);
441                 
442                 // make sure end >= start and id >= start
443                 if (end < start) {
444                         end += Common.NB_KEYS;
445                 }
446                 if (id < start) {
447                         id += Common.NB_KEYS;
448                 }
449                 return (id <= end);
450         
451         }
452         /**
453          * @brief Turns an id into an equivalent id in [0, nb_keys).
454          * @param id an id
455          * @return the corresponding normalized id
456          */
457         static int normalize(int id) {
458                 return id & (Common.NB_KEYS - 1);
459         }
460         /**
461          * \brief Sets a finger of the current node.
462          * \param finger_index index of the finger to set (0 to nb_bits - 1)
463          * \param id the id to set for this finger
464          */
465         void setFinger(int fingerIndex, int id) {
466                 if (id != fingers[fingerIndex]) {
467                         fingers[fingerIndex] = id;
468                         lastChangeDate = Msg.getClock();
469                 }
470         }
471 }