Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
49374f88e8a226ec579d8510d6c6cf25207a0c5b
[simgrid.git] / examples / cpp / dht-kademlia / node.cpp
1 /* Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "node.hpp"
7 #include "routing_table.hpp"
8
9 XBT_LOG_NEW_DEFAULT_CATEGORY(kademlia_node, "Messages specific for this example");
10 namespace sg4 = simgrid::s4u;
11
12 namespace kademlia {
13 static void destroy(void* message)
14 {
15   const auto* msg = static_cast<Message*>(message);
16   delete msg;
17 }
18
19 /**
20   * Try to asynchronously get a new message from given mailbox. Return null if none available.
21   */
22 Message* Node::receive(sg4::Mailbox* mailbox)
23 {
24   if (receive_comm == nullptr)
25     receive_comm = mailbox->get_async<kademlia::Message>(&received_msg);
26   if (not receive_comm->test())
27     return nullptr;
28   receive_comm = nullptr;
29   return received_msg;
30 }
31
32 /**
33   * @brief Tries to join the network
34   * @param known_id id of the node I know in the network.
35   */
36 bool Node::join(unsigned int known_id)
37 {
38   bool got_answer = false;
39
40   /* Add the guy we know to our routing table and ourselves. */
41   routingTableUpdate(id_);
42   routingTableUpdate(known_id);
43
44   /* First step: Send a "FIND_NODE" request to the node we know */
45   sendFindNode(known_id, id_);
46
47   sg4::Mailbox* mailbox = sg4::Mailbox::by_name(std::to_string(id_));
48   do {
49     if (Message* msg = receive(mailbox)) {
50       XBT_DEBUG("Received an answer from the node I know.");
51       got_answer = true;
52       // retrieve the node list and ping them.
53       const Answer* node_list = msg->answer_.get();
54       if (node_list) {
55         for (auto const& contact : node_list->getNodes())
56           routingTableUpdate(contact.first);
57       } else {
58         handleFindNode(msg);
59       }
60       delete msg;
61     } else
62       sg4::this_actor::sleep_for(1);
63   } while (not got_answer);
64
65   /* Second step: Send a FIND_NODE to a random node in buckets */
66   unsigned int bucket_id = table.findBucket(known_id)->getId();
67   xbt_assert(bucket_id <= IDENTIFIER_SIZE);
68   for (unsigned int i = 0; ((bucket_id > i) || (bucket_id + i) <= IDENTIFIER_SIZE) && i < JOIN_BUCKETS_QUERIES; i++) {
69     if (bucket_id > i) {
70       unsigned int id_in_bucket = get_id_in_prefix(id_, bucket_id - i);
71       findNode(id_in_bucket, false);
72     }
73     if (bucket_id + i <= IDENTIFIER_SIZE) {
74       unsigned int id_in_bucket = get_id_in_prefix(id_, bucket_id + i);
75       findNode(id_in_bucket, false);
76     }
77   }
78   return got_answer;
79 }
80
81 /** @brief Send a "FIND_NODE" to a node
82   * @param id node we are querying
83   * @param destination node we are trying to find.
84   */
85 void Node::sendFindNode(unsigned int id, unsigned int destination) const
86 {
87   /* Gets the mailbox to send to */
88   sg4::Mailbox* mailbox = sg4::Mailbox::by_name(std::to_string(id));
89   /* Build the task */
90
91   auto* msg =
92       new Message(id_, destination, sg4::Mailbox::by_name(std::to_string(id_)), sg4::Host::current()->get_cname());
93
94   /* Send the task */
95   mailbox->put_init(msg, 1)->detach(kademlia::destroy);
96   XBT_VERB("Asking %u for its closest nodes", id);
97 }
98
99 /**
100   * Sends to the best "KADEMLIA_ALPHA" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best
101   * nodes
102   */
103 unsigned int Node::sendFindNodeToBest(const Answer* node_list) const
104 {
105   unsigned int i           = 0;
106   unsigned int j           = 0;
107   unsigned int destination = node_list->getDestinationId();
108   for (auto const& node_to_query : node_list->getNodes()) {
109     /* We need to have at most "KADEMLIA_ALPHA" requests each time, according to the protocol */
110     /* Gets the node we want to send the query to */
111     if (node_to_query.first != id_) { /* No need to query ourselves */
112       sendFindNode(node_to_query.first, destination);
113       j++;
114     }
115     i++;
116     if (j == KADEMLIA_ALPHA)
117       break;
118   }
119   return i;
120 }
121
122 /** @brief Updates/Puts the node id unsigned into our routing table
123   * @param id The id of the node we need to add unsigned into our routing table
124   */
125 void Node::routingTableUpdate(unsigned int id)
126 {
127   // retrieval of the bucket in which the should be
128   Bucket* bucket = table.findBucket(id);
129
130   // check if the id is already in the bucket.
131   auto id_pos = std::find(bucket->nodes_.begin(), bucket->nodes_.end(), id);
132
133   if (id_pos == bucket->nodes_.end()) {
134     /* We check if the bucket is full or not. If it is, we evict an old element */
135     if (bucket->nodes_.size() >= BUCKET_SIZE) {
136       bucket->nodes_.pop_back();
137     }
138     bucket->nodes_.push_front(id);
139     XBT_VERB("I'm adding to my routing table %08x", id);
140   } else {
141     // We push the element to the front
142     bucket->nodes_.erase(id_pos);
143     bucket->nodes_.push_front(id);
144     XBT_VERB("I'm updating %08x", id);
145   }
146 }
147
148 /** @brief Finds the closest nodes to the node given.
149   * @param node : our node
150   * @param destination_id : the id of the guy we are trying to find
151   */
152 std::unique_ptr<Answer> Node::findClosest(unsigned int destination_id)
153 {
154   auto answer = std::make_unique<Answer>(destination_id);
155   /* We find the corresponding bucket for the id */
156   const Bucket* bucket = table.findBucket(destination_id);
157   int bucket_id  = bucket->getId();
158   xbt_assert((bucket_id <= IDENTIFIER_SIZE), "Bucket found has a wrong identifier");
159   /* So, we copy the contents of the bucket unsigned into our answer */
160   answer->addBucket(bucket);
161
162   /* However, if we don't have enough elements in our bucket, we NEED to include at least "BUCKET_SIZE" elements
163    * (if, of course, we know at least "BUCKET_SIZE" elements. So we're going to look unsigned into the other buckets.
164    */
165   for (int i = 1; answer->getSize() < BUCKET_SIZE && ((bucket_id - i > 0) || (bucket_id + i < IDENTIFIER_SIZE)); i++) {
166     /* We check the previous buckets */
167     if (bucket_id - i >= 0) {
168       const Bucket* bucket_p = &table.getBucketAt(bucket_id - i);
169       answer->addBucket(bucket_p);
170     }
171     /* We check the next buckets */
172     if (bucket_id + i <= IDENTIFIER_SIZE) {
173       const Bucket* bucket_n = &table.getBucketAt(bucket_id + i);
174       answer->addBucket(bucket_n);
175     }
176   }
177   /* We trim the array to have only BUCKET_SIZE or less elements */
178   answer->trim();
179
180   return answer;
181 }
182
183 /** @brief Send a request to find a node in the node routing table.
184   * @param id_to_find the id of the node we are trying to find
185   */
186 bool Node::findNode(unsigned int id_to_find, bool count_in_stats)
187 {
188   unsigned int queries;
189   unsigned int answers;
190   bool destination_found   = false;
191   unsigned int nodes_added = 0;
192   double global_timeout    = sg4::Engine::get_clock() + FIND_NODE_GLOBAL_TIMEOUT;
193   unsigned int steps       = 0;
194
195   /* First we build a list of who we already know */
196   std::unique_ptr<Answer> node_list = findClosest(id_to_find);
197   xbt_assert((node_list != nullptr), "node_list incorrect");
198   XBT_DEBUG("Doing a FIND_NODE on %08x", id_to_find);
199
200   /* Ask the nodes on our list if they have information about the node we are trying to find */
201   do {
202     answers        = 0;
203     queries        = sendFindNodeToBest(node_list.get());
204     nodes_added    = 0;
205     double timeout = sg4::Engine::get_clock() + FIND_NODE_TIMEOUT;
206     steps++;
207     double time_beginreceive = sg4::Engine::get_clock();
208
209     sg4::Mailbox* mailbox = sg4::Mailbox::by_name(std::to_string(id_));
210     do {
211       if (Message* msg = receive(mailbox)) {
212         // Check if what we have received is what we are looking for.
213         if (msg->answer_ && msg->answer_->getDestinationId() == id_to_find) {
214           routingTableUpdate(msg->sender_id_);
215           // Handle the answer
216           for (auto const& contact : node_list->getNodes())
217             routingTableUpdate(contact.first);
218           answers++;
219
220           nodes_added = node_list->merge(msg->answer_.get());
221           XBT_DEBUG("Received an answer from %s (%s) with %zu nodes on it", msg->answer_to_->get_cname(),
222                     msg->issuer_host_name_.c_str(), msg->answer_->getSize());
223         } else {
224           if (msg->answer_) {
225             routingTableUpdate(msg->sender_id_);
226             XBT_DEBUG("Received a wrong answer for a FIND_NODE");
227           } else {
228             handleFindNode(msg);
229           }
230           // Update the timeout if we didn't have our answer
231           timeout += sg4::Engine::get_clock() - time_beginreceive;
232           time_beginreceive = sg4::Engine::get_clock();
233         }
234         delete msg;
235       } else {
236         sg4::this_actor::sleep_for(1);
237       }
238     } while (sg4::Engine::get_clock() < timeout && answers < queries);
239     destination_found = node_list->destinationFound();
240   } while (not destination_found && (nodes_added > 0 || answers == 0) && sg4::Engine::get_clock() < global_timeout &&
241            steps < MAX_STEPS);
242
243   if (destination_found) {
244     if (count_in_stats)
245       find_node_success++;
246     if (queries > 4)
247       XBT_VERB("FIND_NODE on %08x success in %u steps", id_to_find, steps);
248     routingTableUpdate(id_to_find);
249   } else {
250     if (count_in_stats) {
251       find_node_failed++;
252       XBT_VERB("%08x not found in %u steps", id_to_find, steps);
253     }
254   }
255   return destination_found;
256 }
257
258 /** @brief Does a pseudo-random lookup for someone in the system
259   * @param node caller node data
260   */
261 void Node::randomLookup()
262 {
263   unsigned int id_to_look = RANDOM_LOOKUP_NODE; // Totally random.
264   /* TODO: Use some pseudo-random generator. */
265   XBT_DEBUG("I'm doing a random lookup");
266   findNode(id_to_look, true);
267 }
268
269 /** @brief Handles the answer to an incoming "find_node" task */
270 void Node::handleFindNode(const Message* msg)
271 {
272   routingTableUpdate(msg->sender_id_);
273   XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %08x", msg->answer_to_->get_cname(),
274            msg->issuer_host_name_.c_str(), msg->destination_id_);
275   // Building the answer to the request
276   auto* answer = new Message(id_, msg->destination_id_, findClosest(msg->destination_id_),
277                              sg4::Mailbox::by_name(std::to_string(id_)), sg4::Host::current()->get_cname());
278   // Sending the answer
279   msg->answer_to_->put_init(answer, 1)->detach(kademlia::destroy);
280 }
281
282 void Node::displaySuccessRate() const
283 {
284   XBT_INFO("%u/%u FIND_NODE have succeeded", find_node_success, find_node_success + find_node_failed);
285 }
286 } // namespace kademlia
287 /**@brief Returns an identifier which is in a specific bucket of a routing table
288  * @param id id of the routing table owner
289  * @param prefix id of the bucket where we want that identifier to be
290  */
291 unsigned int get_id_in_prefix(unsigned int id, unsigned int prefix)
292 {
293   if (prefix == 0) {
294     return 0;
295   } else {
296     return (1U << (prefix - 1)) ^ id;
297   }
298 }
299
300 /** @brief Returns the prefix of an identifier.
301   * The prefix is the id of the bucket in which the remote identifier xor our identifier should be stored.
302   * @param id : big unsigned int id to test
303   * @param nb_bits : key size
304   */
305 unsigned int get_node_prefix(unsigned int id, unsigned int nb_bits)
306 {
307   unsigned int size = sizeof(unsigned int) * 8;
308   for (unsigned int j = 0; j < size; j++) {
309     if (((id >> (size - 1 - j)) & 0x1) != 0) {
310       return nb_bits - j;
311     }
312   }
313   return 0;
314 }