Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines.
[simgrid.git] / examples / s4u / dht-kademlia / node.cpp
1 /* Copyright (c) 2010-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "node.hpp"
7 #include "routing_table.hpp"
8
9 XBT_LOG_NEW_DEFAULT_CATEGORY(kademlia_node, "Messages specific for this example");
10
11 namespace kademlia {
12 static void destroy(void* message)
13 {
14   const auto* msg = static_cast<Message*>(message);
15   delete msg;
16 }
17
18 /**
19   * @brief Tries to join the network
20   * @param known_id id of the node I know in the network.
21   */
22 bool Node::join(unsigned int known_id)
23 {
24   bool got_answer = false;
25
26   /* Add the guy we know to our routing table and ourselves. */
27   routingTableUpdate(id_);
28   routingTableUpdate(known_id);
29
30   /* First step: Send a "FIND_NODE" request to the node we know */
31   sendFindNode(known_id, id_);
32
33   simgrid::s4u::Mailbox* mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(id_));
34   do {
35     if (receive_comm == nullptr)
36       receive_comm = mailbox->get_async<Message>(&received_msg);
37     if (receive_comm->test()) {
38       XBT_DEBUG("Received an answer from the node I know.");
39       got_answer = true;
40       // retrieve the node list and ping them.
41       const Answer* node_list = received_msg->answer_.get();
42       if (node_list) {
43         for (auto const& contact : node_list->getNodes())
44           routingTableUpdate(contact.first);
45       } else {
46         handleFindNode(received_msg);
47       }
48       delete received_msg;
49       receive_comm = nullptr;
50     } else
51       simgrid::s4u::this_actor::sleep_for(1);
52   } while (not got_answer);
53
54   /* Second step: Send a FIND_NODE to a random node in buckets */
55   unsigned int bucket_id = table.findBucket(known_id)->getId();
56   xbt_assert(bucket_id <= IDENTIFIER_SIZE);
57   for (unsigned int i = 0; ((bucket_id > i) || (bucket_id + i) <= IDENTIFIER_SIZE) && i < JOIN_BUCKETS_QUERIES; i++) {
58     if (bucket_id > i) {
59       unsigned int id_in_bucket = get_id_in_prefix(id_, bucket_id - i);
60       findNode(id_in_bucket, false);
61     }
62     if (bucket_id + i <= IDENTIFIER_SIZE) {
63       unsigned int id_in_bucket = get_id_in_prefix(id_, bucket_id + i);
64       findNode(id_in_bucket, false);
65     }
66   }
67   return got_answer;
68 }
69
70 /** @brief Send a "FIND_NODE" to a node
71   * @param id node we are querying
72   * @param destination node we are trying to find.
73   */
74 void Node::sendFindNode(unsigned int id, unsigned int destination) const
75 {
76   /* Gets the mailbox to send to */
77   simgrid::s4u::Mailbox* mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(id));
78   /* Build the task */
79
80   auto* msg = new Message(id_, destination, simgrid::s4u::Mailbox::by_name(std::to_string(id_)),
81                           simgrid::s4u::Host::current()->get_cname());
82
83   /* Send the task */
84   mailbox->put_init(msg, 1)->detach(kademlia::destroy);
85   XBT_VERB("Asking %u for its closest nodes", id);
86 }
87
88 /**
89   * Sends to the best "KADEMLIA_ALPHA" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best
90   * nodes
91   */
92 unsigned int Node::sendFindNodeToBest(const Answer* node_list) const
93 {
94   unsigned int i           = 0;
95   unsigned int j           = 0;
96   unsigned int destination = node_list->getDestinationId();
97   for (auto const& node_to_query : node_list->getNodes()) {
98     /* We need to have at most "KADEMLIA_ALPHA" requests each time, according to the protocol */
99     /* Gets the node we want to send the query to */
100     if (node_to_query.first != id_) { /* No need to query ourselves */
101       sendFindNode(node_to_query.first, destination);
102       j++;
103     }
104     i++;
105     if (j == KADEMLIA_ALPHA)
106       break;
107   }
108   return i;
109 }
110
111 /** @brief Updates/Puts the node id unsigned into our routing table
112   * @param id The id of the node we need to add unsigned into our routing table
113   */
114 void Node::routingTableUpdate(unsigned int id)
115 {
116   // retrieval of the bucket in which the should be
117   Bucket* bucket = table.findBucket(id);
118
119   // check if the id is already in the bucket.
120   auto id_pos = std::find(bucket->nodes_.begin(), bucket->nodes_.end(), id);
121
122   if (id_pos == bucket->nodes_.end()) {
123     /* We check if the bucket is full or not. If it is, we evict an old element */
124     if (bucket->nodes_.size() >= BUCKET_SIZE) {
125       bucket->nodes_.pop_back();
126     }
127     bucket->nodes_.push_front(id);
128     XBT_VERB("I'm adding to my routing table %08x", id);
129   } else {
130     // We push the element to the front
131     bucket->nodes_.erase(id_pos);
132     bucket->nodes_.push_front(id);
133     XBT_VERB("I'm updating %08x", id);
134   }
135 }
136
137 /** @brief Finds the closest nodes to the node given.
138   * @param node : our node
139   * @param destination_id : the id of the guy we are trying to find
140   */
141 std::unique_ptr<Answer> Node::findClosest(unsigned int destination_id)
142 {
143   auto answer = std::make_unique<Answer>(destination_id);
144   /* We find the corresponding bucket for the id */
145   const Bucket* bucket = table.findBucket(destination_id);
146   int bucket_id  = bucket->getId();
147   xbt_assert((bucket_id <= IDENTIFIER_SIZE), "Bucket found has a wrong identifier");
148   /* So, we copy the contents of the bucket unsigned into our answer */
149   answer->addBucket(bucket);
150
151   /* However, if we don't have enough elements in our bucket, we NEED to include at least "BUCKET_SIZE" elements
152    * (if, of course, we know at least "BUCKET_SIZE" elements. So we're going to look unsigned into the other buckets.
153    */
154   for (int i = 1; answer->getSize() < BUCKET_SIZE && ((bucket_id - i > 0) || (bucket_id + i < IDENTIFIER_SIZE)); i++) {
155     /* We check the previous buckets */
156     if (bucket_id - i >= 0) {
157       const Bucket* bucket_p = &table.getBucketAt(bucket_id - i);
158       answer->addBucket(bucket_p);
159     }
160     /* We check the next buckets */
161     if (bucket_id + i <= IDENTIFIER_SIZE) {
162       const Bucket* bucket_n = &table.getBucketAt(bucket_id + i);
163       answer->addBucket(bucket_n);
164     }
165   }
166   /* We trim the array to have only BUCKET_SIZE or less elements */
167   answer->trim();
168
169   return answer;
170 }
171
172 /** @brief Send a request to find a node in the node routing table.
173   * @param id_to_find the id of the node we are trying to find
174   */
175 bool Node::findNode(unsigned int id_to_find, bool count_in_stats)
176 {
177   unsigned int queries;
178   unsigned int answers;
179   bool destination_found   = false;
180   unsigned int nodes_added = 0;
181   double global_timeout    = simgrid::s4u::Engine::get_clock() + FIND_NODE_GLOBAL_TIMEOUT;
182   unsigned int steps       = 0;
183
184   /* First we build a list of who we already know */
185   std::unique_ptr<Answer> node_list = findClosest(id_to_find);
186   xbt_assert((node_list != nullptr), "node_list incorrect");
187   XBT_DEBUG("Doing a FIND_NODE on %08x", id_to_find);
188
189   /* Ask the nodes on our list if they have information about the node we are trying to find */
190   do {
191     answers        = 0;
192     queries        = sendFindNodeToBest(node_list.get());
193     nodes_added    = 0;
194     double timeout = simgrid::s4u::Engine::get_clock() + FIND_NODE_TIMEOUT;
195     steps++;
196     double time_beginreceive = simgrid::s4u::Engine::get_clock();
197
198     simgrid::s4u::Mailbox* mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(id_));
199     do {
200       if (receive_comm == nullptr)
201         receive_comm = mailbox->get_async<Message>(&received_msg);
202
203       if (receive_comm->test()) {
204         // Check if what we have received is what we are looking for.
205         if (received_msg->answer_ && received_msg->answer_->getDestinationId() == id_to_find) {
206           routingTableUpdate(received_msg->sender_id_);
207           // Handle the answer
208           for (auto const& contact : node_list->getNodes())
209             routingTableUpdate(contact.first);
210           answers++;
211
212           nodes_added = node_list->merge(received_msg->answer_.get());
213           XBT_DEBUG("Received an answer from %s (%s) with %zu nodes on it", received_msg->answer_to_->get_cname(),
214                     received_msg->issuer_host_name_.c_str(), received_msg->answer_->getSize());
215         } else {
216           if (received_msg->answer_) {
217             routingTableUpdate(received_msg->sender_id_);
218             XBT_DEBUG("Received a wrong answer for a FIND_NODE");
219           } else {
220             handleFindNode(received_msg);
221           }
222           // Update the timeout if we didn't have our answer
223           timeout += simgrid::s4u::Engine::get_clock() - time_beginreceive;
224           time_beginreceive = simgrid::s4u::Engine::get_clock();
225         }
226         delete received_msg;
227         receive_comm = nullptr;
228       } else {
229         simgrid::s4u::this_actor::sleep_for(1);
230       }
231     } while (simgrid::s4u::Engine::get_clock() < timeout && answers < queries);
232     destination_found = node_list->destinationFound();
233   } while (not destination_found && (nodes_added > 0 || answers == 0) &&
234            simgrid::s4u::Engine::get_clock() < global_timeout && steps < MAX_STEPS);
235
236   if (destination_found) {
237     if (count_in_stats)
238       find_node_success++;
239     if (queries > 4)
240       XBT_VERB("FIND_NODE on %08x success in %u steps", id_to_find, steps);
241     routingTableUpdate(id_to_find);
242   } else {
243     if (count_in_stats) {
244       find_node_failed++;
245       XBT_VERB("%08x not found in %u steps", id_to_find, steps);
246     }
247   }
248   return destination_found;
249 }
250
251 /** @brief Does a pseudo-random lookup for someone in the system
252   * @param node caller node data
253   */
254 void Node::randomLookup()
255 {
256   unsigned int id_to_look = RANDOM_LOOKUP_NODE; // Totally random.
257   /* TODO: Use some pseudo-random generator. */
258   XBT_DEBUG("I'm doing a random lookup");
259   findNode(id_to_look, true);
260 }
261
262 /** @brief Handles the answer to an incoming "find_node" task */
263 void Node::handleFindNode(const Message* msg)
264 {
265   routingTableUpdate(msg->sender_id_);
266   XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %08x", msg->answer_to_->get_cname(),
267            msg->issuer_host_name_.c_str(), msg->destination_id_);
268   // Building the answer to the request
269   auto* answer =
270       new Message(id_, msg->destination_id_, findClosest(msg->destination_id_),
271                   simgrid::s4u::Mailbox::by_name(std::to_string(id_)), simgrid::s4u::Host::current()->get_cname());
272   // Sending the answer
273   msg->answer_to_->put_init(answer, 1)->detach(kademlia::destroy);
274 }
275
276 void Node::displaySuccessRate() const
277 {
278   XBT_INFO("%u/%u FIND_NODE have succeeded", find_node_success, find_node_success + find_node_failed);
279 }
280 } // namespace kademlia
281 /**@brief Returns an identifier which is in a specific bucket of a routing table
282  * @param id id of the routing table owner
283  * @param prefix id of the bucket where we want that identifier to be
284  */
285 unsigned int get_id_in_prefix(unsigned int id, unsigned int prefix)
286 {
287   if (prefix == 0) {
288     return 0;
289   } else {
290     return (1U << (prefix - 1)) ^ id;
291   }
292 }
293
294 /** @brief Returns the prefix of an identifier.
295   * The prefix is the id of the bucket in which the remote identifier xor our identifier should be stored.
296   * @param id : big unsigned int id to test
297   * @param nb_bits : key size
298   */
299 unsigned int get_node_prefix(unsigned int id, unsigned int nb_bits)
300 {
301   unsigned int size = sizeof(unsigned int) * 8;
302   for (unsigned int j = 0; j < size; j++) {
303     if (((id >> (size - 1 - j)) & 0x1) != 0) {
304       return nb_bits - (j);
305     }
306   }
307   return 0;
308 }