Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
cosmetics: rename a function, deprecate old name
[simgrid.git] / examples / s4u / dht-kademlia / node.cpp
1 /* Copyright (c) 2010, 2012-2018. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "node.hpp"
8 #include "routing_table.hpp"
9
10 XBT_LOG_NEW_DEFAULT_CATEGORY(kademlia_node, "Messages specific for this example");
11
12 namespace kademlia {
13 static void destroy(void* message)
14 {
15   Message* msg = static_cast<Message*>(message);
16   delete msg->answer_;
17   delete msg;
18 }
19
20 /**
21   * @brief Tries to join the network
22   * @param known_id id of the node I know in the network.
23   */
24 bool Node::join(unsigned int known_id)
25 {
26   Answer* node_list;
27   unsigned int i;
28   bool got_answer = false;
29
30   /* Add the guy we know to our routing table and ourselves. */
31   routingTableUpdate(id_);
32   routingTableUpdate(known_id);
33
34   /* First step: Send a "FIND_NODE" request to the node we know */
35   sendFindNode(known_id, id_);
36
37   simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::byName(std::to_string(id_));
38   do {
39     if (receive_comm == nullptr)
40       receive_comm = mailbox->get_async(&received_msg);
41     if (receive_comm->test()) {
42       XBT_DEBUG("Received an answer from the node I know.");
43       got_answer = true;
44       // retrieve the node list and ping them.
45       Message* msg = static_cast<Message*>(received_msg);
46       node_list    = msg->answer_;
47       if (node_list) {
48         for (auto contact : node_list->nodes)
49           routingTableUpdate(contact.first);
50       } else {
51         handleFindNode(msg);
52       }
53       delete msg->answer_;
54       delete msg;
55       receive_comm = nullptr;
56     } else
57       simgrid::s4u::this_actor::sleep_for(1);
58   } while (not got_answer);
59
60   /* Second step: Send a FIND_NODE to a a random node in buckets */
61   unsigned int bucket_id = table->findBucket(known_id)->getId();
62   xbt_assert(bucket_id <= identifier_size);
63   for (i = 0; ((bucket_id > i) || (bucket_id + i) <= identifier_size) && i < JOIN_BUCKETS_QUERIES; i++) {
64     if (bucket_id > i) {
65       unsigned int id_in_bucket = get_id_in_prefix(id_, bucket_id - i);
66       findNode(id_in_bucket, false);
67     }
68     if (bucket_id + i <= identifier_size) {
69       unsigned int id_in_bucket = get_id_in_prefix(id_, bucket_id + i);
70       findNode(id_in_bucket, false);
71     }
72   }
73   return got_answer;
74 }
75
76 /** @brief Send a "FIND_NODE" to a node
77   * @param id node we are querying
78   * @param destination node we are trying to find.
79   */
80 void Node::sendFindNode(unsigned int id, unsigned int destination)
81 {
82   /* Gets the mailbox to send to */
83   simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::byName(std::to_string(id));
84   /* Build the task */
85   Message* msg = new Message(id_, destination, simgrid::s4u::Mailbox::byName(std::to_string(id_)),
86                              simgrid::s4u::Host::current()->getCname());
87
88   /* Send the task */
89   mailbox->put_init(msg, 1)->detach(kademlia::destroy);
90   XBT_VERB("Asking %u for its closest nodes", id);
91 }
92
93 /**
94   * Sends to the best "kademlia_alpha" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best
95   * nodes
96   */
97 unsigned int Node::sendFindNodeToBest(Answer* node_list)
98 {
99   unsigned int i           = 0;
100   unsigned int j           = 0;
101   unsigned int destination = node_list->getDestinationId();
102   for (auto node_to_query : node_list->nodes) {
103     /* We need to have at most "kademlia_alpha" requests each time, according to the protocol */
104     /* Gets the node we want to send the query to */
105     if (node_to_query.first != id_) { /* No need to query ourselves */
106       sendFindNode(node_to_query.first, destination);
107       j++;
108     }
109     i++;
110     if (j == kademlia_alpha)
111       break;
112   }
113   return i;
114 }
115
116 /** @brief Updates/Puts the node id unsigned into our routing table
117   * @param id The id of the node we need to add unsigned into our routing table
118   */
119 void Node::routingTableUpdate(unsigned int id)
120 {
121   // retrieval of the bucket in which the should be
122   Bucket* bucket = table->findBucket(id);
123
124   // check if the id is already in the bucket.
125   auto id_pos = std::find(bucket->nodes.begin(), bucket->nodes.end(), id);
126
127   if (id_pos == bucket->nodes.end()) {
128     /* We check if the bucket is full or not. If it is, we evict an old element */
129     if (bucket->nodes.size() >= BUCKET_SIZE) {
130       bucket->nodes.pop_back();
131     }
132     bucket->nodes.push_front(id);
133     XBT_VERB("I'm adding to my routing table %08x", id);
134   } else {
135     // We push the element to the front
136     bucket->nodes.erase(id_pos);
137     bucket->nodes.push_front(id);
138     XBT_VERB("I'm updating %08x", id);
139   }
140 }
141
142 /** @brief Finds the closest nodes to the node given.
143   * @param node : our node
144   * @param destination_id : the id of the guy we are trying to find
145   */
146 Answer* Node::findClosest(unsigned int destination_id)
147 {
148   Answer* answer = new Answer(destination_id);
149   /* We find the corresponding bucket for the id */
150   Bucket* bucket = table->findBucket(destination_id);
151   int bucket_id  = bucket->getId();
152   xbt_assert((bucket_id <= identifier_size), "Bucket found has a wrong identifier");
153   /* So, we copy the contents of the bucket unsigned into our answer */
154   answer->addBucket(bucket);
155
156   /* However, if we don't have enough elements in our bucket, we NEED to include at least "bucket_size" elements
157    * (if, of course, we know at least "bucket_size" elements. So we're going to look unsigned into the other buckets.
158    */
159   for (int i = 1; answer->getSize() < BUCKET_SIZE && ((bucket_id - i > 0) || (bucket_id + i < identifier_size)); i++) {
160     /* We check the previous buckets */
161     if (bucket_id - i >= 0) {
162       Bucket* bucket_p = table->buckets[bucket_id - i];
163       answer->addBucket(bucket_p);
164     }
165     /* We check the next buckets */
166     if (bucket_id + i <= identifier_size) {
167       Bucket* bucket_n = table->buckets[bucket_id + i];
168       answer->addBucket(bucket_n);
169     }
170   }
171   /* We trim the array to have only bucket_size or less elements */
172   std::sort(answer->nodes.begin(), answer->nodes.end(), sortbydistance);
173   answer->trim();
174
175   return answer;
176 }
177
178 /** @brief Send a request to find a node in the node routing table.
179   * @param id_to_find the id of the node we are trying to find
180   */
181 bool Node::findNode(unsigned int id_to_find, bool count_in_stats)
182 {
183   unsigned int queries;
184   unsigned int answers;
185   bool destination_found   = false;
186   unsigned int nodes_added = 0;
187   double global_timeout    = simgrid::s4u::Engine::getClock() + find_node_global_timeout;
188   unsigned int steps       = 0;
189
190   /* First we build a list of who we already know */
191   Answer* node_list = findClosest(id_to_find);
192   xbt_assert((node_list != nullptr), "node_list incorrect");
193   XBT_DEBUG("Doing a FIND_NODE on %08x", id_to_find);
194
195   /* Ask the nodes on our list if they have information about the node we are trying to find */
196   do {
197     answers        = 0;
198     queries        = sendFindNodeToBest(node_list);
199     nodes_added    = 0;
200     double timeout = simgrid::s4u::Engine::getClock() + find_node_timeout;
201     steps++;
202     double time_beginreceive = simgrid::s4u::Engine::getClock();
203
204     simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::byName(std::to_string(id_));
205     do {
206       if (receive_comm == nullptr)
207         receive_comm = mailbox->get_async(&received_msg);
208
209       if (receive_comm->test()) {
210         Message* msg = static_cast<Message*>(received_msg);
211         // Check if what we have received is what we are looking for.
212         if (msg->answer_ && msg->answer_->getDestinationId() == id_to_find) {
213           routingTableUpdate(msg->sender_id_);
214           // Handle the answer
215           for (auto contact : node_list->nodes)
216             routingTableUpdate(contact.first);
217           answers++;
218
219           nodes_added = node_list->merge(msg->answer_);
220           XBT_DEBUG("Received an answer from %s (%s) with %zu nodes on it", msg->answer_to_->getCname(),
221                     msg->issuer_host_name_, msg->answer_->nodes.size());
222         } else {
223           if (msg->answer_) {
224             routingTableUpdate(msg->sender_id_);
225             XBT_DEBUG("Received a wrong answer for a FIND_NODE");
226           } else {
227             handleFindNode(msg);
228           }
229           // Update the timeout if we didn't have our answer
230           timeout += simgrid::s4u::Engine::getClock() - time_beginreceive;
231           time_beginreceive = simgrid::s4u::Engine::getClock();
232         }
233         delete msg->answer_;
234         delete msg;
235         receive_comm = nullptr;
236       } else {
237         simgrid::s4u::this_actor::sleep_for(1);
238       }
239     } while (simgrid::s4u::Engine::getClock() < timeout && answers < queries);
240     destination_found = node_list->destinationFound();
241   } while (not destination_found && (nodes_added > 0 || answers == 0) &&
242            simgrid::s4u::Engine::getClock() < global_timeout && steps < MAX_STEPS);
243
244   if (destination_found) {
245     if (count_in_stats)
246       find_node_success++;
247     if (queries > 4)
248       XBT_VERB("FIND_NODE on %08x success in %u steps", id_to_find, steps);
249     routingTableUpdate(id_to_find);
250   } else {
251     if (count_in_stats) {
252       find_node_failed++;
253       XBT_VERB("%08x not found in %u steps", id_to_find, steps);
254     }
255   }
256   delete node_list;
257   return destination_found;
258 }
259
260 /** @brief Does a pseudo-random lookup for someone in the system
261   * @param node caller node data
262   */
263 void Node::randomLookup()
264 {
265   unsigned int id_to_look = RANDOM_LOOKUP_NODE; // Totally random.
266   /* TODO: Use some pseudo-random generator like RngStream. */
267   XBT_DEBUG("I'm doing a random lookup");
268   findNode(id_to_look, true);
269 }
270
271 /** @brief Handles the answer to an incoming "find_node" task */
272 void Node::handleFindNode(Message* msg)
273 {
274   routingTableUpdate(msg->sender_id_);
275   XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %08x", msg->answer_to_->getCname(),
276            msg->issuer_host_name_, msg->destination_id_);
277   // Building the answer to the request
278   Message* answer =
279       new Message(id_, msg->destination_id_, findClosest(msg->destination_id_),
280                   simgrid::s4u::Mailbox::byName(std::to_string(id_)), simgrid::s4u::Host::current()->getCname());
281   // Sending the answer
282   msg->answer_to_->put_init(answer, 1)->detach(kademlia::destroy);
283 }
284 }
285 /**@brief Returns an identifier which is in a specific bucket of a routing table
286  * @param id id of the routing table owner
287  * @param prefix id of the bucket where we want that identifier to be
288  */
289 unsigned int get_id_in_prefix(unsigned int id, unsigned int prefix)
290 {
291   if (prefix == 0) {
292     return 0;
293   } else {
294     return (1U << ((unsigned int)(prefix - 1))) ^ id;
295   }
296 }
297
298 /** @brief Returns the prefix of an identifier.
299   * The prefix is the id of the bucket in which the remote identifier xor our identifier should be stored.
300   * @param id : big unsigned int id to test
301   * @param nb_bits : key size
302   */
303 unsigned int get_node_prefix(unsigned int id, unsigned int nb_bits)
304 {
305   unsigned int size = sizeof(unsigned int) * 8;
306   for (unsigned int j = 0; j < size; j++) {
307     if (((id >> (size - 1 - j)) & 0x1) != 0) {
308       return nb_bits - (j);
309     }
310   }
311   return 0;
312 }