Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Correct a bug in kademlia
[simgrid.git] / examples / msg / kademlia / kademlia.c
1 /* Copyright (c) 2012. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "kademlia.h"
7 #include "node.h"
8 #include "task.h"
9
10 #include "msg/msg.h"
11 #include "xbt/log.h"
12 #include "xbt/asserts.h"
13 /** @addtogroup MSG_examples
14   * <b>kademlia/kademlia.c: Kademlia protocol</b>
15   * Implements the Kademlia protocol, using 32 bits identifiers.
16   */
17 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia,
18                              "Messages specific for this msg example");
19
20 extern long unsigned int smx_total_comms;
21
22 /**
23   * Main loop for the process
24   */
25 static void main_loop(node_t node, double deadline)
26 {
27   double next_lookup_time = MSG_get_clock() + random_lookup_interval;
28   XBT_VERB("Main loop start");
29   while (MSG_get_clock() < deadline) {
30
31     if (node->receive_comm == NULL) {
32       node->task_received = NULL;
33       node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
34     }
35     if (node->receive_comm) {
36       if (!MSG_comm_test(node->receive_comm)) {
37         /* We search for a pseudo random node */
38         if (MSG_get_clock() >= next_lookup_time) {
39           random_lookup(node);
40           next_lookup_time += random_lookup_interval;
41         } else {
42           //Didn't get a task: sleep for a while...
43           MSG_process_sleep(1);
44         }
45       } else {
46         //There has been a transfer, we need to handle it !
47         msg_error_t status = MSG_comm_get_status(node->receive_comm);
48         MSG_comm_destroy(node->receive_comm);
49         node->receive_comm = NULL;
50
51         if (status == MSG_OK) {
52           xbt_assert((node->task_received != NULL),
53                      "We received an incorrect task");
54           handle_task(node, node->task_received);
55         } else {
56           xbt_assert((MSG_comm_get_task(node->receive_comm) == NULL),
57                      "Comm failed but received a task.");
58           XBT_DEBUG("Nevermind, the communication has failed.");
59         }
60       }
61     } else {
62       //Didn't get a comm: sleep.
63       MSG_process_sleep(1);
64     }
65   }
66   //Cleanup the receiving communication.
67   if (node->receive_comm != NULL) {
68     if (MSG_comm_test(node->receive_comm)
69         && MSG_comm_get_status(node->receive_comm) == MSG_OK) {
70       task_free(MSG_comm_get_task(node->receive_comm));
71     }
72     MSG_comm_destroy(node->receive_comm);
73   }
74 }
75
76 /**
77   * \brief Node function
78   * Arguments :
79   * - my node ID
80   * - the ID of the person I know in the system (or not)
81   * - Time before I leave the system because I'm bored
82   */
83 static int node(int argc, char *argv[])
84 {
85   unsigned int join_sucess = 1;
86   double deadline;
87   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
88   /* Node initialization */
89   unsigned int id = strtoul(argv[1], NULL, 0);
90   node_t node = node_init(id);
91
92   if (argc == 4) {
93     XBT_INFO("Hi, I'm going to join the network with id %s", node->mailbox);
94     unsigned int id_known = strtoul(argv[2], NULL, 0);
95     join_sucess = join(node, id_known);
96     deadline = strtod(argv[3], NULL);
97   } else {
98     deadline = strtod(argv[2], NULL);
99     XBT_INFO("Hi, I'm going to create the network with id %s", node->mailbox);
100     node_routing_table_update(node, node->id);
101   }
102   if (join_sucess) {
103     XBT_VERB("Ok, I'm joining the network with id %s", node->mailbox);
104     //We start the main loop
105     main_loop(node, deadline);
106   } else {
107     XBT_INFO("I couldn't join the network :(");
108   }
109   XBT_DEBUG("I'm leaving the network");
110   XBT_INFO("%d/%d FIND_NODE have succeeded", node->find_node_success,
111            node->find_node_success + node->find_node_failed);
112   node_free(node);
113
114   return 0;
115 }
116
117 /**
118   * Tries to join the network
119   * @param node node data
120   * @param id_known id of the node I know in the network.
121   */
122 unsigned int join(node_t node, unsigned int id_known)
123 {
124   answer_t node_list;
125   msg_error_t status;
126   unsigned int trial = 0;
127   unsigned int i, answer_got = 0;
128
129   /* Add the guy we know to our routing table and ourselves. */
130   node_routing_table_update(node, node->id);
131   node_routing_table_update(node, id_known);
132
133   /* First step: Send a "FIND_NODE" request to the node we know */
134   send_find_node(node, id_known, node->id);
135   do {
136     if (node->receive_comm == NULL) {
137       node->task_received = NULL;
138       node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
139     }
140     if (node->receive_comm) {
141       if (MSG_comm_test(node->receive_comm)) {
142         status = MSG_comm_get_status(node->receive_comm);
143         MSG_comm_destroy(node->receive_comm);
144         node->receive_comm = NULL;
145         if (status == MSG_OK) {
146           XBT_DEBUG("Received an answer from the node I know.");
147           answer_got = 1;
148           //retrieve the node list and ping them.
149           task_data_t data = MSG_task_get_data(node->task_received);
150           xbt_assert((data != NULL), "Null data received");
151           if (data->type == TASK_FIND_NODE_ANSWER) {
152             node_contact_t contact;
153             node_list = data->answer;
154             xbt_dynar_foreach(node_list->nodes, i, contact) {
155               node_routing_table_update(node, contact->id);
156               //ping(node,contact->id);
157             }
158             task_free(node->task_received);
159           } else {
160             handle_task(node, node->task_received);
161           }
162         } else {
163           trial++;
164         }
165       } else {
166         MSG_process_sleep(1);
167       }
168     } else {
169       MSG_process_sleep(1);
170     }
171   } while (answer_got == 0 && trial < max_join_trials);
172   /* Second step: Send a FIND_NODE to a a random node in buckets */
173   unsigned int bucket_id = routing_table_find_bucket(node->table, id_known)->id;
174   for (i = 0;
175        ((bucket_id - i) > 0 || (bucket_id + i) <= identifier_size)
176        && i < JOIN_BUCKETS_QUERIES; i++) {
177     if (bucket_id - i > 0) {
178       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id - i);
179       find_node(node, id_in_bucket, 0);
180     }
181     if (bucket_id + i <= identifier_size) {
182       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id + i);
183       find_node(node, id_in_bucket, 0);
184     }
185   }
186   return answer_got;
187 }
188
189 /**
190   * Send a request to find a node in the node routing table.
191   * @brief node our node data
192   * @brief id_to_find the id of the node we are trying to find
193   */
194 unsigned int find_node(node_t node, unsigned int id_to_find,
195                        unsigned int count_in_stats)
196 {
197
198   unsigned int i = 0;
199   unsigned int queries, answers;
200   unsigned int destination_found = 0;
201   unsigned int nodes_added = 0;
202   double time_beginreceive;
203   double timeout, global_timeout = MSG_get_clock() + find_node_global_timeout;
204   unsigned int steps = 0;
205
206   xbt_assert((id_to_find >= 0), "Id supplied incorrect");
207
208   /* First we build a list of who we already know */
209   answer_t node_list = node_find_closest(node, id_to_find);
210   xbt_assert((node_list != NULL), "node_list incorrect");
211
212   XBT_DEBUG("Doing a FIND_NODE on %08x", id_to_find);
213
214   msg_error_t status;
215
216   /* Ask the nodes on our list if they   have information about
217    * the node we are trying to find */
218
219   do {
220     answers = 0;
221     queries = send_find_node_to_best(node, node_list);
222     nodes_added = 0;
223     timeout = MSG_get_clock() + find_node_timeout;
224     steps++;
225     time_beginreceive = MSG_get_clock();
226     do {
227       if (node->receive_comm == NULL) {
228         node->task_received = NULL;
229         node->receive_comm =
230             MSG_task_irecv(&node->task_received, node->mailbox);
231       }
232       if (node->receive_comm) {
233         if (MSG_comm_test(node->receive_comm)) {
234           status = MSG_comm_get_status(node->receive_comm);
235           MSG_comm_destroy(node->receive_comm);
236           node->receive_comm = NULL;
237           if (status == MSG_OK) {
238             xbt_assert((node->task_received != NULL), "Invalid task received");
239             //Figure out if we received an answer or something else
240             task_data_t data = MSG_task_get_data(node->task_received);
241             xbt_assert((data != NULL), "No data in the task");
242
243             //Check if what we have received is what we are looking for.
244             if (data->type == TASK_FIND_NODE_ANSWER
245                 && data->answer->destination_id == id_to_find) {
246               //Handle the answer
247               node_routing_table_update(node, data->sender_id);
248               node_contact_t contact;
249               xbt_dynar_foreach(node_list->nodes, i, contact) {
250                 node_routing_table_update(node, contact->id);
251               }
252               answers++;
253
254               nodes_added = answer_merge(node_list, data->answer);
255               XBT_DEBUG("Received an answer from %s (%s) with %ld nodes on it",
256                         data->answer_to, data->issuer_host_name,
257                         xbt_dynar_length(data->answer->nodes));
258
259               task_free(node->task_received);
260             } else {
261               handle_task(node, node->task_received);
262               //Update the timeout if we didn't have our answer
263               timeout += MSG_get_clock() - time_beginreceive;
264               time_beginreceive = MSG_get_clock();
265             }
266           }
267         } else {
268           MSG_process_sleep(1);
269         }
270       } else {
271         MSG_process_sleep(1);
272       }
273     } while (MSG_get_clock() < timeout && answers < queries);
274     destination_found = answer_destination_found(node_list);
275   } while (!destination_found && (nodes_added > 0 || answers == 0)
276            && MSG_get_clock() < global_timeout && steps < MAX_STEPS);
277   if (destination_found) {
278     if (count_in_stats)
279       node->find_node_success++;
280     if (queries > 4)
281       XBT_VERB("FIND_NODE on %08x success in %d steps", id_to_find, steps);
282     node_routing_table_update(node, id_to_find);
283   } else {
284     if (count_in_stats) {
285       node->find_node_failed++;
286       XBT_VERB("%08x not found in %d steps", id_to_find, steps);
287     }
288   }
289   answer_free(node_list);
290   return destination_found;
291 }
292
293 /**
294   * Pings a node in the system to see if it is online.
295   * @param node Our node data
296   * @param id_to_ping the id of a node we want to see if it is online.
297   * @return if the ping succeded or not.
298   */
299 unsigned int ping(node_t node, unsigned int id_to_ping)
300 {
301   char mailbox[MAILBOX_NAME_SIZE + 1];
302   sprintf(mailbox, "%0*x", MAILBOX_NAME_SIZE, id_to_ping);
303
304   unsigned int destination_found = 0;
305   double timeout = MSG_get_clock() + ping_timeout;
306
307   msg_task_t ping_task =
308       task_new_ping(node->id, node->mailbox,
309                     MSG_host_get_name(MSG_host_self()));
310   msg_task_t task_received = NULL;
311
312   XBT_VERB("PING %08x", id_to_ping);
313
314   //Check that we aren't trying to ping ourselves
315   if (id_to_ping == node->id) {
316     return 1;
317   }
318
319   /* Sending the ping task */
320   MSG_task_dsend(ping_task, mailbox, task_free_v);
321   do {
322     task_received = NULL;
323     msg_error_t status =
324         MSG_task_receive_with_timeout(&task_received, node->mailbox,
325                                       ping_timeout);
326     if (status == MSG_OK) {
327       xbt_assert((task_received != NULL), "Invalid task received");
328       //Checking if it's what we are waiting for or not.
329       task_data_t data = MSG_task_get_data(task_received);
330       xbt_assert((data != NULL), "didn't receive any data...");
331       if (data->type == TASK_PING_ANSWER && id_to_ping == data->sender_id) {
332         XBT_VERB("Ping to %s succeeded", mailbox);
333         node_routing_table_update(node, data->sender_id);
334         destination_found = 1;
335         task_free(task_received);
336       } else {
337         //If it's not our answer, we answer the query anyway.
338         handle_task(node, task_received);
339       }
340     }
341   } while (destination_found == 0 && MSG_get_clock() < timeout);
342
343   if (MSG_get_clock() >= timeout) {
344     XBT_DEBUG("Ping to %s has timeout.", mailbox);
345     return 0;
346   }
347   if (destination_found == -1) {
348     XBT_DEBUG("It seems that %s is offline...", mailbox);
349     return 0;
350   }
351   return 1;
352 }
353
354 /**
355   * Does a pseudo-random lookup for someone in the system
356   * @param node caller node data
357   */
358 void random_lookup(node_t node)
359 {
360   unsigned int id_to_look = RANDOM_LOOKUP_NODE; //Totally random.
361   /* TODO: Use some pseudorandom generator like RngStream. */
362   XBT_DEBUG("I'm doing a random lookup");
363   find_node(node, id_to_look, 1);
364 }
365
366 /**
367   * @brief Send a "FIND_NODE" to a node
368   * @param node sender node data
369   * @param id node we are querying
370   * @param destination node we are trying to find.
371   */
372 void send_find_node(node_t node, unsigned int id, unsigned int destination)
373 {
374   char mailbox[MAILBOX_NAME_SIZE + 1];
375   /* Gets the mailbox to send to */
376   get_node_mailbox(id, mailbox);
377   /* Build the task */
378   msg_task_t task =
379       task_new_find_node(node->id, destination, node->mailbox,
380                          MSG_host_get_name(MSG_host_self()));
381   /* Send the task */
382   xbt_assert((task != NULL), "Trying to send a NULL task.");
383   MSG_task_dsend(task, mailbox, task_free_v);
384   XBT_VERB("Asking %s for its closest nodes", mailbox);
385 }
386
387 /**
388   * Sends to the best "kademlia_alpha" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best nodes
389   */
390 unsigned int send_find_node_to_best(node_t node, answer_t node_list)
391 {
392   unsigned int i = 0, j = 0;
393   unsigned int destination = node_list->destination_id;
394   node_contact_t node_to_query;
395   while (j < kademlia_alpha && i < node_list->size) {   /* We need to have at most "kademlia_alpha" requets each time, according to the protocol */
396     /* Gets the node we want to send the query to */
397     node_to_query = xbt_dynar_get_as(node_list->nodes, i, node_contact_t);
398     if (node_to_query->id != node->id) {        /* No need to query ourselves */
399       send_find_node(node, node_to_query->id, destination);
400       j++;
401     }
402     i++;
403   }
404   return i;
405 }
406
407 /**
408   * \brief Handles an incomming received task
409   */
410 void handle_task(node_t node, msg_task_t task)
411 {
412   task_data_t data = MSG_task_get_data(task);
413   xbt_assert((data != NULL), "Received NULL data");
414   //Adding/updating the guy to our routing table
415   node_routing_table_update(node, data->sender_id);
416   switch (data->type) {
417   case TASK_FIND_NODE:
418     handle_find_node(node, data);
419     break;
420   case TASK_FIND_NODE_ANSWER:
421     XBT_DEBUG("Received a wrong answer for a FIND_NODE");
422     break;
423   case TASK_PING:
424     handle_ping(node, data);
425     break;
426   default:
427
428     break;
429   }
430   task_free(task);
431 }
432
433 /**
434   * \brief Handles the answer to an incomming "find_node" task
435   */
436 void handle_find_node(node_t node, task_data_t data)
437 {
438   XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %08x",
439            data->answer_to, data->issuer_host_name, data->destination_id);
440   //Building the answer to the request
441   answer_t answer = node_find_closest(node, data->destination_id);
442   //Building the task to send
443   msg_task_t task =
444       task_new_find_node_answer(node->id, data->destination_id, answer,
445                                 node->mailbox,
446                                 MSG_host_get_name(MSG_host_self()));
447   //Sending the task
448   MSG_task_dsend(task, data->answer_to, task_free_v);
449 }
450
451 /**
452   * \brief handles the answer to a ping
453   */
454 void handle_ping(node_t node, task_data_t data)
455 {
456   XBT_VERB("Received a PING request from %s (%s)", data->answer_to,
457            data->issuer_host_name);
458   //Building the answer to the request
459   msg_task_t task =
460       task_new_ping_answer(node->id, data->answer_to,
461                            MSG_host_get_name(MSG_host_self()));
462
463   MSG_task_dsend(task, data->answer_to, task_free_v);
464 }
465
466 /**
467   * \brief Main function
468   */
469 int main(int argc, char *argv[])
470 {
471
472   MSG_init(&argc, argv);
473
474   /* Check the arguments */
475   if (argc < 3) {
476     printf("Usage: %s platform_file deployment_file \n", argv[0]);
477     return -1;
478   }
479
480   const char *platform_file = argv[1];
481   const char *deployment_file = argv[2];
482
483   MSG_create_environment(platform_file);
484
485   MSG_function_register("node", node);
486   MSG_launch_application(deployment_file);
487
488   msg_error_t res = MSG_main();
489
490   XBT_CRITICAL("Messages created: %ld", smx_total_comms);
491   XBT_INFO("Simulated time: %g", MSG_get_clock());
492
493   if (res == MSG_OK)
494     return 0;
495   else
496     return 1;
497 }