Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Energy, onHostDestruction: ensured ptr existence
[simgrid.git] / examples / msg / kademlia / kademlia.c
1 /* Copyright (c) 2012, 2014-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "kademlia.h"
8 #include "node.h"
9 #include "task.h"
10
11 #include "simgrid/msg.h"
12 #include "xbt/log.h"
13 #include "xbt/asserts.h"
14 /** @addtogroup MSG_examples
15   * <b>kademlia/kademlia.c: Kademlia protocol</b>
16   * Implements the Kademlia protocol, using 32 bits identifiers.
17   */
18 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia,
19                              "Messages specific for this msg example");
20
21 extern long unsigned int smx_total_comms;
22
23 /**
24   * Main loop for the process
25   */
26 static void main_loop(node_t node, double deadline)
27 {
28   double next_lookup_time = MSG_get_clock() + random_lookup_interval;
29   XBT_VERB("Main loop start");
30   while (MSG_get_clock() < deadline) {
31
32     if (node->receive_comm == NULL) {
33       node->task_received = NULL;
34       node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
35     }
36     if (node->receive_comm) {
37       if (!MSG_comm_test(node->receive_comm)) {
38         /* We search for a pseudo random node */
39         if (MSG_get_clock() >= next_lookup_time) {
40           random_lookup(node);
41           next_lookup_time += random_lookup_interval;
42         } else {
43           //Didn't get a task: sleep for a while...
44           MSG_process_sleep(1);
45         }
46       } else {
47         //There has been a transfer, we need to handle it !
48         msg_error_t status = MSG_comm_get_status(node->receive_comm);
49         MSG_comm_destroy(node->receive_comm);
50         node->receive_comm = NULL;
51
52         if (status == MSG_OK) {
53           xbt_assert((node->task_received != NULL),
54                      "We received an incorrect task");
55           handle_task(node, node->task_received);
56         } else {
57           xbt_assert((MSG_comm_get_task(node->receive_comm) == NULL),
58                      "Comm failed but received a task.");
59           XBT_DEBUG("Nevermind, the communication has failed.");
60         }
61       }
62     } else {
63       //Didn't get a comm: sleep.
64       MSG_process_sleep(1);
65     }
66   }
67   //Cleanup the receiving communication.
68   if (node->receive_comm != NULL) {
69     if (MSG_comm_test(node->receive_comm)
70         && MSG_comm_get_status(node->receive_comm) == MSG_OK) {
71       task_free(MSG_comm_get_task(node->receive_comm));
72     }
73     MSG_comm_destroy(node->receive_comm);
74   }
75 }
76
77 /**
78   * \brief Node function
79   * Arguments :
80   * - my node ID
81   * - the ID of the person I know in the system (or not)
82   * - Time before I leave the system because I'm bored
83   */
84 static int node(int argc, char *argv[])
85 {
86   unsigned int join_sucess = 1;
87   double deadline;
88   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
89   /* Node initialization */
90   unsigned int id = strtoul(argv[1], NULL, 0);
91   node_t node = node_init(id);
92
93   if (argc == 4) {
94     XBT_INFO("Hi, I'm going to join the network with id %s", node->mailbox);
95     unsigned int id_known = strtoul(argv[2], NULL, 0);
96     join_sucess = join(node, id_known);
97     deadline = strtod(argv[3], NULL);
98   } else {
99     deadline = strtod(argv[2], NULL);
100     XBT_INFO("Hi, I'm going to create the network with id %s", node->mailbox);
101     node_routing_table_update(node, node->id);
102   }
103   if (join_sucess) {
104     XBT_VERB("Ok, I'm joining the network with id %s", node->mailbox);
105     //We start the main loop
106     main_loop(node, deadline);
107   } else {
108     XBT_INFO("I couldn't join the network :(");
109   }
110   XBT_DEBUG("I'm leaving the network");
111   XBT_INFO("%d/%d FIND_NODE have succeeded", node->find_node_success,
112            node->find_node_success + node->find_node_failed);
113   node_free(node);
114
115   return 0;
116 }
117
118 /**
119   * Tries to join the network
120   * @param node node data
121   * @param id_known id of the node I know in the network.
122   */
123 unsigned int join(node_t node, unsigned int id_known)
124 {
125   answer_t node_list;
126   msg_error_t status;
127   unsigned int trial = 0;
128   unsigned int i, answer_got = 0;
129
130   /* Add the guy we know to our routing table and ourselves. */
131   node_routing_table_update(node, node->id);
132   node_routing_table_update(node, id_known);
133
134   /* First step: Send a "FIND_NODE" request to the node we know */
135   send_find_node(node, id_known, node->id);
136   do {
137     if (node->receive_comm == NULL) {
138       node->task_received = NULL;
139       node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
140     }
141     if (node->receive_comm) {
142       if (MSG_comm_test(node->receive_comm)) {
143         status = MSG_comm_get_status(node->receive_comm);
144         MSG_comm_destroy(node->receive_comm);
145         node->receive_comm = NULL;
146         if (status == MSG_OK) {
147           XBT_DEBUG("Received an answer from the node I know.");
148           answer_got = 1;
149           //retrieve the node list and ping them.
150           task_data_t data = MSG_task_get_data(node->task_received);
151           xbt_assert((data != NULL), "Null data received");
152           if (data->type == TASK_FIND_NODE_ANSWER) {
153             node_contact_t contact;
154             node_list = data->answer;
155             xbt_dynar_foreach(node_list->nodes, i, contact) {
156               node_routing_table_update(node, contact->id);
157               //ping(node,contact->id);
158             }
159             task_free(node->task_received);
160           } else {
161             handle_task(node, node->task_received);
162           }
163         } else {
164           trial++;
165         }
166       } else {
167         MSG_process_sleep(1);
168       }
169     } else {
170       MSG_process_sleep(1);
171     }
172   } while (answer_got == 0 && trial < max_join_trials);
173   /* Second step: Send a FIND_NODE to a a random node in buckets */
174   unsigned int bucket_id = routing_table_find_bucket(node->table, id_known)->id;
175   for (i = 0;
176        ((bucket_id - i) > 0 || (bucket_id + i) <= identifier_size)
177        && i < JOIN_BUCKETS_QUERIES; i++) {
178     if (bucket_id - i > 0) {
179       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id - i);
180       find_node(node, id_in_bucket, 0);
181     }
182     if (bucket_id + i <= identifier_size) {
183       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id + i);
184       find_node(node, id_in_bucket, 0);
185     }
186   }
187   return answer_got;
188 }
189
190 /**
191   * Send a request to find a node in the node routing table.
192   * @brief node our node data
193   * @brief id_to_find the id of the node we are trying to find
194   */
195 unsigned int find_node(node_t node, unsigned int id_to_find,
196                        unsigned int count_in_stats)
197 {
198
199   unsigned int i = 0;
200   unsigned int queries, answers;
201   unsigned int destination_found = 0;
202   unsigned int nodes_added = 0;
203   double time_beginreceive;
204   double timeout, global_timeout = MSG_get_clock() + find_node_global_timeout;
205   unsigned int steps = 0;
206
207   xbt_assert((id_to_find >= 0), "Id supplied incorrect");
208
209   /* First we build a list of who we already know */
210   answer_t node_list = node_find_closest(node, id_to_find);
211   xbt_assert((node_list != NULL), "node_list incorrect");
212
213   XBT_DEBUG("Doing a FIND_NODE on %08x", id_to_find);
214
215   msg_error_t status;
216
217   /* Ask the nodes on our list if they   have information about
218    * the node we are trying to find */
219
220   do {
221     answers = 0;
222     queries = send_find_node_to_best(node, node_list);
223     nodes_added = 0;
224     timeout = MSG_get_clock() + find_node_timeout;
225     steps++;
226     time_beginreceive = MSG_get_clock();
227     do {
228       if (node->receive_comm == NULL) {
229         node->task_received = NULL;
230         node->receive_comm =
231             MSG_task_irecv(&node->task_received, node->mailbox);
232       }
233       if (node->receive_comm) {
234         if (MSG_comm_test(node->receive_comm)) {
235           status = MSG_comm_get_status(node->receive_comm);
236           MSG_comm_destroy(node->receive_comm);
237           node->receive_comm = NULL;
238           if (status == MSG_OK) {
239             xbt_assert((node->task_received != NULL), "Invalid task received");
240             //Figure out if we received an answer or something else
241             task_data_t data = MSG_task_get_data(node->task_received);
242             xbt_assert((data != NULL), "No data in the task");
243
244             //Check if what we have received is what we are looking for.
245             if (data->type == TASK_FIND_NODE_ANSWER
246                 && data->answer->destination_id == id_to_find) {
247               //Handle the answer
248               node_routing_table_update(node, data->sender_id);
249               node_contact_t contact;
250               xbt_dynar_foreach(node_list->nodes, i, contact) {
251                 node_routing_table_update(node, contact->id);
252               }
253               answers++;
254
255               nodes_added = answer_merge(node_list, data->answer);
256               XBT_DEBUG("Received an answer from %s (%s) with %ld nodes on it",
257                         data->answer_to, data->issuer_host_name,
258                         xbt_dynar_length(data->answer->nodes));
259
260               task_free(node->task_received);
261             } else {
262               handle_task(node, node->task_received);
263               //Update the timeout if we didn't have our answer
264               timeout += MSG_get_clock() - time_beginreceive;
265               time_beginreceive = MSG_get_clock();
266             }
267           }
268         } else {
269           MSG_process_sleep(1);
270         }
271       } else {
272         MSG_process_sleep(1);
273       }
274     } while (MSG_get_clock() < timeout && answers < queries);
275     destination_found = answer_destination_found(node_list);
276   } while (!destination_found && (nodes_added > 0 || answers == 0)
277            && MSG_get_clock() < global_timeout && steps < MAX_STEPS);
278   if (destination_found) {
279     if (count_in_stats)
280       node->find_node_success++;
281     if (queries > 4)
282       XBT_VERB("FIND_NODE on %08x success in %d steps", id_to_find, steps);
283     node_routing_table_update(node, id_to_find);
284   } else {
285     if (count_in_stats) {
286       node->find_node_failed++;
287       XBT_VERB("%08x not found in %d steps", id_to_find, steps);
288     }
289   }
290   answer_free(node_list);
291   return destination_found;
292 }
293
294 /**
295   * Pings a node in the system to see if it is online.
296   * @param node Our node data
297   * @param id_to_ping the id of a node we want to see if it is online.
298   * @return if the ping succeded or not.
299   */
300 unsigned int ping(node_t node, unsigned int id_to_ping)
301 {
302   char mailbox[MAILBOX_NAME_SIZE + 1];
303   sprintf(mailbox, "%0*x", MAILBOX_NAME_SIZE, id_to_ping);
304
305   unsigned int destination_found = 0;
306   double timeout = MSG_get_clock() + ping_timeout;
307
308   msg_task_t ping_task =
309       task_new_ping(node->id, node->mailbox,
310                     MSG_host_get_name(MSG_host_self()));
311   msg_task_t task_received = NULL;
312
313   XBT_VERB("PING %08x", id_to_ping);
314
315   //Check that we aren't trying to ping ourselves
316   if (id_to_ping == node->id) {
317     return 1;
318   }
319
320   /* Sending the ping task */
321   MSG_task_dsend(ping_task, mailbox, task_free_v);
322   do {
323     task_received = NULL;
324     msg_error_t status =
325         MSG_task_receive_with_timeout(&task_received, node->mailbox,
326                                       ping_timeout);
327     if (status == MSG_OK) {
328       xbt_assert((task_received != NULL), "Invalid task received");
329       //Checking if it's what we are waiting for or not.
330       task_data_t data = MSG_task_get_data(task_received);
331       xbt_assert((data != NULL), "didn't receive any data...");
332       if (data->type == TASK_PING_ANSWER && id_to_ping == data->sender_id) {
333         XBT_VERB("Ping to %s succeeded", mailbox);
334         node_routing_table_update(node, data->sender_id);
335         destination_found = 1;
336         task_free(task_received);
337       } else {
338         //If it's not our answer, we answer the query anyway.
339         handle_task(node, task_received);
340       }
341     }
342   } while (destination_found == 0 && MSG_get_clock() < timeout);
343
344   if (MSG_get_clock() >= timeout) {
345     XBT_DEBUG("Ping to %s has timeout.", mailbox);
346     return 0;
347   }
348   if (destination_found == -1) {
349     XBT_DEBUG("It seems that %s is offline...", mailbox);
350     return 0;
351   }
352   return 1;
353 }
354
355 /**
356   * Does a pseudo-random lookup for someone in the system
357   * @param node caller node data
358   */
359 void random_lookup(node_t node)
360 {
361   unsigned int id_to_look = RANDOM_LOOKUP_NODE; //Totally random.
362   /* TODO: Use some pseudorandom generator like RngStream. */
363   XBT_DEBUG("I'm doing a random lookup");
364   find_node(node, id_to_look, 1);
365 }
366
367 /**
368   * @brief Send a "FIND_NODE" to a node
369   * @param node sender node data
370   * @param id node we are querying
371   * @param destination node we are trying to find.
372   */
373 void send_find_node(node_t node, unsigned int id, unsigned int destination)
374 {
375   char mailbox[MAILBOX_NAME_SIZE + 1];
376   /* Gets the mailbox to send to */
377   get_node_mailbox(id, mailbox);
378   /* Build the task */
379   msg_task_t task =
380       task_new_find_node(node->id, destination, node->mailbox,
381                          MSG_host_get_name(MSG_host_self()));
382   /* Send the task */
383   xbt_assert((task != NULL), "Trying to send a NULL task.");
384   MSG_task_dsend(task, mailbox, task_free_v);
385   XBT_VERB("Asking %s for its closest nodes", mailbox);
386 }
387
388 /**
389   * Sends to the best "kademlia_alpha" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best nodes
390   */
391 unsigned int send_find_node_to_best(node_t node, answer_t node_list)
392 {
393   unsigned int i = 0, j = 0;
394   unsigned int destination = node_list->destination_id;
395   node_contact_t node_to_query;
396   while (j < kademlia_alpha && i < node_list->size) {   /* We need to have at most "kademlia_alpha" requets each time, according to the protocol */
397     /* Gets the node we want to send the query to */
398     node_to_query = xbt_dynar_get_as(node_list->nodes, i, node_contact_t);
399     if (node_to_query->id != node->id) {        /* No need to query ourselves */
400       send_find_node(node, node_to_query->id, destination);
401       j++;
402     }
403     i++;
404   }
405   return i;
406 }
407
408 /**
409   * \brief Handles an incomming received task
410   */
411 void handle_task(node_t node, msg_task_t task)
412 {
413   task_data_t data = MSG_task_get_data(task);
414   xbt_assert((data != NULL), "Received NULL data");
415   //Adding/updating the guy to our routing table
416   node_routing_table_update(node, data->sender_id);
417   switch (data->type) {
418   case TASK_FIND_NODE:
419     handle_find_node(node, data);
420     break;
421   case TASK_FIND_NODE_ANSWER:
422     XBT_DEBUG("Received a wrong answer for a FIND_NODE");
423     break;
424   case TASK_PING:
425     handle_ping(node, data);
426     break;
427   default:
428
429     break;
430   }
431   task_free(task);
432 }
433
434 /**
435   * \brief Handles the answer to an incomming "find_node" task
436   */
437 void handle_find_node(node_t node, task_data_t data)
438 {
439   XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %08x",
440            data->answer_to, data->issuer_host_name, data->destination_id);
441   //Building the answer to the request
442   answer_t answer = node_find_closest(node, data->destination_id);
443   //Building the task to send
444   msg_task_t task =
445       task_new_find_node_answer(node->id, data->destination_id, answer,
446                                 node->mailbox,
447                                 MSG_host_get_name(MSG_host_self()));
448   //Sending the task
449   MSG_task_dsend(task, data->answer_to, task_free_v);
450 }
451
452 /**
453   * \brief handles the answer to a ping
454   */
455 void handle_ping(node_t node, task_data_t data)
456 {
457   XBT_VERB("Received a PING request from %s (%s)", data->answer_to,
458            data->issuer_host_name);
459   //Building the answer to the request
460   msg_task_t task =
461       task_new_ping_answer(node->id, data->answer_to,
462                            MSG_host_get_name(MSG_host_self()));
463
464   MSG_task_dsend(task, data->answer_to, task_free_v);
465 }
466
467 /**
468   * \brief Main function
469   */
470 int main(int argc, char *argv[])
471 {
472
473   MSG_init(&argc, argv);
474
475   /* Check the arguments */
476   xbt_assert(argc > 2, "Usage: %s platform_file deployment_file\n"
477        "\tExample: %s msg_platform.xml msg_deployment.xml\n", 
478        argv[0], argv[0]);
479
480   const char *platform_file = argv[1];
481   const char *deployment_file = argv[2];
482
483   MSG_create_environment(platform_file);
484
485   MSG_function_register("node", node);
486   MSG_launch_application(deployment_file);
487
488   msg_error_t res = MSG_main();
489
490   XBT_CRITICAL("Messages created: %ld", smx_total_comms);
491   XBT_INFO("Simulated time: %g", MSG_get_clock());
492
493   return res != MSG_OK;
494 }