1 /* Copyright (c) 2012, 2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
13 #include "xbt/asserts.h"
14 /** @addtogroup MSG_examples
15 * <b>kademlia/kademlia.c: Kademlia protocol</b>
16 * Implements the Kademlia protocol, using 32-bit identifiers.
/* Default log category of this file ("msg_kademlia"); the XBT_INFO, XBT_VERB,
 * XBT_DEBUG and XBT_CRITICAL calls below do not name a category themselves. */
18 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia,
19 "Messages specific for this msg example");
/* Counter defined outside this file; main() logs its value as
 * "Messages created" once the simulation has run. */
21 extern long unsigned int smx_total_comms;
24 * Main loop for the process
/*
 * Runs a node until the simulated clock reaches `deadline`.
 *
 * node:     node state; this function reads and writes node->receive_comm and
 *           node->task_received and listens on node->mailbox.
 * deadline: simulated time, compared against MSG_get_clock(), at which the
 *           loop stops.
 *
 * Each iteration keeps one asynchronous receive posted on the mailbox. A
 * completed receive is destroyed and, when its status is MSG_OK, the received
 * task is handed to handle_task(). Once the loop is over, a receive that is
 * still posted is destroyed.
 */
26 static void main_loop(node_t node, double deadline)
/* Simulated time at which the next periodic lookup becomes due. */
28 double next_lookup_time = MSG_get_clock() + random_lookup_interval;
29 XBT_VERB("Main loop start");
30 while (MSG_get_clock() < deadline) {
/* Post a new asynchronous receive only when none is pending. */
32 if (node->receive_comm == NULL) {
33 node->task_received = NULL;
34 node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
36 if (node->receive_comm) {
37 if (!MSG_comm_test(node->receive_comm)) {
38 /* We search for a pseudo random node */
39 if (MSG_get_clock() >= next_lookup_time) {
/* The following lookup becomes due one interval later. */
41 next_lookup_time += random_lookup_interval;
43 //Didn't get a task: sleep for a while...
47 //There has been a transfer, we need to handle it !
/* The status is read before the comm is destroyed; clearing the pointer makes
 * the next iteration post a fresh receive. */
48 msg_error_t status = MSG_comm_get_status(node->receive_comm);
49 MSG_comm_destroy(node->receive_comm);
50 node->receive_comm = NULL;
52 if (status == MSG_OK) {
53 xbt_assert((node->task_received != NULL),
54 "We received an incorrect task");
55 handle_task(node, node->task_received);
/* NOTE(review): node->receive_comm was destroyed and set to NULL a few lines
 * above, so this call is handed NULL instead of the failed comm. Testing
 * node->task_received == NULL is probably what was intended -- confirm. */
57 xbt_assert((MSG_comm_get_task(node->receive_comm) == NULL),
58 "Comm failed but received a task.");
59 XBT_DEBUG("Nevermind, the communication has failed.");
63 //Didn't get a comm: sleep.
67 //Cleanup the receiving communication.
68 if (node->receive_comm != NULL) {
/* Free the task only when the pending receive completed with MSG_OK. */
69 if (MSG_comm_test(node->receive_comm)
70 && MSG_comm_get_status(node->receive_comm) == MSG_OK) {
71 task_free(MSG_comm_get_task(node->receive_comm));
73 MSG_comm_destroy(node->receive_comm);
78 * \brief Node function
81 * - the ID of the person I know in the system (or not)
82 * - Time before I leave the system because I'm bored
/*
 * Entry point of a simulated node process; main() registers it under the
 * name "node".
 *
 * argv[1] is this node's id. Two argument layouts are read below:
 *   - argv[2] = id of an already known node, argv[3] = deadline
 *     (the node joins through join());
 *   - argv[2] = deadline (the node only registers itself in its own
 *     routing table).
 * Presumably the layout is chosen from argc, which is asserted to be 3 or 4.
 *
 * Ids are parsed with strtoul() in base 0, so decimal, octal ("0...") and
 * hexadecimal ("0x...") spellings are accepted.
 * NOTE(review): the strtoul()/strtod() results are not validated (the end
 * pointer argument is NULL), so malformed arguments silently parse as 0.
 */
84 static int node(int argc, char *argv[])
/* Stays 1 unless join() returns something else. */
86 unsigned int join_sucess = 1;
88 xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
89 /* Node initialization */
90 unsigned int id = strtoul(argv[1], NULL, 0);
91 node_t node = node_init(id);
/* Joining an existing network through the node whose id is in argv[2]. */
94 XBT_INFO("Hi, I'm going to join the network with id %s", node->mailbox);
95 unsigned int id_known = strtoul(argv[2], NULL, 0);
96 join_sucess = join(node, id_known);
97 deadline = strtod(argv[3], NULL);
/* No known node: this node starts the network with only itself in its table. */
99 deadline = strtod(argv[2], NULL);
100 XBT_INFO("Hi, I'm going to create the network with id %s", node->mailbox);
101 node_routing_table_update(node, node->id);
104 XBT_VERB("Ok, I'm joining the network with id %s", node->mailbox);
105 //We start the main loop
106 main_loop(node, deadline);
108 XBT_INFO("I couldn't join the network :(");
110 XBT_DEBUG("I'm leaving the network");
/* Lookup statistics: successes out of (successes + failures). */
111 XBT_INFO("%d/%d FIND_NODE have succeeded", node->find_node_success,
112 node->find_node_success + node->find_node_failed);
119 * Tries to join the network
120 * @param node node data
121 * @param id_known id of the node I know in the network.
/*
 * Joins the network through a node that is already part of it.
 *
 * node:     our node data (routing table, mailbox, pending receive).
 * id_known: id of the node we know in the network.
 *
 * Step 1: send a FIND_NODE for our own id to `id_known` and poll the mailbox,
 *         sleeping one simulated second between polls, while
 *         answer_got == 0 && trial < max_join_trials. Contacts found in a
 *         TASK_FIND_NODE_ANSWER are added to the routing table; any other
 *         task is passed to handle_task().
 * Step 2: run find_node() (not counted in the statistics) on ids generated
 *         by get_id_in_prefix() for the buckets around the one that holds
 *         `id_known`.
 */
123 unsigned int join(node_t node, unsigned int id_known)
127 unsigned int trial = 0;
128 unsigned int i, answer_got = 0;
130 /* Add the guy we know to our routing table and ourselves. */
131 node_routing_table_update(node, node->id);
132 node_routing_table_update(node, id_known);
134 /* First step: Send a "FIND_NODE" request to the node we know */
135 send_find_node(node, id_known, node->id);
/* Post an asynchronous receive only when none is pending. */
137 if (node->receive_comm == NULL) {
138 node->task_received = NULL;
139 node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
141 if (node->receive_comm) {
142 if (MSG_comm_test(node->receive_comm)) {
/* Read the status first, then release the comm and clear the pointer. */
143 status = MSG_comm_get_status(node->receive_comm);
144 MSG_comm_destroy(node->receive_comm);
145 node->receive_comm = NULL;
146 if (status == MSG_OK) {
147 XBT_DEBUG("Received an answer from the node I know.");
149 //retrieve the node list (pinging the contacts is currently disabled below).
150 task_data_t data = MSG_task_get_data(node->task_received);
151 xbt_assert((data != NULL), "Null data received");
152 if (data->type == TASK_FIND_NODE_ANSWER) {
153 node_contact_t contact;
154 node_list = data->answer;
155 xbt_dynar_foreach(node_list->nodes, i, contact) {
156 node_routing_table_update(node, contact->id);
157 //ping(node,contact->id);
159 task_free(node->task_received);
/* Not a FIND_NODE answer: treat it as a regular incoming task. */
161 handle_task(node, node->task_received);
167 MSG_process_sleep(1);
170 MSG_process_sleep(1);
172 } while (answer_got == 0 && trial < max_join_trials);
173 /* Second step: Send a FIND_NODE to a random node in buckets */
174 unsigned int bucket_id = routing_table_find_bucket(node->table, id_known)->id;
/* NOTE(review): bucket_id and i are both unsigned int, so "bucket_id - i > 0"
 * only means "bucket_id != i": once i exceeds bucket_id the subtraction wraps
 * to a large positive value instead of going negative, and the wrapped value
 * is then passed to get_id_in_prefix(). "bucket_id > i" is probably what was
 * intended -- confirm. */
176 ((bucket_id - i) > 0 || (bucket_id + i) <= identifier_size)
177 && i < JOIN_BUCKETS_QUERIES; i++) {
178 if (bucket_id - i > 0) {
179 unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id - i);
180 find_node(node, id_in_bucket, 0);
182 if (bucket_id + i <= identifier_size) {
183 unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id + i);
184 find_node(node, id_in_bucket, 0);
191 * Send a request to find a node in the node routing table.
192 * @param node our node data
193 * @param id_to_find the id of the node we are trying to find
/*
 * Looks for `id_to_find` by repeatedly querying the closest nodes we know.
 *
 * node:           our node data.
 * id_to_find:     id of the node we are trying to find.
 * count_in_stats: when non-zero, a failed lookup increments
 *                 node->find_node_failed.
 *
 * Returns the value of answer_destination_found() for the working list
 * (non-zero when the destination was found).
 *
 * The outer loop sends FIND_NODE requests via send_find_node_to_best(); the
 * inner loop collects answers until `timeout` passes or as many answers as
 * queries have arrived. The outer loop repeats while the destination is not
 * found, progress is still possible (new nodes were merged, or nothing was
 * answered), global_timeout has not passed and steps < MAX_STEPS.
 */
195 unsigned int find_node(node_t node, unsigned int id_to_find,
196 unsigned int count_in_stats)
200 unsigned int queries, answers;
201 unsigned int destination_found = 0;
202 unsigned int nodes_added = 0;
203 double time_beginreceive;
/* timeout bounds one round of answers; global_timeout bounds the whole lookup. */
204 double timeout, global_timeout = MSG_get_clock() + find_node_global_timeout;
205 unsigned int steps = 0;
/* NOTE(review): id_to_find is unsigned, so this condition is always true. */
207 xbt_assert((id_to_find >= 0), "Id supplied incorrect");
209 /* First we build a list of who we already know */
210 answer_t node_list = node_find_closest(node, id_to_find);
211 xbt_assert((node_list != NULL), "node_list incorrect");
213 XBT_DEBUG("Doing a FIND_NODE on %08x", id_to_find);
217 /* Ask the nodes on our list if they have information about
218 * the node we are trying to find */
222 queries = send_find_node_to_best(node, node_list);
224 timeout = MSG_get_clock() + find_node_timeout;
226 time_beginreceive = MSG_get_clock();
/* Post an asynchronous receive only when none is pending. */
228 if (node->receive_comm == NULL) {
229 node->task_received = NULL;
231 MSG_task_irecv(&node->task_received, node->mailbox);
233 if (node->receive_comm) {
234 if (MSG_comm_test(node->receive_comm)) {
/* Read the status first, then release the comm and clear the pointer. */
235 status = MSG_comm_get_status(node->receive_comm);
236 MSG_comm_destroy(node->receive_comm);
237 node->receive_comm = NULL;
238 if (status == MSG_OK) {
239 xbt_assert((node->task_received != NULL), "Invalid task received");
240 //Figure out if we received an answer or something else
241 task_data_t data = MSG_task_get_data(node->task_received);
242 xbt_assert((data != NULL), "No data in the task");
244 //Check if what we have received is what we are looking for.
245 if (data->type == TASK_FIND_NODE_ANSWER
246 && data->answer->destination_id == id_to_find) {
248 node_routing_table_update(node, data->sender_id);
249 node_contact_t contact;
/* NOTE(review): this walks our own working list (node_list), not the contacts
 * of the answer just received (data->answer); those are only merged into
 * node_list right below -- confirm that this is intended. */
250 xbt_dynar_foreach(node_list->nodes, i, contact) {
251 node_routing_table_update(node, contact->id);
/* Number of nodes added by the merge; a positive value keeps the outer loop going. */
255 nodes_added = answer_merge(node_list, data->answer);
/* NOTE(review): "%ld" expects a signed long; verify the return type of
 * xbt_dynar_length() (presumably unsigned long, which would call for "%lu"). */
256 XBT_DEBUG("Received an answer from %s (%s) with %ld nodes on it",
257 data->answer_to, data->issuer_host_name,
258 xbt_dynar_length(data->answer->nodes));
260 task_free(node->task_received);
/* Not the answer we are waiting for: handle it as a regular task. */
262 handle_task(node, node->task_received);
263 //Update the timeout if we didn't have our answer
264 timeout += MSG_get_clock() - time_beginreceive;
265 time_beginreceive = MSG_get_clock();
269 MSG_process_sleep(1);
272 MSG_process_sleep(1);
274 } while (MSG_get_clock() < timeout && answers < queries);
275 destination_found = answer_destination_found(node_list);
276 } while (!destination_found && (nodes_added > 0 || answers == 0)
277 && MSG_get_clock() < global_timeout && steps < MAX_STEPS);
278 if (destination_found) {
280 node->find_node_success++;
/* NOTE(review): steps is unsigned int but is printed with "%d" here and in the
 * failure message below; "%u" would match the type. */
282 XBT_VERB("FIND_NODE on %08x success in %d steps", id_to_find, steps);
283 node_routing_table_update(node, id_to_find);
285 if (count_in_stats) {
286 node->find_node_failed++;
287 XBT_VERB("%08x not found in %d steps", id_to_find, steps);
/* The working list is released here before returning. */
290 answer_free(node_list);
291 return destination_found;
295 * Pings a node in the system to see if it is online.
296 * @param node Our node data
297 * @param id_to_ping the id of a node we want to see if it is online.
298 * @return whether the ping succeeded or not.
/*
 * Sends a ping task to `id_to_ping` and waits on our own mailbox for the
 * matching TASK_PING_ANSWER until ping_timeout has elapsed.
 *
 * node:       our node data.
 * id_to_ping: id of the node to ping; its mailbox name is the id printed in
 *             hexadecimal, zero-padded to MAILBOX_NAME_SIZE characters.
 *
 * Tasks received meanwhile that are not the expected answer are passed to
 * handle_task(). On a matching answer the sender is added to the routing
 * table and destination_found is set to 1.
 */
300 unsigned int ping(node_t node, unsigned int id_to_ping)
302 char mailbox[MAILBOX_NAME_SIZE + 1];
/* NOTE(review): sprintf() is unbounded and the "*" width is only a minimum, so
 * an id needing more than MAILBOX_NAME_SIZE hex digits would overflow the
 * buffer; snprintf(mailbox, sizeof mailbox, ...) would bound the write.
 * send_find_node() builds its mailbox name with get_node_mailbox() instead --
 * presumably the same format; consider using it here as well. */
303 sprintf(mailbox, "%0*x", MAILBOX_NAME_SIZE, id_to_ping);
305 unsigned int destination_found = 0;
306 double timeout = MSG_get_clock() + ping_timeout;
/* NOTE(review): the ping task is created before the self-ping check below;
 * make sure that branch does not leave it allocated. */
308 msg_task_t ping_task =
309 task_new_ping(node->id, node->mailbox,
310 MSG_host_get_name(MSG_host_self()));
311 msg_task_t task_received = NULL;
313 XBT_VERB("PING %08x", id_to_ping);
315 //Check that we aren't trying to ping ourselves
316 if (id_to_ping == node->id) {
320 /* Sending the ping task */
321 MSG_task_dsend(ping_task, mailbox, task_free_v);
323 task_received = NULL;
325 MSG_task_receive_with_timeout(&task_received, node->mailbox,
327 if (status == MSG_OK) {
328 xbt_assert((task_received != NULL), "Invalid task received");
329 //Checking if it's what we are waiting for or not.
330 task_data_t data = MSG_task_get_data(task_received);
331 xbt_assert((data != NULL), "didn't receive any data...");
332 if (data->type == TASK_PING_ANSWER && id_to_ping == data->sender_id) {
333 XBT_VERB("Ping to %s succeeded", mailbox);
334 node_routing_table_update(node, data->sender_id);
335 destination_found = 1;
336 task_free(task_received);
338 //If it's not our answer, we answer the query anyway.
339 handle_task(node, task_received);
342 } while (destination_found == 0 && MSG_get_clock() < timeout);
344 if (MSG_get_clock() >= timeout) {
345 XBT_DEBUG("Ping to %s has timeout.", mailbox);
/* NOTE(review): destination_found is unsigned and is only ever assigned 0 or 1
 * in this function, so the comparison with -1 (converted to UINT_MAX) cannot
 * be true and the message below is never logged. */
348 if (destination_found == -1) {
349 XBT_DEBUG("It seems that %s is offline...", mailbox);
356 * Does a pseudo-random lookup for someone in the system
357 * @param node caller node data
/*
 * Runs one find_node() lookup on the id RANDOM_LOOKUP_NODE and counts its
 * outcome in the node statistics (count_in_stats = 1).
 *
 * node: caller node data.
 */
359 void random_lookup(node_t node)
361 unsigned int id_to_look = RANDOM_LOOKUP_NODE; //Fixed constant for now, not random: see the TODO below.
362 /* TODO: Use some pseudorandom generator like RngStream. */
363 XBT_DEBUG("I'm doing a random lookup");
364 find_node(node, id_to_look, 1);
368 * @brief Send a "FIND_NODE" to a node
369 * @param node sender node data
370 * @param id node we are querying
371 * @param destination node we are trying to find.
/*
 * Sends a FIND_NODE request for `destination` to the node `id`.
 *
 * node:        sender node data; its id and mailbox are put in the request.
 * id:          node we are querying; get_node_mailbox() turns it into the
 *              name of the mailbox the request is sent to.
 * destination: id of the node we are trying to find.
 *
 * The task is sent detached with MSG_task_dsend(), so this function does not
 * wait for the transfer; task_free_v is handed over as the cleanup callback.
 */
373 void send_find_node(node_t node, unsigned int id, unsigned int destination)
375 char mailbox[MAILBOX_NAME_SIZE + 1];
376 /* Gets the mailbox to send to */
377 get_node_mailbox(id, mailbox);
/* The request carries our id, the wanted id, our mailbox and our host name. */
380 task_new_find_node(node->id, destination, node->mailbox,
381 MSG_host_get_name(MSG_host_self()));
383 xbt_assert((task != NULL), "Trying to send a NULL task.");
384 MSG_task_dsend(task, mailbox, task_free_v);
385 XBT_VERB("Asking %s for its closest nodes", mailbox);
389 * Sends to the best "kademlia_alpha" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best nodes
/*
 * Walks node_list->nodes from the front and sends a FIND_NODE request for
 * node_list->destination_id to the entries that are not ourselves, while
 * j < kademlia_alpha and i < node_list->size.
 *
 * node:      our node data.
 * node_list: candidate list; presumably ordered best-first -- verify against
 *            node_find_closest() and answer_merge().
 */
391 unsigned int send_find_node_to_best(node_t node, answer_t node_list)
/* i indexes the list; j is compared against kademlia_alpha (presumably the
 * number of requests sent so far). */
393 unsigned int i = 0, j = 0;
394 unsigned int destination = node_list->destination_id;
395 node_contact_t node_to_query;
396 while (j < kademlia_alpha && i < node_list->size) { /* We need to have at most "kademlia_alpha" requests each time, according to the protocol */
397 /* Gets the node we want to send the query to */
398 node_to_query = xbt_dynar_get_as(node_list->nodes, i, node_contact_t);
399 if (node_to_query->id != node->id) { /* No need to query ourselves */
400 send_find_node(node, node_to_query->id, destination);
409 * \brief Handles an incoming received task
/*
 * Dispatches a received task according to the type stored in its data.
 *
 * node: our node data.
 * task: the received task; its data must not be NULL (asserted).
 *
 * The sender is always added to (or refreshed in) the routing table first.
 * Requests are forwarded to handle_find_node() or handle_ping(); a
 * TASK_FIND_NODE_ANSWER reaching this function is only logged as unexpected.
 */
411 void handle_task(node_t node, msg_task_t task)
413 task_data_t data = MSG_task_get_data(task);
414 xbt_assert((data != NULL), "Received NULL data");
415 //Adding/updating the guy to our routing table
416 node_routing_table_update(node, data->sender_id);
417 switch (data->type) {
419 handle_find_node(node, data);
/* An answer nobody in this call chain was waiting for. */
421 case TASK_FIND_NODE_ANSWER:
422 XBT_DEBUG("Received a wrong answer for a FIND_NODE");
425 handle_ping(node, data);
435 * \brief Answers an incoming "find_node" request
/*
 * Answers a FIND_NODE request: computes the closest nodes we know for
 * data->destination_id and sends them back to the requester.
 *
 * node: our node data.
 * data: payload of the received request (answer_to, issuer_host_name and
 *       destination_id are read).
 *
 * The answer is sent detached to the mailbox named by data->answer_to.
 */
437 void handle_find_node(node_t node, task_data_t data)
439 XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %08x",
440 data->answer_to, data->issuer_host_name, data->destination_id);
441 //Building the answer to the request
442 answer_t answer = node_find_closest(node, data->destination_id);
443 //Building the task to send
445 task_new_find_node_answer(node->id, data->destination_id, answer,
447 MSG_host_get_name(MSG_host_self()));
449 MSG_task_dsend(task, data->answer_to, task_free_v);
453 * \brief Answers an incoming ping request
/*
 * Answers a PING request by sending a ping-answer task back to the requester.
 *
 * node: our node data; node->id is put in the answer.
 * data: payload of the received ping (answer_to and issuer_host_name are
 *       read).
 *
 * The answer is sent detached to the mailbox named by data->answer_to.
 */
455 void handle_ping(node_t node, task_data_t data)
457 XBT_VERB("Received a PING request from %s (%s)", data->answer_to,
458 data->issuer_host_name);
459 //Building the answer to the request
461 task_new_ping_answer(node->id, data->answer_to,
462 MSG_host_get_name(MSG_host_self()));
464 MSG_task_dsend(task, data->answer_to, task_free_v);
468 * \brief Main function
/*
 * Program entry point: initializes MSG, loads the platform description
 * (argv[1]) and the deployment description (argv[2]), registers node() under
 * the name "node", runs the simulation with MSG_main() and logs the number of
 * created messages and the final simulated time.
 */
470 int main(int argc, char *argv[])
/* MSG_init() receives &argc and argv before the program reads them. */
473 MSG_init(&argc, argv);
475 /* Check the arguments */
477 printf("Usage: %s platform_file deployment_file \n", argv[0]);
481 const char *platform_file = argv[1];
482 const char *deployment_file = argv[2];
484 MSG_create_environment(platform_file);
/* "node" is the function name that gets bound to node(). */
486 MSG_function_register("node", node);
487 MSG_launch_application(deployment_file);
489 msg_error_t res = MSG_main();
/* NOTE(review): smx_total_comms is declared "long unsigned int" in this file,
 * but "%ld" expects a signed long; "%lu" would match the type. */
491 XBT_CRITICAL("Messages created: %ld", smx_total_comms);
492 XBT_INFO("Simulated time: %g", MSG_get_clock());