/* Copyright (c) 2012-2018. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "dht-kademlia.h"
10 #include "simgrid/msg.h"
12 #include <stdio.h> /* snprintf */
/** @addtogroup MSG_examples
 * <b>kademlia/kademlia.c: Kademlia protocol</b>
 * Implements the Kademlia protocol, using 32-bit identifiers.
 */
18 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia, "Messages specific for this msg example");
20 /* Main loop for the process */
21 static void main_loop(node_t node, double deadline)
23 double next_lookup_time = MSG_get_clock() + random_lookup_interval;
24 XBT_VERB("Main loop start");
25 while (MSG_get_clock() < deadline) {
27 if (node->receive_comm == NULL) {
28 node->task_received = NULL;
29 node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
31 if (node->receive_comm) {
32 if (!MSG_comm_test(node->receive_comm)) {
33 /* We search for a pseudo random node */
34 if (MSG_get_clock() >= next_lookup_time) {
36 next_lookup_time += random_lookup_interval;
38 //Didn't get a task: sleep for a while...
42 //There has been a transfer, we need to handle it !
43 msg_error_t status = MSG_comm_get_status(node->receive_comm);
44 MSG_comm_destroy(node->receive_comm);
45 node->receive_comm = NULL;
47 if (status == MSG_OK) {
48 xbt_assert((node->task_received != NULL), "We received an incorrect task");
49 handle_task(node, node->task_received);
51 xbt_assert((MSG_comm_get_task(node->receive_comm) == NULL), "Comm failed but received a task.");
52 XBT_DEBUG("Nevermind, the communication has failed.");
56 //Didn't get a comm: sleep.
60 //Cleanup the receiving communication.
61 if (node->receive_comm != NULL) {
62 if (MSG_comm_test(node->receive_comm) && MSG_comm_get_status(node->receive_comm) == MSG_OK) {
63 task_free(MSG_comm_get_task(node->receive_comm));
65 MSG_comm_destroy(node->receive_comm);
69 /** @brief Node function
71 * @param the ID of the person I know in the system (or not)
72 * @param Time before I leave the system because I'm bored
74 static int node(int argc, char *argv[])
76 unsigned int join_sucess = 1;
78 xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
79 /* Node initialization */
80 unsigned int id = strtoul(argv[1], NULL, 0);
81 node_t node = node_init(id);
84 XBT_INFO("Hi, I'm going to join the network with id %s", node->mailbox);
85 unsigned int id_known = strtoul(argv[2], NULL, 0);
86 join_sucess = join(node, id_known);
87 deadline = strtod(argv[3], NULL);
89 deadline = strtod(argv[2], NULL);
90 XBT_INFO("Hi, I'm going to create the network with id %s", node->mailbox);
91 node_routing_table_update(node, node->id);
94 XBT_VERB("Ok, I'm joining the network with id %s", node->mailbox);
95 //We start the main loop
96 main_loop(node, deadline);
98 XBT_INFO("I couldn't join the network :(");
100 XBT_DEBUG("I'm leaving the network");
101 XBT_INFO("%u/%u FIND_NODE have succeeded", node->find_node_success, node->find_node_success + node->find_node_failed);
108 * @brief Tries to join the network
109 * @param node node data
110 * @param id_known id of the node I know in the network.
112 unsigned int join(node_t node, unsigned int id_known)
116 unsigned int trial = 0;
118 unsigned int answer_got = 0;
120 /* Add the guy we know to our routing table and ourselves. */
121 node_routing_table_update(node, node->id);
122 node_routing_table_update(node, id_known);
124 /* First step: Send a "FIND_NODE" request to the node we know */
125 send_find_node(node, id_known, node->id);
127 if (node->receive_comm == NULL) {
128 node->task_received = NULL;
129 node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
131 if (node->receive_comm) {
132 if (MSG_comm_test(node->receive_comm)) {
133 status = MSG_comm_get_status(node->receive_comm);
134 MSG_comm_destroy(node->receive_comm);
135 node->receive_comm = NULL;
136 if (status == MSG_OK) {
137 XBT_DEBUG("Received an answer from the node I know.");
139 //retrieve the node list and ping them.
140 task_data_t data = MSG_task_get_data(node->task_received);
141 xbt_assert((data != NULL), "Null data received");
142 if (data->type == TASK_FIND_NODE_ANSWER) {
143 node_contact_t contact;
144 node_list = data->answer;
145 xbt_dynar_foreach(node_list->nodes, i, contact) {
146 node_routing_table_update(node, contact->id);
148 task_free(node->task_received);
150 handle_task(node, node->task_received);
156 MSG_process_sleep(1);
159 MSG_process_sleep(1);
161 } while (answer_got == 0 && trial < max_join_trials);
162 /* Second step: Send a FIND_NODE to a a random node in buckets */
163 unsigned int bucket_id = routing_table_find_bucket(node->table, id_known)->id;
164 for (i = 0; ((bucket_id > i) || (bucket_id + i) <= identifier_size) && i < JOIN_BUCKETS_QUERIES; i++) {
166 unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id - i);
167 find_node(node, id_in_bucket, 0);
169 if (bucket_id + i <= identifier_size) {
170 unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id + i);
171 find_node(node, id_in_bucket, 0);
177 /** @brief Send a request to find a node in the node routing table.
178 * @param node our node data
179 * @param id_to_find the id of the node we are trying to find
181 unsigned int find_node(node_t node, unsigned int id_to_find, unsigned int count_in_stats)
184 unsigned int queries;
185 unsigned int answers;
186 unsigned int destination_found = 0;
187 unsigned int nodes_added = 0;
188 double global_timeout = MSG_get_clock() + find_node_global_timeout;
189 unsigned int steps = 0;
191 /* First we build a list of who we already know */
192 answer_t node_list = node_find_closest(node, id_to_find);
193 xbt_assert((node_list != NULL), "node_list incorrect");
195 XBT_DEBUG("Doing a FIND_NODE on %08x", id_to_find);
199 /* Ask the nodes on our list if they have information about the node we are trying to find */
202 queries = send_find_node_to_best(node, node_list);
204 double timeout = MSG_get_clock() + find_node_timeout;
206 double time_beginreceive = MSG_get_clock();
208 if (node->receive_comm == NULL) {
209 node->task_received = NULL;
210 node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
212 if (node->receive_comm) {
213 if (MSG_comm_test(node->receive_comm)) {
214 status = MSG_comm_get_status(node->receive_comm);
215 MSG_comm_destroy(node->receive_comm);
216 node->receive_comm = NULL;
217 if (status == MSG_OK) {
218 xbt_assert((node->task_received != NULL), "Invalid task received");
219 //Figure out if we received an answer or something else
220 task_data_t data = MSG_task_get_data(node->task_received);
221 xbt_assert((data != NULL), "No data in the task");
223 //Check if what we have received is what we are looking for.
224 if (data->type == TASK_FIND_NODE_ANSWER && data->answer->destination_id == id_to_find) {
226 node_routing_table_update(node, data->sender_id);
227 node_contact_t contact;
228 xbt_dynar_foreach(node_list->nodes, i, contact) {
229 node_routing_table_update(node, contact->id);
233 nodes_added = answer_merge(node_list, data->answer);
234 XBT_DEBUG("Received an answer from %s (%s) with %lu nodes on it", data->answer_to, data->issuer_host_name,
235 xbt_dynar_length(data->answer->nodes));
237 task_free(node->task_received);
239 handle_task(node, node->task_received);
240 //Update the timeout if we didn't have our answer
241 timeout += MSG_get_clock() - time_beginreceive;
242 time_beginreceive = MSG_get_clock();
246 MSG_process_sleep(1);
249 MSG_process_sleep(1);
251 } while (MSG_get_clock() < timeout && answers < queries);
252 destination_found = answer_destination_found(node_list);
253 } while (!destination_found && (nodes_added > 0 || answers == 0) && MSG_get_clock() < global_timeout
254 && steps < MAX_STEPS);
255 if (destination_found) {
257 node->find_node_success++;
259 XBT_VERB("FIND_NODE on %08x success in %u steps", id_to_find, steps);
260 node_routing_table_update(node, id_to_find);
262 if (count_in_stats) {
263 node->find_node_failed++;
264 XBT_VERB("%08x not found in %u steps", id_to_find, steps);
267 answer_free(node_list);
268 return destination_found;
271 /** @brief Pings a node in the system to see if it is online.
272 * @param node Our node data
273 * @param id_to_ping the id of a node we want to see if it is online.
274 * @return if the ping succeded or not.
276 unsigned int ping(node_t node, unsigned int id_to_ping)
278 char mailbox[MAILBOX_NAME_SIZE];
279 snprintf(mailbox, MAILBOX_NAME_SIZE, "%u", id_to_ping);
281 double timeout = MSG_get_clock() + ping_timeout;
283 msg_task_t ping_task = task_new_ping(node->id, node->mailbox, MSG_host_get_name(MSG_host_self()));
284 msg_task_t task_received = NULL;
286 XBT_VERB("PING %08x", id_to_ping);
288 //Check that we aren't trying to ping ourselves
289 if (id_to_ping == node->id) {
293 /* Sending the ping task */
294 MSG_task_dsend(ping_task, mailbox, task_free_v);
296 task_received = NULL;
298 MSG_task_receive_with_timeout(&task_received, node->mailbox, ping_timeout);
299 if (status == MSG_OK) {
300 xbt_assert((task_received != NULL), "Invalid task received");
301 //Checking if it's what we are waiting for or not.
302 task_data_t data = MSG_task_get_data(task_received);
303 xbt_assert((data != NULL), "didn't receive any data...");
304 if (data->type == TASK_PING_ANSWER && id_to_ping == data->sender_id) {
305 XBT_VERB("Ping to %s succeeded", mailbox);
306 node_routing_table_update(node, data->sender_id);
307 task_free(task_received);
308 return 1; // Destination found, ping succeeded!
310 //If it's not our answer, we answer the query anyway.
311 handle_task(node, task_received);
314 } while (MSG_get_clock() < timeout);
316 if (MSG_get_clock() >= timeout) {
317 XBT_DEBUG("Ping to %s has timeout.", mailbox);
320 XBT_DEBUG("It seems that %s is offline...", mailbox);
324 /** @brief Does a pseudo-random lookup for someone in the system
325 * @param node caller node data
327 void random_lookup(node_t node)
329 unsigned int id_to_look = RANDOM_LOOKUP_NODE; //Totally random.
330 /* TODO: Use some pseudorandom generator like RngStream. */
331 XBT_DEBUG("I'm doing a random lookup");
332 find_node(node, id_to_look, 1);
335 /** @brief Send a "FIND_NODE" to a node
336 * @param node sender node data
337 * @param id node we are querying
338 * @param destination node we are trying to find.
340 void send_find_node(node_t node, unsigned int id, unsigned int destination)
342 char mailbox[MAILBOX_NAME_SIZE];
343 /* Gets the mailbox to send to */
344 get_node_mailbox(id, mailbox);
346 msg_task_t task = task_new_find_node(node->id, destination, node->mailbox, MSG_host_get_name(MSG_host_self()));
348 xbt_assert((task != NULL), "Trying to send a NULL task.");
349 MSG_task_dsend(task, mailbox, task_free_v);
350 XBT_VERB("Asking %s for its closest nodes", mailbox);
// NOTE(review): this block is a lossy listing. Every line starts with a stray number, and several
// lines are absent: the delimiters of the description comment, the braces, the declarations of `i`
// and `j`, whatever updates them inside the loop, and the return statement. Restore them from the
// upstream file before building; the value returned cannot be determined from what is shown here.
354 * Sends to the best "kademlia_alpha" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best nodes
356 unsigned int send_find_node_to_best(node_t node, answer_t node_list)
// Id being looked up; it is passed unchanged to every send_find_node() call below.
360 unsigned int destination = node_list->destination_id;
// Runs while `j` is below kademlia_alpha and `i` is below node_list->size; `i` is the index used to
// read node_list->nodes. Presumably `j` counts the queries actually sent - confirm.
361 while (j < kademlia_alpha && i < node_list->size) {
362 /* We need to have at most "kademlia_alpha" requests each time, according to the protocol */
363 /* Gets the node we want to send the query to */
364 node_contact_t node_to_query = xbt_dynar_get_as(node_list->nodes, i, node_contact_t);
// Contacts whose id equals our own id are skipped.
365 if (node_to_query->id != node->id) { /* No need to query ourselves */
366 send_find_node(node, node_to_query->id, destination);
// NOTE(review): this block is a lossy listing. Every line starts with a stray number, and several
// lines are absent: the braces, the `case` labels in front of the handle_find_node() and
// handle_ping() arms, whatever ends each arm, and the tail of the function. The callers visible in
// this file do not release the task after calling handle_task(), so presumably the missing tail
// does - confirm against the upstream file.
374 /** @brief Handles an incoming received task */
375 void handle_task(node_t node, msg_task_t task)
377 task_data_t data = MSG_task_get_data(task);
378 xbt_assert((data != NULL), "Received NULL data");
379 //Adding/updating the guy to our routing table
380 node_routing_table_update(node, data->sender_id);
// Dispatch on the task type stored in the payload.
381 switch (data->type) {
// Arm whose label is not visible: the payload is passed to handle_find_node().
383 handle_find_node(node, data);
// A FIND_NODE answer that reaches this function is only logged.
385 case TASK_FIND_NODE_ANSWER:
386 XBT_DEBUG("Received a wrong answer for a FIND_NODE");
// Arm whose label is not visible: the payload is passed to handle_ping().
389 handle_ping(node, data);
397 /** @brief Handles the answer to an incoming "find_node" task */
398 void handle_find_node(node_t node, task_data_t data)
400 XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %08x",
401 data->answer_to, data->issuer_host_name, data->destination_id);
402 //Building the answer to the request
403 answer_t answer = node_find_closest(node, data->destination_id);
404 //Building the task to send
405 msg_task_t task = task_new_find_node_answer(node->id, data->destination_id, answer, node->mailbox,
406 MSG_host_get_name(MSG_host_self()));
408 MSG_task_dsend(task, data->answer_to, task_free_v);
411 /** @brief handles the answer to a ping */
412 void handle_ping(node_t node, task_data_t data)
414 XBT_VERB("Received a PING request from %s (%s)", data->answer_to, data->issuer_host_name);
415 //Building the answer to the request
416 msg_task_t task = task_new_ping_answer(node->id, data->answer_to, MSG_host_get_name(MSG_host_self()));
418 MSG_task_dsend(task, data->answer_to, task_free_v);
421 /** @brief Main function */
422 int main(int argc, char *argv[])
424 MSG_init(&argc, argv);
426 /* Check the arguments */
427 xbt_assert(argc > 2, "Usage: %s platform_file deployment_file\n\tExample: %s msg_platform.xml msg_deployment.xml\n",
430 const char *platform_file = argv[1];
431 const char *deployment_file = argv[2];
433 MSG_create_environment(platform_file);
434 MSG_function_register("node", node);
435 MSG_launch_application(deployment_file);
437 msg_error_t res = MSG_main();
439 XBT_INFO("Simulated time: %g", MSG_get_clock());
441 return res != MSG_OK;