1 /* Copyright (c) 2012. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
12 #include "xbt/asserts.h"
13 /** @addtogroup MSG_examples
14 * <b>kademlia/kademlia.c: Kademlia protocol</b>
15 * Implements the Kademlia protocol, using 32-bit identifiers.
17 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia,
18 "Messages specific for this msg example"); /* default log category for the XBT_INFO/XBT_VERB/XBT_DEBUG calls in this file */
20 extern long unsigned int smx_total_comms; /* defined outside this file; main() prints it as the number of messages created */
23 * \brief Node function
26 * - the ID of the node I already know in the system (optional: omit it to create a new network)
27 * - the simulated time at which I leave the system (deadline)
29 int node(int argc, char *argv[]) { // Process body: creates or joins the network, runs main_loop() until the deadline, then logs FIND_NODE statistics.
30 unsigned int join_sucess = 1, deadline; // join_sucess: 1 = success; overwritten by join() when joining
31 xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments"); // 4 args: join through a known node (argv[3] is read below); 3 args: create the network
32 /* Node initialization */
33 unsigned int id = atoi(argv[1]);; // NOTE(review): stray second ';' (empty statement); atoi() reports no errors -- consider strtoul()
34 node_t node = node_init(id);
37 XBT_INFO("Hi, I'm going to join the network !");
38 unsigned int id_known = (unsigned int)atoi(argv[2]); // id of the node we already know
39 join_sucess = join(node,id_known);
40 deadline = atoi(argv[3]); // joining: argv[3] is the simulated time at which we leave (compared to MSG_get_clock() in main_loop)
43 deadline = atoi(argv[2]); // creating: argv[2] is the simulated time at which we leave
44 XBT_INFO("Hi, I'm going to create the network with id %s !",node->mailbox);
45 node_routing_table_update(node,node->id); // seed the routing table with our own id
48 XBT_VERB("Ok, I'm joining the network with id %s",node->mailbox);
49 //We start the main loop
50 main_loop(node,deadline);
53 XBT_INFO("I couldn't join the network :(");
55 XBT_DEBUG("I'm leaving the network");
56 XBT_INFO("%d/%d FIND_NODE have succeeded",node->find_node_success,node->find_node_success + node->find_node_failed); // NOTE(review): if these counters are unsigned, %u is the matching specifier -- confirm their declared type
62 * Main loop for the process
64 void main_loop(node_t node, unsigned int deadline) { // Runs until the simulated clock reaches deadline: polls the mailbox, handles incoming tasks, and issues a lookup every random_lookup_interval.
65 double next_lookup_time = MSG_get_clock() + random_lookup_interval; // first periodic lookup is due one interval from now
66 XBT_VERB("Main loop start");
67 while (MSG_get_clock() < deadline) {
69 if (node->receive_comm == NULL) { // no receive pending: post a new asynchronous one
70 node->task_received = NULL;
71 node->receive_comm = MSG_task_irecv(&node->task_received,node->mailbox);
73 if (node->receive_comm) {
74 if (!MSG_comm_test(node->receive_comm)) { // nothing has arrived yet
75 /* We search for a pseudo random node */
76 if (MSG_get_clock() >= next_lookup_time) {
78 next_lookup_time += random_lookup_interval; // schedule the next periodic lookup
81 //Didn't get a task: sleep for a while...
86 //There has been a transfer, we need to handle it !
87 msg_error_t status = MSG_comm_get_status(node->receive_comm);
88 MSG_comm_destroy(node->receive_comm);
89 node->receive_comm = NULL; // a fresh receive is posted on the next iteration
91 if (status == MSG_OK) {
92 xbt_assert( (node->task_received != NULL), "We received an incorrect task");
93 handle_task(node,node->task_received);
96 xbt_assert((MSG_comm_get_task(node->receive_comm) == NULL),"Comm failed but received a task."); // NOTE(review): bug -- receive_comm was destroyed and set to NULL just above, so this passes NULL to MSG_comm_get_task(); check node->task_received instead, or read the task before destroying the comm
97 XBT_DEBUG("Nevermind, the communication has failed.");
102 //Didn't get a comm: sleep.
103 MSG_process_sleep(1);
106 //Cleanup the receiving communication.
107 if (node->receive_comm != NULL ) { // a receive may still be pending when the deadline is reached
108 if (MSG_comm_test(node->receive_comm) && MSG_comm_get_status(node->receive_comm) == MSG_OK) { // it completed but was never handled: free its task
109 task_free(MSG_comm_get_task(node->receive_comm));
111 MSG_comm_destroy(node->receive_comm);
115 * Tries to join the network
116 * @param node node data
117 * @param id_known id of the node I know in the network.
119 unsigned int join(node_t node, unsigned int id_known) { // Joins through id_known: seeds the routing table from its FIND_NODE answer (up to max_join_trials attempts), then looks up ids in buckets around id_known's bucket. NOTE(review): return statement not visible here -- presumably answer_got.
122 unsigned int trial = 0; // attempts made to get an answer; bounded by max_join_trials
123 unsigned int i, answer_got = 0;
125 /* Add the guy we know to our routing table and ourselves. */
126 node_routing_table_update(node,node->id);
127 node_routing_table_update(node,id_known);
129 /* First step: Send a "FIND_NODE" request to the node we know */
130 send_find_node(node,id_known,node->id);
132 if (node->receive_comm == NULL) { // no receive pending: post a new asynchronous one
133 node->task_received = NULL;
134 node->receive_comm = MSG_task_irecv(&node->task_received,node->mailbox);
136 if (node->receive_comm) {
137 if (MSG_comm_test(node->receive_comm)) { // a task has arrived
138 status = MSG_comm_get_status(node->receive_comm);
139 MSG_comm_destroy(node->receive_comm);
140 node->receive_comm = NULL;
141 if (status == MSG_OK) {
142 XBT_DEBUG("Received an answer from the node I know.");
144 //retrieve the node list and add its contacts to our routing table (pinging them is disabled below).
145 task_data_t data = MSG_task_get_data(node->task_received);
146 xbt_assert( (data != NULL), "Null data received");
147 if (data->type == TASK_FIND_NODE_ANSWER) {
148 node_contact_t contact;
149 node_list = data->answer;
150 xbt_dynar_foreach(node_list->nodes,i, contact) {
151 node_routing_table_update(node,contact->id);
152 //ping(node,contact->id);
154 task_free(node->task_received);
157 handle_task(node,node->task_received); // not the answer we wait for: process it as a regular task
165 MSG_process_sleep(1);
169 MSG_process_sleep(1);
171 } while (answer_got == 0 && trial < max_join_trials);
172 /* Second step: Send a FIND_NODE to a random node in buckets */
173 unsigned int bucket_id = routing_table_find_bucket(node->table,id_known)->id; // bucket holding the node we joined through
174 for (i = 0; ((bucket_id -i) > 0 || (bucket_id + i) <= identifier_size) && i < JOIN_BUCKETS_QUERIES; i++) { // NOTE(review): bucket_id and i are unsigned, so (bucket_id - i) > 0 only means bucket_id != i; once i exceeds bucket_id the subtraction wraps and this test and the one below pass, handing get_id_in_prefix() a huge prefix. `i < bucket_id` states the intent.
175 if (bucket_id - i > 0) { // buckets below id_known's (see wrap-around note above)
176 unsigned int id_in_bucket = get_id_in_prefix(node->id,bucket_id - i);
177 find_node(node,id_in_bucket,0); // 0: not counted in FIND_NODE statistics
179 if (bucket_id + i <= identifier_size) { // buckets above id_known's
180 unsigned int id_in_bucket = get_id_in_prefix(node->id,bucket_id + i);
181 find_node(node,id_in_bucket,0);
187 * Send a request to find a node in the node routing table.
188 * @param node our node data
189 * @param id_to_find the id of the node we are trying to find
191 unsigned int find_node(node_t node, unsigned int id_to_find, unsigned int count_in_stats) { // Iterative lookup of id_to_find: queries the best known candidates and merges their answers until found, no progress, global timeout, or MAX_STEPS. Returns destination_found; count_in_stats != 0 records a failure in node->find_node_failed.
194 unsigned int queries, answers; // requests sent / answers received in the current round
195 unsigned int destination_found = 0;
196 unsigned int nodes_added = 0; // new candidates merged from the last answer; 0 means no progress
197 double time_beginreceive;
198 double timeout, global_timeout = MSG_get_clock() + find_node_global_timeout; // timeout: per-round deadline; global_timeout: deadline for the whole lookup
199 unsigned int steps = 0;
201 xbt_assert( (id_to_find >= 0), "Id supplied incorrect"); // NOTE(review): always true -- id_to_find is unsigned, so this check validates nothing
203 /* First we build a list of who we already know */
204 answer_t node_list = node_find_closest(node,id_to_find);
205 xbt_assert((node_list != NULL),"node_list incorrect");
207 XBT_DEBUG("Doing a FIND_NODE on %d", id_to_find); // NOTE(review): %u matches the unsigned id_to_find
211 /* Ask the nodes on our list if they have information about
212 * the node we are trying to find */
216 queries = send_find_node_to_best(node,node_list);
218 timeout = MSG_get_clock() + find_node_timeout; // this round's deadline
220 time_beginreceive = MSG_get_clock();
222 if (node->receive_comm == NULL) { // no receive pending: post a new asynchronous one
223 node->task_received = NULL;
224 node->receive_comm = MSG_task_irecv(&node->task_received,node->mailbox);
226 if (node->receive_comm) {
227 if (MSG_comm_test(node->receive_comm)) { // a task has arrived
228 status = MSG_comm_get_status(node->receive_comm);
229 MSG_comm_destroy(node->receive_comm);
230 node->receive_comm = NULL;
231 if (status == MSG_OK) {
232 xbt_assert( (node->task_received != NULL), "Invalid task received");
233 //Figure out if we received an answer or something else
234 task_data_t data = MSG_task_get_data(node->task_received);
235 xbt_assert( (data != NULL), "No data in the task");
237 //Check if what we have received is what we are looking for.
238 if (data->type == TASK_FIND_NODE_ANSWER && data->answer->destination_id == id_to_find) { // an answer to this lookup (matched by destination id)
240 node_routing_table_update(node,data->sender_id);
241 node_contact_t contact;
242 xbt_dynar_foreach(node_list->nodes,i, contact) { // NOTE(review): iterates node_list (our own candidates), not data->answer->nodes -- confirm which contacts were meant to be added
243 node_routing_table_update(node,contact->id);
247 nodes_added = answer_merge(node_list,data->answer);
248 XBT_DEBUG("Received an answer from %s (%s) with %ld nodes on it",data->answer_to,data->issuer_host_name,xbt_dynar_length(data->answer->nodes)); // NOTE(review): if xbt_dynar_length() returns unsigned long, %lu is the matching specifier
250 task_free(node->task_received);
253 handle_task(node,node->task_received); // unrelated task: process it normally
254 //Update the timeout if we didn't have our answer
255 timeout += MSG_get_clock() - time_beginreceive; // extend the round by the time spent since time_beginreceive
256 time_beginreceive = MSG_get_clock();
261 MSG_process_sleep(1);
265 MSG_process_sleep(1);
267 } while (MSG_get_clock() < timeout && answers < queries) ; // round ends on timeout or once every query has been answered
268 destination_found = answer_destination_found(node_list);
269 } while (!destination_found && (nodes_added > 0 || answers == 0) && MSG_get_clock() < global_timeout && steps < MAX_STEPS); // keep going while not found, still progressing (or nobody answered yet), and within limits
270 if (destination_found) {
272 node->find_node_success++;
274 XBT_VERB("FIND_NODE on %d success in %d steps",id_to_find,steps);
275 node_routing_table_update(node,id_to_find);
278 if (count_in_stats) {
279 node->find_node_failed++;
280 XBT_VERB("%d not found in %d steps",id_to_find,steps);
283 answer_free(node_list);
284 return destination_found;
287 * Pings a node in the system to see if it is online.
288 * @param node Our node data
289 * @param id_to_ping the id of a node we want to see if it is online.
290 * @return whether the ping succeeded.
292 unsigned int ping(node_t node, unsigned int id_to_ping) { // Sends a PING to id_to_ping and waits (until timeout) for its PING answer, handling unrelated tasks meanwhile; sets destination_found on success.
293 char mailbox[MAILBOX_NAME_SIZE];
294 sprintf(mailbox,"%d",id_to_ping); // NOTE(review): unbounded sprintf with %d on an unsigned id; send_find_node() builds mailboxes via get_node_mailbox() -- consider it (or snprintf) here for consistency
296 unsigned int destination_found = 0;
297 unsigned int timeout = MSG_get_clock() + ping_timeout; // NOTE(review): MSG_get_clock() + ping_timeout is stored in an unsigned int, truncating the deadline to whole seconds -- a double would keep it exact
299 msg_task_t ping_task = task_new_ping(node->id,node->mailbox,MSG_host_get_name(MSG_host_self())); // NOTE(review): created before the self-ping check below; if that branch returns early (lines not visible), this task may leak
300 msg_task_t task_received = NULL;
302 XBT_VERB("PING %d",id_to_ping);
304 //Check that we aren't trying to ping ourselves
305 if (id_to_ping == node->id) {
309 /* Sending the ping task */
310 MSG_task_dsend(ping_task,mailbox,task_free_v);
313 task_received = NULL;
314 msg_error_t status = MSG_task_receive_with_timeout(&task_received,node->mailbox,ping_timeout); // NOTE(review): each receive waits a full ping_timeout, not the time remaining until timeout, so the total wait can exceed the deadline
315 if (status == MSG_OK) {
316 xbt_assert( (task_received != NULL), "Invalid task received");
317 //Checking if it's what we are waiting for or not.
318 task_data_t data = MSG_task_get_data(task_received);
319 xbt_assert( (data != NULL) ,"didn't receive any data...");
320 if (data->type == TASK_PING_ANSWER && id_to_ping == data->sender_id) {
321 XBT_VERB("Ping to %s succeeded",mailbox);
322 node_routing_table_update(node,data->sender_id);
323 destination_found = 1;
324 task_free(task_received);
327 //If it's not our answer, we answer the query anyway.
328 handle_task(node,task_received);
331 } while (destination_found == 0 && MSG_get_clock() < timeout);
333 if (MSG_get_clock() >= timeout) { // NOTE(review): also true when the answer arrived after the deadline, so a successful ping can still log a timeout
334 XBT_DEBUG("Ping to %s has timeout.",mailbox);
337 if (destination_found == -1) { // NOTE(review): unreachable -- destination_found is unsigned and only ever set to 0 or 1 here, so this compares against UINT_MAX; the "offline" message never prints
338 XBT_DEBUG("It seems that %s is offline...",mailbox);
344 * Does a pseudo-random lookup for someone in the system
345 * @param node caller node data
347 void random_lookup(node_t node) { // Periodic lookup issued from main_loop; the outcome counts in the FIND_NODE statistics.
348 unsigned int id_to_look = RANDOM_LOOKUP_NODE; // NOTE(review): despite the name, RANDOM_LOOKUP_NODE is presumably a fixed id, not a random one -- confirm in its definition
349 /* TODO: Use some pseudorandom generator like RngStream. */
350 XBT_DEBUG("I'm doing a random lookup");
351 find_node(node,id_to_look,1); // 1: count this lookup in find_node_success/find_node_failed
354 * @brief Send a "FIND_NODE" to a node
355 * @param node sender node data
356 * @param id node we are querying
357 * @param destination node we are trying to find.
359 void send_find_node(node_t node, unsigned int id, unsigned int destination) { // Sends node `id` a FIND_NODE request for `destination` without waiting for the answer.
360 char mailbox[MAILBOX_NAME_SIZE];
361 /* Gets the mailbox to send to */
362 get_node_mailbox(id,mailbox);
364 msg_task_t task = task_new_find_node(node->id,destination,node->mailbox,MSG_host_get_name(MSG_host_self())); // our mailbox is included so the answer can be sent back to us
366 xbt_assert( (task != NULL), "Trying to send a NULL task.");
367 MSG_task_dsend(task,mailbox,task_free_v); // detached send; task_free_v is the cleanup callback handed to MSG_task_dsend
368 XBT_VERB("Asking %s for its closest nodes",mailbox);
371 * Sends a "FIND_NODE" request to the best "kademlia_alpha" nodes of "node_list", asking them for their closest nodes
373 unsigned int send_find_node_to_best(node_t node, answer_t node_list) { // Sends FIND_NODE requests to the first kademlia_alpha entries of node_list other than ourselves. NOTE(review): return statement not visible -- presumably the number of requests sent (j), which find_node() uses as its query count.
374 unsigned int i = 0, j = 0; // i: position in node_list; j: requests sent so far
375 unsigned int destination = node_list->destination_id;
376 node_contact_t node_to_query;
377 while (j < kademlia_alpha && i < node_list->size) { /* We need to have at most "kademlia_alpha" requests each time, according to the protocol */
378 /* Gets the node we want to send the query to */
379 node_to_query = xbt_dynar_get_as(node_list->nodes,i,node_contact_t);
380 if (node_to_query->id != node->id) { /* No need to query ourselves */
381 send_find_node(node,node_to_query->id,destination);
389 * \brief Handles an incoming task
391 void handle_task(node_t node, msg_task_t task) { // Dispatches a received task by type after recording its sender in the routing table.
392 task_data_t data = MSG_task_get_data(task);
393 xbt_assert( (data != NULL), "Received NULL data");
394 //Adding/updating the guy to our routing table
395 node_routing_table_update(node,data->sender_id);
396 switch (data->type) {
398 handle_find_node(node,data); // NOTE(review): the case label for this call is not visible -- presumably TASK_FIND_NODE
400 case TASK_FIND_NODE_ANSWER:
401 XBT_DEBUG("Received a wrong answer for a FIND_NODE"); // an answer no pending lookup consumed: only logged
404 handle_ping(node,data);
413 * \brief Answers an incoming "find_node" request with the closest nodes we know
415 void handle_find_node(node_t node, task_data_t data) { // Answers a FIND_NODE request with the closest nodes we know to the requested id.
416 XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %d",data->answer_to,data->issuer_host_name,data->destination_id);
417 //Building the answer to the request
418 answer_t answer = node_find_closest(node,data->destination_id);
419 //Building the task to send
420 msg_task_t task = task_new_find_node_answer(node->id,data->destination_id,answer,node->mailbox,MSG_host_get_name(MSG_host_self())); // presumably the task takes ownership of `answer` and releases it via task_free_v -- confirm in task_new_find_node_answer
422 MSG_task_dsend(task,data->answer_to,task_free_v); // reply to the requester's mailbox
425 * \brief Answers an incoming ping request
427 void handle_ping(node_t node, task_data_t data) { // Answers a PING request by sending a ping answer back to the requester.
428 XBT_VERB("Received a PING request from %s (%s)",data->answer_to,data->issuer_host_name);
429 //Building the answer to the request
430 msg_task_t task = task_new_ping_answer(node->id,data->answer_to,MSG_host_get_name(MSG_host_self())); // NOTE(review): ping() passes its own mailbox (node->mailbox) in this position of task_new_ping; here the requester's mailbox is passed -- confirm node->mailbox was not intended
432 MSG_task_dsend(task,data->answer_to,task_free_v);
435 * \brief Main function
437 int main(int argc, char *argv[]) {
439 MSG_init(&argc, argv);
441 /* Check the arguments */
443 printf("Usage: %s platform_file deployment_file \n",argv[0]);
447 const char *platform_file = argv[1];
448 const char *deployment_file = argv[2];
450 MSG_create_environment(platform_file);
452 MSG_function_register("node",node);
453 MSG_launch_application(deployment_file);
455 msg_error_t res = MSG_main();
457 XBT_CRITICAL("Messages created: %ld", smx_total_comms);
458 XBT_INFO("Simulated time: %g", MSG_get_clock());