Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
486b78862bd686a791f5027b96b5a369d05550af
[simgrid.git] / simgrid-java / examples / kademlia / Node.java
1 /* Copyright (c) 2012. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 package kademlia;
7
8 import org.simgrid.msg.Host;
9
10 import org.simgrid.msg.Comm;
11 import org.simgrid.msg.Msg;
12 import org.simgrid.msg.MsgException;
13 import org.simgrid.msg.Process;
14 import org.simgrid.msg.Task;
15 /**
16  * Main class of the simulation, contains the logic of a node.
17  */
18 public class Node extends Process {
19         /**
20           * Id in the network.
21          */
22         protected int id;
23         /**
24          * Routing table
25          */
26         protected RoutingTable table;
27         /**
28          * Deadline
29          */
30         protected int deadline;
31         /**
32          * FIND_NODE which have succeeded.
33          */
34         protected int findNodeSuccedded = 0;
35         /**
36          * FIND_NODE which have failed
37          */
38         protected int findNodeFailed = 0;
39         
40         protected Comm comm;
41
42         public Node(Host host, String name, String[]args) {
43                 super(host,name,args);
44         }
45         
46         @Override
47         public void main(String[] args) throws MsgException {
48                 //Check the number of arguments.
49                 if (args.length != 2 && args.length != 3) {
50                         Msg.info("Wrong argument count.");
51                         return;
52                 }
53                 this.id = Integer.valueOf(args[0]);
54                 this.table = new RoutingTable(this.id);
55                 
56                 if (args.length == 3) {
57                         this.deadline = Integer.valueOf(args[2]).intValue();
58                         Msg.info("Hi, I'm going to join the network with the id " + id + "!");
59                         if (joinNetwork(Integer.valueOf(args[1]))) {
60                                 this.mainLoop();
61                         } 
62                         else {
63                                 Msg.info("I couldn't join the network :(");
64                         }
65                 }
66                 else {
67                         this.deadline = Integer.valueOf(args[1]).intValue();
68                         Msg.info("Hi, I'm going to create the network with the id " + id + "!");
69                         table.update(this.id);
70                         this.mainLoop();
71                 }               
72                 Msg.debug("I'm leaving the network");
73                 Msg.debug("Here is my routing table:" + table);
74         }
75         /**
76          * Node main loop
77          */
78         public void mainLoop() {
79                 double next_lookup_time = Msg.getClock() + Common.RANDOM_LOOKUP_INTERVAL;
80                 while (Msg.getClock() < this.deadline) {
81                         try {
82                                 if (comm == null) {
83                                         comm = Task.irecv(Integer.toString(id));
84                                 }
85                                 if (!comm.test()) {
86                                         if (Msg.getClock() >= next_lookup_time) {
87                                                 randomLookup();
88                                                 next_lookup_time += Common.RANDOM_LOOKUP_INTERVAL;
89                                         }
90                                         else {
91                                                 waitFor(1);
92                                         }                                               
93                                 }
94                                 else {
95                                         Task task = comm.getTask();
96                                         handleTask(task);
97                                         comm = null;
98                                 }
99                         }
100                         catch (Exception e) {
101                                 
102                         }
103                 }
104                 Msg.info(findNodeSuccedded + "/"  + (findNodeSuccedded + findNodeFailed) + " FIND_NODE have succedded.");
105         }
106         /**
107          * @brief Try to make the node join the network
108          * @param idKnown Id of someone we know in the system
109          */
110         public boolean joinNetwork(int idKnown) {
111                 boolean answerGot = false;
112                 double timeBegin = Msg.getClock();
113                 Msg.debug("Joining the network knowing " + idKnown);
114                 //Add ourselves and the node we know to our routing table
115                 table.update(this.id);
116                 table.update(idKnown);
117                 //Send a "FIND_NODE" to the node we know.
118                 sendFindNode(idKnown,this.id);
119                 //Wait for the answer.
120                 int trials = 0;
121
122                 do {
123                         try {
124                                 if (comm == null) {
125                                         comm = Task.irecv(Integer.toString(id));
126                                 }
127                                 if (comm != null) {
128                                         if (!comm.test()) {
129                                                 waitFor(1);
130                                         }
131                                         else {
132                                                 Task task = comm.getTask();
133                                                 if (task instanceof FindNodeAnswerTask) {
134                                                         answerGot = true;
135                                                         //Retrieve the node list and ping them
136                                                         FindNodeAnswerTask answerTask = (FindNodeAnswerTask)task;
137                                                         Answer answer = answerTask.getAnswer();
138                                                         answerGot = true;
139                                                         //answersGotten++;
140                                                         if (answer.getDestinationId() == this.id) {
141                                                                 //Ping everyone in the list
142                                                                 for (Contact c : answer.getNodes()) {
143                                                                         table.update(c.getId());
144                                                                 }                                               
145                                                         }
146                                                 }
147                                                 else {
148                                                         handleTask(task);
149                                                 }
150                                                 comm = null;
151                                         }
152                                 }
153
154                         }
155                         catch (Exception ex) {
156                                 trials++;
157                                 Msg.info("FIND_NODE failed");
158                         }
159                 } while (!answerGot && trials < Common.MAX_JOIN_TRIALS);
160                 /* Second step: Send a FIND_NODE in a node in each bucket */
161                 int bucketId = table.findBucket(idKnown).getId();
162                 for (int i = 0; ((bucketId - i) > 0 || 
163                          (bucketId + i) <= Common.IDENTIFIER_SIZE) && 
164                          i < Common.JOIN_BUCKETS_QUERIES; i++) {
165                         if (bucketId - i > 0) {
166                                 int idInBucket = table.getIdInPrefix(this.id,bucketId - i);
167                                 this.findNode(idInBucket,false);
168                         }
169                         if (bucketId + i <= Common.IDENTIFIER_SIZE) {
170                                 int idInBucket = table.getIdInPrefix(this.id,bucketId + i);                             
171                                 findNode(idInBucket,false);
172                         }
173                 }
174                 Msg.debug("Time spent:" + (Msg.getClock() - timeBegin));
175                 return answerGot;
176         }
177         /**
178          * Send a request to find a node in the node's routing table.
179          */
180         public boolean findNode(int destination, boolean counts) {
181                 int queries, answers;
182                 int nodesAdded = 0;
183                 boolean destinationFound = false;
184                 int steps = 0;
185                 double timeBeginReceive;
186                 double timeout, globalTimeout = Msg.getClock() + Common.FIND_NODE_GLOBAL_TIMEOUT;
187                 //Build a list of the closest nodes we already know.
188                 Answer nodeList = table.findClosest(destination);
189                 Msg.verb("Doing a FIND_NODE on " + destination);
190                 do {
191                         timeBeginReceive = Msg.getClock();
192                         answers = 0;
193                         queries = this.sendFindNodeToBest(nodeList);
194                         nodesAdded = 0;
195                         timeout = Msg.getClock() + Common.FIND_NODE_TIMEOUT;
196                         steps++;
197                         do {
198                                 try {
199                                         if (comm == null) {
200                                                 comm = Task.irecv(Integer.toString(id));
201                                         }
202                                         if (!comm.test()) {
203                                                 waitFor(1);
204                                         }
205                                         else {
206                                                 Task task = comm.getTask();     
207                                                 if (task instanceof FindNodeAnswerTask) {
208                                                         FindNodeAnswerTask answerTask = (FindNodeAnswerTask)task;
209                                                         //Check if we received what we are looking for.
210                                                         if (answerTask.getDestinationId() == destination) {
211                                                                 table.update(answerTask.getSenderId());
212                                                                 //Add the answer to our routing table
213                                                                 for (Contact c: answerTask.getAnswer().getNodes()) {
214                                                                         table.update(c.getId());
215                                                                 }
216                                                                 answers++;
217                                                                 
218                                                                 nodesAdded = nodeList.merge(answerTask.getAnswer());                                                    
219                                                         }
220                                                         /* If it's not our answer, we answer to the node that
221                                                          * has queried us anyway
222                                                          */
223                                                         else {
224                                                                 handleTask(task);
225                                                                 //Update the timeout if it's not our answer.
226                                                                 timeout += Msg.getClock() - timeBeginReceive;
227                                                                 timeBeginReceive = Msg.getClock();
228                                                         }
229                                                 }
230                                                 else {
231                                                         handleTask(task);
232                                                         timeout += Msg.getClock() - timeBeginReceive;
233                                                         timeBeginReceive = Msg.getClock();
234                                                 }
235                                                 comm = null;
236                                         }
237                                 }
238                                 catch (Exception e) {
239                                         comm = null;
240                                 }
241                         } while (answers < queries && Msg.getClock() < timeout);
242                         destinationFound = nodeList.destinationFound();
243                 } while (!destinationFound && (nodesAdded > 0 || answers == 0) && Msg.getClock() < globalTimeout && steps < Common.MAX_STEPS);
244                 
245                 if (destinationFound) {
246                         if (counts) {
247                                 findNodeSuccedded++;
248                         }
249                         Msg.debug("Find node on " + destination + " succedded");
250                 }
251                 else {
252                         Msg.debug("Find node on " + destination + " failed");
253                         Msg.debug("Queried " + queries + " nodes to find "  + destination);
254                         Msg.debug(nodeList.toString());
255                         if (counts) {
256                                 findNodeFailed++;
257                         }
258                 }
259                 return destinationFound;
260         }
261         /**
262          * Sends a "PING" request to a node
263          * @param destination Ping destination id.
264          */
265         public void ping(int destination) {
266                 boolean destinationFound = false;
267                 double timeout = Msg.getClock() + Common.PING_TIMEOUT;
268                 PingTask pingTask = new PingTask(this.id);
269                 /* Sending the ping task */
270                 pingTask.dsend(Integer.toString(destination));
271                 do
272                 {
273                         try {
274                                 Task task = Task.receive(Integer.toString(this.id),Common.PING_TIMEOUT);
275                                 if (task instanceof PingAnswerTask) {
276                                         PingAnswerTask answerTask = (PingAnswerTask)task;
277                                         if (answerTask.getSenderId() == destination) {
278                                                 this.table.update(destination);
279                                                 destinationFound = true;
280                                         }
281                                         else {
282                                                 handleTask(task);
283                                         }
284                                 }
285                                 else {
286                                         handleTask(task);
287                                 }
288                                 waitFor(1);
289                         }
290                         catch (Exception ex) {
291                         }
292                 } while (Msg.getClock() < timeout && !destinationFound);
293         }
294         /**
295          * Sends a "FIND_NODE" request (task) to a node we know.
296          * @brief id Id of the node we are querying
297          * @brief destination id of the node we are trying to find.
298          */
299         public void sendFindNode(int id, int destination) {
300                 Msg.debug("Sending a FIND_NODE to " + Integer.toString(id) + " to find " + destination  );
301                 FindNodeTask task = new FindNodeTask(this.id,destination);
302                 task.dsend(Integer.toString(id));
303         }
304         /**
305          * Sends a "FIND_NODE" request to the best "alpha" nodes in a node
306          * list
307          */
308         public int sendFindNodeToBest(Answer nodeList) {
309                 int destination = nodeList.getDestinationId();
310                 int i;
311                 for (i = 0; i < Common.alpha && i < nodeList.size(); i++) {
312                         Contact node = nodeList.getNodes().get(i);
313                         if (node.getId() != this.id) {
314                                 this.sendFindNode(node.getId(),destination);
315                         }
316                 }
317                 return i;
318         }
319         /**
320          * Does a random lookup
321          */
322         public void randomLookup() {
323                 findNode(0,true);
324         }
325         /**
326          * Handles an incomming task
327          * @param task The task we need to handle
328          */
329         public void handleTask(Task task) {
330                 if (task instanceof KademliaTask) {
331                         table.update(((KademliaTask) task).getSenderId());
332                         if (task instanceof FindNodeTask) {
333                                 handleFindNode((FindNodeTask)task);
334                         }
335                         else if (task instanceof PingTask) {
336                                 handlePing((PingTask)task);
337                         }
338                 }
339         }
340         public void handleFindNode(FindNodeTask task) {
341                 Msg.debug("Received a FIND_NODE from " + task.getSenderId());
342                 Answer answer = table.findClosest(task.getDestination());
343                 FindNodeAnswerTask taskToSend = new FindNodeAnswerTask(this.id,task.getDestination(),answer);
344                 taskToSend.dsend(Integer.toString(task.getSenderId()));
345         }
346         public void handlePing(PingTask task) {
347                 Msg.debug("Received a PING from " + task.getSenderId());
348                 PingAnswerTask taskToSend = new PingAnswerTask(this.id);
349                 taskToSend.dsend(Integer.toString(task.getSenderId()));
350         }
351 }