Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
25124b6408b258d07b4a84c11b8f014ac641d35f
[simgrid.git] / examples / java / dht / kademlia / Node.java
1 /* Copyright (c) 2012-2014, 2016. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 package dht.kademlia;
8
9 import org.simgrid.msg.Host;
10
11 import org.simgrid.msg.Msg;
12 import org.simgrid.msg.Comm;
13 import org.simgrid.msg.Task;
14 import org.simgrid.msg.Process;
15 import org.simgrid.msg.MsgException;
16
17 public class Node extends Process {
18   protected int id;
19   protected RoutingTable table;
20   protected int deadline;
21   protected int findNodeSuccedded = 0;
22   protected int findNodeFailed = 0;
23   protected Comm comm;
24
25   public Node(Host host, String name, String[]args) {
26     super(host,name,args);
27   }
28
29   @Override
30   public void main(String[] args) throws MsgException {
31     //Check the number of arguments.
32     if (args.length != 2 && args.length != 3) {
33       Msg.info("Wrong argument count.");
34       return;
35     }
36     this.id = Integer.valueOf(args[0]);
37     this.table = new RoutingTable(this.id);
38
39     if (args.length == 3) {
40       this.deadline = Integer.valueOf(args[2]).intValue();
41       Msg.info("Hi, I'm going to join the network with the id " + id + "!");
42       if (joinNetwork(Integer.valueOf(args[1]))) {
43         this.mainLoop();
44       }
45       else {
46         Msg.info("I couldn't join the network :(");
47       }
48     }
49     else {
50       this.deadline = Integer.valueOf(args[1]).intValue();
51       Msg.info("Hi, I'm going to create the network with the id " + id + "!");
52       table.update(this.id);
53       this.mainLoop();
54     }
55     Msg.debug("I'm leaving the network");
56     Msg.debug("Here is my routing table:" + table);
57   }
58
59   public void mainLoop() {
60     double next_lookup_time = Msg.getClock() + Common.RANDOM_LOOKUP_INTERVAL;
61     while (Msg.getClock() < this.deadline) {
62       try {
63         if (comm == null) {
64           comm = Task.irecv(Integer.toString(id));
65         }
66         if (!comm.test()) {
67           if (Msg.getClock() >= next_lookup_time) {
68             randomLookup();
69             next_lookup_time += Common.RANDOM_LOOKUP_INTERVAL;
70           } else {
71             waitFor(1);
72           }
73         } else {
74           Task task = comm.getTask();
75           handleTask(task);
76           comm = null;
77         }
78       }
79       catch (Exception e) {
80       }
81     }
82     Msg.info(findNodeSuccedded + "/"  + (findNodeSuccedded + findNodeFailed) + " FIND_NODE have succedded.");
83   }
84
85   /**
86    * @brief Try to make the node join the network
87    * @param idKnown Id of someone we know in the system
88    */
89   public boolean joinNetwork(int idKnown) {
90     boolean answerGot = false;
91     double timeBegin = Msg.getClock();
92     Msg.debug("Joining the network knowing " + idKnown);
93     //Add ourselves and the node we know to our routing table
94     table.update(this.id);
95     table.update(idKnown);
96     //Send a "FIND_NODE" to the node we know.
97     sendFindNode(idKnown,this.id);
98     //Wait for the answer.
99     int trials = 0;
100
101     do {
102       try {
103         if (comm == null) {
104           comm = Task.irecv(Integer.toString(id));
105         }
106         if (comm != null) {
107           if (!comm.test()) {
108             waitFor(1);
109           } else {
110             Task task = comm.getTask();
111             if (task instanceof FindNodeAnswerTask) {
112               answerGot = true;
113               //Retrieve the node list and ping them
114               FindNodeAnswerTask answerTask = (FindNodeAnswerTask)task;
115               Answer answer = answerTask.getAnswer();
116               answerGot = true;
117               //answersGotten++;
118               if (answer.getDestinationId() == this.id) {
119                 //Ping everyone in the list
120                 for (Contact c : answer.getNodes()) {
121                   table.update(c.getId());
122                 }
123               }
124             } else {
125               handleTask(task);
126             }
127             comm = null;
128           }
129         }
130       }
131       catch (Exception ex) {
132         trials++;
133         Msg.info("FIND_NODE failed");
134       }
135     } while (!answerGot && trials < Common.MAX_JOIN_TRIALS);
136     /* Second step: Send a FIND_NODE in a node in each bucket */
137     int bucketId = table.findBucket(idKnown).getId();
138     for (int i = 0; ((bucketId - i) > 0 || 
139        (bucketId + i) <= Common.IDENTIFIER_SIZE) && 
140        i < Common.JOIN_BUCKETS_QUERIES; i++) {
141       if (bucketId - i > 0) {
142         int idInBucket = table.getIdInPrefix(this.id,bucketId - i);
143         this.findNode(idInBucket,false);
144       }
145       if (bucketId + i <= Common.IDENTIFIER_SIZE) {
146         int idInBucket = table.getIdInPrefix(this.id,bucketId + i);
147         findNode(idInBucket,false);
148       }
149     }
150     Msg.debug("Time spent:" + (Msg.getClock() - timeBegin));
151     return answerGot;
152   }
153
154   /* Send a request to find a node in the node's routing table. */
155   public boolean findNode(int destination, boolean counts) {
156     int queries;
157     int answers;
158     int nodesAdded = 0;
159     boolean destinationFound = false;
160     int steps = 0;
161     double timeBeginReceive;
162     double timeout;
163     double globalTimeout = Msg.getClock() + Common.FIND_NODE_GLOBAL_TIMEOUT;
164     //Build a list of the closest nodes we already know.
165     Answer nodeList = table.findClosest(destination);
166     Msg.verb("Doing a FIND_NODE on " + destination);
167     do {
168       timeBeginReceive = Msg.getClock();
169       answers = 0;
170       queries = this.sendFindNodeToBest(nodeList);
171       nodesAdded = 0;
172       timeout = Msg.getClock() + Common.FIND_NODE_TIMEOUT;
173       steps++;
174       do {
175         try {
176           if (comm == null) {
177             comm = Task.irecv(Integer.toString(id));
178           }
179           if (!comm.test()) {
180             waitFor(1);
181           } else {
182             Task task = comm.getTask();  
183             if (task instanceof FindNodeAnswerTask) {
184               FindNodeAnswerTask answerTask = (FindNodeAnswerTask)task;
185               //Check if we received what we are looking for.
186               if (answerTask.getDestinationId() == destination) {
187                 table.update(answerTask.getSenderId());
188                 //Add the answer to our routing table
189                 for (Contact c: answerTask.getAnswer().getNodes()) {
190                   table.update(c.getId());
191                 }
192                 answers++;
193                 
194                 nodesAdded = nodeList.merge(answerTask.getAnswer());
195               } else {
196               /* If it's not our answer, we answer to the node that has queried us anyway */
197                 handleTask(task);
198                 //Update the timeout if it's not our answer.
199                 timeout += Msg.getClock() - timeBeginReceive;
200                 timeBeginReceive = Msg.getClock();
201               }
202             } else {
203               handleTask(task);
204               timeout += Msg.getClock() - timeBeginReceive;
205               timeBeginReceive = Msg.getClock();
206             }
207             comm = null;
208           }
209         }
210         catch (Exception e) {
211           comm = null;
212         }
213       } while (answers < queries && Msg.getClock() < timeout);
214       destinationFound = nodeList.destinationFound();
215     } while (!destinationFound && (nodesAdded > 0 || answers == 0) && Msg.getClock() < globalTimeout 
216              && steps < Common.MAX_STEPS);
217
218     if (destinationFound) {
219       if (counts) {
220         findNodeSuccedded++;
221       }
222       Msg.debug("Find node on " + destination + " succedded");
223     } else {
224       Msg.debug("Find node on " + destination + " failed");
225       Msg.debug("Queried " + queries + " nodes to find "  + destination);
226       Msg.debug(nodeList.toString());
227       if (counts) {
228         findNodeFailed++;
229       }
230     }
231     return destinationFound;
232   }
233
234   /**
235    * @brief Sends a "PING" request to a node
236    * @param destination Ping destination id.
237    */
238   public void ping(int destination) {
239     boolean destinationFound = false;
240     double timeout = Msg.getClock() + Common.PING_TIMEOUT;
241     PingTask pingTask = new PingTask(this.id);
242     /* Sending the ping task */
243     pingTask.dsend(Integer.toString(destination));
244     do {
245       try {
246         Task task = Task.receive(Integer.toString(this.id),Common.PING_TIMEOUT);
247         if (task instanceof PingAnswerTask) {
248           PingAnswerTask answerTask = (PingAnswerTask)task;
249           if (answerTask.getSenderId() == destination) {
250             this.table.update(destination);
251             destinationFound = true;
252           } else {
253             handleTask(task);
254           }
255         } else {
256           handleTask(task);
257         }
258         waitFor(1);
259       }
260       catch (Exception ex) {
261       }
262     } while (Msg.getClock() < timeout && !destinationFound);
263   }
264
265   /**
266    * @brief Sends a "FIND_NODE" request (task) to a node we know.
267    * @param id Id of the node we are querying
268    * @param destination id of the node we are trying to find.
269    */
270   public void sendFindNode(int id, int destination) {
271     Msg.debug("Sending a FIND_NODE to " + Integer.toString(id) + " to find " + destination  );
272     FindNodeTask task = new FindNodeTask(this.id,destination);
273     task.dsend(Integer.toString(id));
274   }
275
276   /** Sends a "FIND_NODE" request to the best "alpha" nodes in a node list */
277   public int sendFindNodeToBest(Answer nodeList) {
278     int destination = nodeList.getDestinationId();
279     int i;
280     for (i = 0; i < Common.alpha && i < nodeList.size(); i++) {
281       Contact node = nodeList.getNodes().get(i);
282       if (node.getId() != this.id) {
283         this.sendFindNode(node.getId(),destination);
284       }
285     }
286     return i;
287   }
288
289   public void randomLookup() {
290     findNode(0,true);
291   }
292
293   /**
294    * @brief Handles an incomming task
295    * @param task The task we need to handle
296    */
297   public void handleTask(Task task) {
298     if (task instanceof KademliaTask) {
299       table.update(((KademliaTask) task).getSenderId());
300       if (task instanceof FindNodeTask) {
301         handleFindNode((FindNodeTask)task);
302       }
303       else if (task instanceof PingTask) {
304         handlePing((PingTask)task);
305       }
306     }
307   }
308
309   public void handleFindNode(FindNodeTask task) {
310     Msg.debug("Received a FIND_NODE from " + task.getSenderId());
311     Answer answer = table.findClosest(task.getDestination());
312     FindNodeAnswerTask taskToSend = new FindNodeAnswerTask(this.id,task.getDestination(),answer);
313     taskToSend.dsend(Integer.toString(task.getSenderId()));
314   }
315
316   public void handlePing(PingTask task) {
317     Msg.debug("Received a PING from " + task.getSenderId());
318     PingAnswerTask taskToSend = new PingAnswerTask(this.id);
319     taskToSend.dsend(Integer.toString(task.getSenderId()));
320   }
321 }