Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
b9d8e377b390b9fff0a354514c9efb1067df8d6e
[simgrid.git] / examples / c / dht-kademlia / node.c
1 /* Copyright (c) 2010-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "node.h"
7 #include "routing_table.h"
8 #include "simgrid/comm.h"
9
10 #include <stdio.h> /* snprintf */
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(dht_kademlia_node, "Messages specific for this example");
13
14 /** @brief Initialization of a node
15  * @param node_id the id of the node
16  * @return the node created
17  */
18 node_t node_init(unsigned int node_id)
19 {
20   node_t node             = xbt_new(s_node_t, 1);
21   node->id                = node_id;
22   node->table             = routing_table_init(node_id);
23   node->mailbox           = get_node_mailbox(node_id);
24   node->find_node_failed  = 0;
25   node->find_node_success = 0;
26
27   node->received_msg = NULL;
28   node->receive_comm = NULL;
29
30   return node;
31 }
32
33 /* @brief Node destructor  */
34 void node_free(node_t node)
35 {
36   routing_table_free(node->table);
37   xbt_free(node);
38 }
39
40 /**
41   * Try to asynchronously get a new message from given mailbox. Return null if none available.
42   */
43 kademlia_message_t receive(node_t node, sg_mailbox_t mailbox)
44 {
45   if (node->receive_comm == NULL)
46     node->receive_comm = sg_mailbox_get_async(mailbox, &node->received_msg);
47   if (!sg_comm_test(node->receive_comm))
48     return NULL;
49   node->receive_comm = NULL;
50   return node->received_msg;
51 }
52
53 /**
54  * @brief Tries to join the network
55  * @param node node data
56  * @param id_known id of the node I know in the network.
57  */
58 unsigned int join(node_t node, unsigned int id_known)
59 {
60   unsigned int i;
61   unsigned int got_answer = 0;
62
63   sg_mailbox_t mailbox = get_node_mailbox(node->id);
64
65   /* Add the guy we know to our routing table and ourselves. */
66   routing_table_update(node, node->id);
67   routing_table_update(node, id_known);
68
69   /* First step: Send a "FIND_NODE" request to the node we know */
70   send_find_node(node, id_known, node->id);
71   do {
72     const kademlia_message_t msg = receive(node, mailbox);
73     if (msg) {
74       XBT_DEBUG("Received an answer from the node I know.");
75       got_answer = 1;
76       // retrieve the node list and ping them.
77       const s_answer_t* node_list  = msg->answer;
78       if (node_list != NULL) {
79         node_contact_t contact;
80         xbt_dynar_foreach (node_list->nodes, i, contact)
81           routing_table_update(node, contact->id);
82         node->received_msg = NULL;
83       } else {
84         handle_find_node(node, msg);
85       }
86       answer_free(msg->answer);
87       free(msg);
88     } else {
89       sg_actor_sleep_for(1);
90     }
91   } while (got_answer == 0);
92
93   /* Second step: Send a FIND_NODE to a random node in buckets */
94   unsigned int bucket_id = routing_table_find_bucket(node->table, id_known)->id;
95   xbt_assert(bucket_id <= IDENTIFIER_SIZE);
96   for (i = 0; ((bucket_id > i) || (bucket_id + i) <= IDENTIFIER_SIZE) && i < JOIN_BUCKETS_QUERIES; i++) {
97     if (bucket_id > i) {
98       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id - i);
99       find_node(node, id_in_bucket, 0);
100     }
101     if (bucket_id + i <= IDENTIFIER_SIZE) {
102       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id + i);
103       find_node(node, id_in_bucket, 0);
104     }
105   }
106   return got_answer;
107 }
108
109 /** @brief Send a "FIND_NODE" to a node
110  * @param node sender node data
111  * @param id node we are querying
112  * @param destination node we are trying to find.
113  */
114 void send_find_node(const_node_t node, unsigned int id, unsigned int destination)
115 {
116   /* Gets the mailbox to send to */
117   sg_mailbox_t mailbox = get_node_mailbox(id);
118   /* Build the message */
119   kademlia_message_t msg = new_message(node->id, destination, NULL, node->mailbox, sg_host_self_get_name());
120   sg_comm_t comm         = sg_mailbox_put_init(mailbox, msg, COMM_SIZE);
121   sg_comm_detach(comm, free_message);
122   XBT_VERB("Asking %u for its closest nodes", id);
123 }
124
125 /**
126  * Sends to the best "KADEMLIA_ALPHA" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best
127  * nodes
128  */
129 unsigned int send_find_node_to_best(const_node_t node, const_answer_t node_list)
130 {
131   unsigned int i           = 0;
132   unsigned int j           = 0;
133   unsigned int destination = node_list->destination_id;
134   while (j < KADEMLIA_ALPHA && i < node_list->size) {
135     /* We need to have at most "KADEMLIA_ALPHA" requests each time, according to the protocol */
136     /* Gets the node we want to send the query to */
137     const s_node_contact_t* node_to_query = xbt_dynar_get_as(node_list->nodes, i, node_contact_t);
138     if (node_to_query->id != node->id) { /* No need to query ourselves */
139       send_find_node(node, node_to_query->id, destination);
140       j++;
141     }
142     i++;
143   }
144   return i;
145 }
146
147 /** @brief Updates/Puts the node id unsigned into our routing table
148  * @param node Our node data
149  * @param id The id of the node we need to add unsigned into our routing table
150  */
151 void routing_table_update(const_node_t node, unsigned int id)
152 {
153   const_routing_table_t table = node->table;
154   // retrieval of the bucket in which the should be
155   const_bucket_t bucket = routing_table_find_bucket(table, id);
156
157   // check if the id is already in the bucket.
158   unsigned int id_pos = bucket_find_id(bucket, id);
159
160   if (id_pos == -1) {
161     /* We check if the bucket is full or not. If it is, we evict an old element */
162     if (xbt_dynar_length(bucket->nodes) >= BUCKET_SIZE)
163       xbt_dynar_pop(bucket->nodes, NULL);
164
165     xbt_dynar_unshift(bucket->nodes, &id);
166     XBT_VERB("I'm adding to my routing table %08x", id);
167   } else {
168     // We push to the front of the dynar the element.
169     unsigned int element = xbt_dynar_get_as(bucket->nodes, id_pos, unsigned int);
170     xbt_dynar_remove_at(bucket->nodes, id_pos, NULL);
171     xbt_dynar_unshift(bucket->nodes, &element);
172     XBT_VERB("I'm updating %08x", element);
173   }
174 }
175
176 /** @brief Finds the closest nodes to the node given.
177  * @param node : our node
178  * @param destination_id : the id of the guy we are trying to find
179  */
180 answer_t find_closest(const_node_t node, unsigned int destination_id)
181 {
182   answer_t answer = answer_init(destination_id);
183   /* We find the corresponding bucket for the id */
184   const_bucket_t bucket = routing_table_find_bucket(node->table, destination_id);
185   int bucket_id         = bucket->id;
186   xbt_assert((bucket_id <= IDENTIFIER_SIZE), "Bucket found has a wrong identifier");
187   /* So, we copy the contents of the bucket unsigned into our result dynar */
188   answer_add_bucket(bucket, answer);
189
190   /* However, if we don't have enough elements in our bucket, we NEED to include at least "BUCKET_SIZE" elements
191    * (if, of course, we know at least "BUCKET_SIZE" elements. So we're going to look unsigned into the other buckets.
192    */
193   for (int i = 1; answer->size < BUCKET_SIZE && ((bucket_id - i > 0) || (bucket_id + i < IDENTIFIER_SIZE)); i++) {
194     /* We check the previous buckets */
195     if (bucket_id - i >= 0) {
196       const_bucket_t bucket_p = &node->table->buckets[bucket_id - i];
197       answer_add_bucket(bucket_p, answer);
198     }
199     /* We check the next buckets */
200     if (bucket_id + i <= IDENTIFIER_SIZE) {
201       const_bucket_t bucket_n = &node->table->buckets[bucket_id + i];
202       answer_add_bucket(bucket_n, answer);
203     }
204   }
205   /* We sort the array by XOR distance */
206   answer_sort(answer);
207   /* We trim the array to have only BUCKET_SIZE or less elements */
208   answer_trim(answer);
209
210   return answer;
211 }
212
213 unsigned int find_node(node_t node, unsigned int id_to_find, unsigned int count_in_stats)
214 {
215   unsigned int queries;
216   unsigned int answers;
217   unsigned int destination_found = 0;
218   unsigned int nodes_added       = 0;
219   double global_timeout          = simgrid_get_clock() + FIND_NODE_GLOBAL_TIMEOUT;
220   unsigned int steps             = 0;
221
222   /* First we build a list of who we already know */
223   answer_t node_list = find_closest(node, id_to_find);
224   xbt_assert((node_list != NULL), "node_list incorrect");
225
226   XBT_DEBUG("Doing a FIND_NODE on %08x", id_to_find);
227
228   /* Ask the nodes on our list if they   have information about the node we are trying to find */
229   sg_mailbox_t mailbox = get_node_mailbox(node->id);
230   do {
231     answers        = 0;
232     queries        = send_find_node_to_best(node, node_list);
233     nodes_added    = 0;
234     double timeout = simgrid_get_clock() + FIND_NODE_TIMEOUT;
235     steps++;
236     double time_beginreceive = simgrid_get_clock();
237
238     do {
239       const kademlia_message_t msg = receive(node, mailbox);
240       if (msg) {
241         // Figure out if we received an answer or something else
242         // Check if what we have received is what we are looking for.
243         if (msg->answer != NULL && msg->answer->destination_id == id_to_find) {
244           // Handle the answer
245           routing_table_update(node, msg->sender_id);
246           node_contact_t contact;
247           unsigned int i;
248           xbt_dynar_foreach (node_list->nodes, i, contact)
249             routing_table_update(node, contact->id);
250
251           answers++;
252
253           nodes_added = answer_merge(node_list, msg->answer);
254           XBT_DEBUG("Received an answer from %s (%s) with %lu nodes on it", sg_mailbox_get_name(msg->answer_to),
255                     msg->issuer_host_name, xbt_dynar_length(msg->answer->nodes));
256         } else {
257           if (msg->answer != NULL) {
258             routing_table_update(node, msg->sender_id);
259             XBT_DEBUG("Received a wrong answer for a FIND_NODE");
260           } else {
261             handle_find_node(node, msg);
262           }
263           // Update the timeout if we didn't have our answer
264           timeout += simgrid_get_clock() - time_beginreceive;
265           time_beginreceive = simgrid_get_clock();
266         }
267         answer_free(msg->answer);
268         free(msg);
269       } else {
270         sg_actor_sleep_for(1);
271       }
272     } while (simgrid_get_clock() < timeout && answers < queries);
273     destination_found = answer_destination_found(node_list);
274   } while (!destination_found && (nodes_added > 0 || answers == 0) && simgrid_get_clock() < global_timeout &&
275            steps < MAX_STEPS);
276   if (destination_found) {
277     if (count_in_stats)
278       node->find_node_success++;
279     if (queries > 4)
280       XBT_VERB("FIND_NODE on %08x success in %u steps", id_to_find, steps);
281     routing_table_update(node, id_to_find);
282   } else {
283     if (count_in_stats) {
284       node->find_node_failed++;
285       XBT_VERB("%08x not found in %u steps", id_to_find, steps);
286     }
287   }
288   answer_free(node_list);
289   return destination_found;
290 }
291
292 /** @brief Does a pseudo-random lookup for someone in the system
293  * @param node caller node data
294  */
295 void random_lookup(node_t node)
296 {
297   unsigned int id_to_look = RANDOM_LOOKUP_NODE; // Totally random.
298   /* TODO: Use some pseudorandom generator. */
299   XBT_DEBUG("I'm doing a random lookup");
300   find_node(node, id_to_look, 1);
301 }
302
303 /** @brief Handles the answer to an incoming "find_node" message */
304 void handle_find_node(const_node_t node, const_kademlia_message_t msg)
305 {
306   routing_table_update(node, msg->sender_id);
307   XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %08x", sg_mailbox_get_name(msg->answer_to),
308            msg->issuer_host_name, msg->destination_id);
309   // Building the msg to send
310   kademlia_message_t answer = new_message(node->id, msg->destination_id, find_closest(node, msg->destination_id),
311                                           node->mailbox, sg_host_self_get_name());
312   // Sending the msg
313   sg_comm_t comm = sg_mailbox_put_init(msg->answer_to, answer, COMM_SIZE);
314   sg_comm_detach(comm, &free_message);
315 }
316
317 /**@brief Returns an identifier which is in a specific bucket of a routing table
318  * @param id id of the routing table owner
319  * @param prefix id of the bucket where we want that identifier to be
320  */
321 unsigned int get_id_in_prefix(unsigned int id, unsigned int prefix)
322 {
323   if (prefix == 0) {
324     return 0;
325   } else {
326     return (1U << (prefix - 1)) ^ id;
327   }
328 }
329
330 /** @brief Returns the prefix of an identifier.
331  * The prefix is the id of the bucket in which the remote identifier xor our identifier should be stored.
332  * @param id : big unsigned int id to test
333  * @param nb_bits : key size
334  */
335 unsigned int get_node_prefix(unsigned int id, unsigned int nb_bits)
336 {
337   unsigned int size = sizeof(unsigned int) * 8;
338   for (unsigned int j = 0; j < size; j++) {
339     if (((id >> (size - 1 - j)) & 0x1) != 0) {
340       return nb_bits - j;
341     }
342   }
343   return 0;
344 }
345
346 /** @brief Gets the mailbox name of a host given its identifier */
347 sg_mailbox_t get_node_mailbox(unsigned int id)
348 {
349   char mailbox_name[MAILBOX_NAME_SIZE];
350   snprintf(mailbox_name, MAILBOX_NAME_SIZE - 1, "%u", id);
351   return sg_mailbox_by_name(mailbox_name);
352 }
353
354 /** Constructor, build a new contact information. */
355 node_contact_t node_contact_new(unsigned int id, unsigned int distance)
356 {
357   node_contact_t contact = xbt_new(s_node_contact_t, 1);
358
359   contact->id       = id;
360   contact->distance = distance;
361
362   return contact;
363 }
364
365 /** Builds a contact information from a contact information */
366 node_contact_t node_contact_copy(const_node_contact_t node_contact)
367 {
368   node_contact_t contact = xbt_new(s_node_contact_t, 1);
369
370   contact->id       = node_contact->id;
371   contact->distance = node_contact->distance;
372
373   return contact;
374 }
375
376 /** Destructor */
377 void node_contact_free(node_contact_t contact)
378 {
379   xbt_free(contact);
380 }