Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Example cleaning
[simgrid.git] / examples / msg / kademlia / kademlia.c
1 /* Copyright (c) 2012, 2014-2016. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "kademlia.h"
8 #include "node.h"
9 #include "task.h"
10
11 #include "simgrid/msg.h"
12 #include "xbt/log.h"
13 #include "xbt/asserts.h"
14 /** @addtogroup MSG_examples
15   * <b>kademlia/kademlia.c: Kademlia protocol</b>
16   * Implements the Kademlia protocol, using 32 bits identifiers.
17   */
18 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia, "Messages specific for this msg example");
19
20 extern long unsigned int smx_total_comms;
21
22 /* Main loop for the process */
23 static void main_loop(node_t node, double deadline)
24 {
25   double next_lookup_time = MSG_get_clock() + random_lookup_interval;
26   XBT_VERB("Main loop start");
27   while (MSG_get_clock() < deadline) {
28
29     if (node->receive_comm == NULL) {
30       node->task_received = NULL;
31       node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
32     }
33     if (node->receive_comm) {
34       if (!MSG_comm_test(node->receive_comm)) {
35         /* We search for a pseudo random node */
36         if (MSG_get_clock() >= next_lookup_time) {
37           random_lookup(node);
38           next_lookup_time += random_lookup_interval;
39         } else {
40           //Didn't get a task: sleep for a while...
41           MSG_process_sleep(1);
42         }
43       } else {
44         //There has been a transfer, we need to handle it !
45         msg_error_t status = MSG_comm_get_status(node->receive_comm);
46         MSG_comm_destroy(node->receive_comm);
47         node->receive_comm = NULL;
48
49         if (status == MSG_OK) {
50           xbt_assert((node->task_received != NULL), "We received an incorrect task");
51           handle_task(node, node->task_received);
52         } else {
53           xbt_assert((MSG_comm_get_task(node->receive_comm) == NULL), "Comm failed but received a task.");
54           XBT_DEBUG("Nevermind, the communication has failed.");
55         }
56       }
57     } else {
58       //Didn't get a comm: sleep.
59       MSG_process_sleep(1);
60     }
61   }
62   //Cleanup the receiving communication.
63   if (node->receive_comm != NULL) {
64     if (MSG_comm_test(node->receive_comm) && MSG_comm_get_status(node->receive_comm) == MSG_OK) {
65       task_free(MSG_comm_get_task(node->receive_comm));
66     }
67     MSG_comm_destroy(node->receive_comm);
68   }
69 }
70
71 /** @brief Node function
72   * @param my node ID
73   * @param the ID of the person I know in the system (or not)
74   * @param Time before I leave the system because I'm bored
75   */
76 static int node(int argc, char *argv[])
77 {
78   unsigned int join_sucess = 1;
79   double deadline;
80   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
81   /* Node initialization */
82   unsigned int id = strtoul(argv[1], NULL, 0);
83   node_t node = node_init(id);
84
85   if (argc == 4) {
86     XBT_INFO("Hi, I'm going to join the network with id %s", node->mailbox);
87     unsigned int id_known = strtoul(argv[2], NULL, 0);
88     join_sucess = join(node, id_known);
89     deadline = strtod(argv[3], NULL);
90   } else {
91     deadline = strtod(argv[2], NULL);
92     XBT_INFO("Hi, I'm going to create the network with id %s", node->mailbox);
93     node_routing_table_update(node, node->id);
94   }
95   if (join_sucess) {
96     XBT_VERB("Ok, I'm joining the network with id %s", node->mailbox);
97     //We start the main loop
98     main_loop(node, deadline);
99   } else {
100     XBT_INFO("I couldn't join the network :(");
101   }
102   XBT_DEBUG("I'm leaving the network");
103   XBT_INFO("%d/%d FIND_NODE have succeeded", node->find_node_success, node->find_node_success + node->find_node_failed);
104   node_free(node);
105
106   return 0;
107 }
108
109 /**
110   * @brief Tries to join the network
111   * @param node node data
112   * @param id_known id of the node I know in the network.
113   */
114 unsigned int join(node_t node, unsigned int id_known)
115 {
116   answer_t node_list;
117   msg_error_t status;
118   unsigned int trial = 0;
119   unsigned int i, answer_got = 0;
120
121   /* Add the guy we know to our routing table and ourselves. */
122   node_routing_table_update(node, node->id);
123   node_routing_table_update(node, id_known);
124
125   /* First step: Send a "FIND_NODE" request to the node we know */
126   send_find_node(node, id_known, node->id);
127   do {
128     if (node->receive_comm == NULL) {
129       node->task_received = NULL;
130       node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
131     }
132     if (node->receive_comm) {
133       if (MSG_comm_test(node->receive_comm)) {
134         status = MSG_comm_get_status(node->receive_comm);
135         MSG_comm_destroy(node->receive_comm);
136         node->receive_comm = NULL;
137         if (status == MSG_OK) {
138           XBT_DEBUG("Received an answer from the node I know.");
139           answer_got = 1;
140           //retrieve the node list and ping them.
141           task_data_t data = MSG_task_get_data(node->task_received);
142           xbt_assert((data != NULL), "Null data received");
143           if (data->type == TASK_FIND_NODE_ANSWER) {
144             node_contact_t contact;
145             node_list = data->answer;
146             xbt_dynar_foreach(node_list->nodes, i, contact) {
147               node_routing_table_update(node, contact->id);
148             }
149             task_free(node->task_received);
150           } else {
151             handle_task(node, node->task_received);
152           }
153         } else {
154           trial++;
155         }
156       } else {
157         MSG_process_sleep(1);
158       }
159     } else {
160       MSG_process_sleep(1);
161     }
162   } while (answer_got == 0 && trial < max_join_trials);
163   /* Second step: Send a FIND_NODE to a a random node in buckets */
164   unsigned int bucket_id = routing_table_find_bucket(node->table, id_known)->id;
165   for (i = 0; ((bucket_id - i) > 0 || (bucket_id + i) <= identifier_size) && i < JOIN_BUCKETS_QUERIES; i++) {
166     if (bucket_id - i > 0) {
167       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id - i);
168       find_node(node, id_in_bucket, 0);
169     }
170     if (bucket_id + i <= identifier_size) {
171       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id + i);
172       find_node(node, id_in_bucket, 0);
173     }
174   }
175   return answer_got;
176 }
177
178 /** @brief Send a request to find a node in the node routing table.
179   * @param node our node data
180   * @param id_to_find the id of the node we are trying to find
181   */
182 unsigned int find_node(node_t node, unsigned int id_to_find, unsigned int count_in_stats)
183 {
184   unsigned int i = 0;
185   unsigned int queries, answers;
186   unsigned int destination_found = 0;
187   unsigned int nodes_added = 0;
188   double time_beginreceive;
189   double timeout, global_timeout = MSG_get_clock() + find_node_global_timeout;
190   unsigned int steps = 0;
191
192   xbt_assert((id_to_find >= 0), "Id supplied incorrect");
193
194   /* First we build a list of who we already know */
195   answer_t node_list = node_find_closest(node, id_to_find);
196   xbt_assert((node_list != NULL), "node_list incorrect");
197
198   XBT_DEBUG("Doing a FIND_NODE on %08x", id_to_find);
199
200   msg_error_t status;
201
202   /* Ask the nodes on our list if they   have information about the node we are trying to find */
203   do {
204     answers = 0;
205     queries = send_find_node_to_best(node, node_list);
206     nodes_added = 0;
207     timeout = MSG_get_clock() + find_node_timeout;
208     steps++;
209     time_beginreceive = MSG_get_clock();
210     do {
211       if (node->receive_comm == NULL) {
212         node->task_received = NULL;
213         node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
214       }
215       if (node->receive_comm) {
216         if (MSG_comm_test(node->receive_comm)) {
217           status = MSG_comm_get_status(node->receive_comm);
218           MSG_comm_destroy(node->receive_comm);
219           node->receive_comm = NULL;
220           if (status == MSG_OK) {
221             xbt_assert((node->task_received != NULL), "Invalid task received");
222             //Figure out if we received an answer or something else
223             task_data_t data = MSG_task_get_data(node->task_received);
224             xbt_assert((data != NULL), "No data in the task");
225
226             //Check if what we have received is what we are looking for.
227             if (data->type == TASK_FIND_NODE_ANSWER && data->answer->destination_id == id_to_find) {
228               //Handle the answer
229               node_routing_table_update(node, data->sender_id);
230               node_contact_t contact;
231               xbt_dynar_foreach(node_list->nodes, i, contact) {
232                 node_routing_table_update(node, contact->id);
233               }
234               answers++;
235
236               nodes_added = answer_merge(node_list, data->answer);
237               XBT_DEBUG("Received an answer from %s (%s) with %ld nodes on it",
238                         data->answer_to, data->issuer_host_name, xbt_dynar_length(data->answer->nodes));
239
240               task_free(node->task_received);
241             } else {
242               handle_task(node, node->task_received);
243               //Update the timeout if we didn't have our answer
244               timeout += MSG_get_clock() - time_beginreceive;
245               time_beginreceive = MSG_get_clock();
246             }
247           }
248         } else {
249           MSG_process_sleep(1);
250         }
251       } else {
252         MSG_process_sleep(1);
253       }
254     } while (MSG_get_clock() < timeout && answers < queries);
255     destination_found = answer_destination_found(node_list);
256   } while (!destination_found && (nodes_added > 0 || answers == 0) && MSG_get_clock() < global_timeout
257             && steps < MAX_STEPS);
258   if (destination_found) {
259     if (count_in_stats)
260       node->find_node_success++;
261     if (queries > 4)
262       XBT_VERB("FIND_NODE on %08x success in %d steps", id_to_find, steps);
263     node_routing_table_update(node, id_to_find);
264   } else {
265     if (count_in_stats) {
266       node->find_node_failed++;
267       XBT_VERB("%08x not found in %d steps", id_to_find, steps);
268     }
269   }
270   answer_free(node_list);
271   return destination_found;
272 }
273
274 /** @brief Pings a node in the system to see if it is online.
275   * @param node Our node data
276   * @param id_to_ping the id of a node we want to see if it is online.
277   * @return if the ping succeded or not.
278   */
279 unsigned int ping(node_t node, unsigned int id_to_ping)
280 {
281   char mailbox[MAILBOX_NAME_SIZE + 1];
282   sprintf(mailbox, "%0*x", MAILBOX_NAME_SIZE, id_to_ping);
283
284   unsigned int destination_found = 0;
285   double timeout = MSG_get_clock() + ping_timeout;
286
287   msg_task_t ping_task = task_new_ping(node->id, node->mailbox, MSG_host_get_name(MSG_host_self()));
288   msg_task_t task_received = NULL;
289
290   XBT_VERB("PING %08x", id_to_ping);
291
292   //Check that we aren't trying to ping ourselves
293   if (id_to_ping == node->id) {
294     return 1;
295   }
296
297   /* Sending the ping task */
298   MSG_task_dsend(ping_task, mailbox, task_free_v);
299   do {
300     task_received = NULL;
301     msg_error_t status =
302         MSG_task_receive_with_timeout(&task_received, node->mailbox, ping_timeout);
303     if (status == MSG_OK) {
304       xbt_assert((task_received != NULL), "Invalid task received");
305       //Checking if it's what we are waiting for or not.
306       task_data_t data = MSG_task_get_data(task_received);
307       xbt_assert((data != NULL), "didn't receive any data...");
308       if (data->type == TASK_PING_ANSWER && id_to_ping == data->sender_id) {
309         XBT_VERB("Ping to %s succeeded", mailbox);
310         node_routing_table_update(node, data->sender_id);
311         destination_found = 1;
312         task_free(task_received);
313       } else {
314         //If it's not our answer, we answer the query anyway.
315         handle_task(node, task_received);
316       }
317     }
318   } while (destination_found == 0 && MSG_get_clock() < timeout);
319
320   if (MSG_get_clock() >= timeout) {
321     XBT_DEBUG("Ping to %s has timeout.", mailbox);
322     return 0;
323   }
324   if (destination_found == -1) {
325     XBT_DEBUG("It seems that %s is offline...", mailbox);
326     return 0;
327   }
328   return 1;
329 }
330
331 /** @brief Does a pseudo-random lookup for someone in the system
332   * @param node caller node data
333   */
334 void random_lookup(node_t node)
335 {
336   unsigned int id_to_look = RANDOM_LOOKUP_NODE; //Totally random.
337   /* TODO: Use some pseudorandom generator like RngStream. */
338   XBT_DEBUG("I'm doing a random lookup");
339   find_node(node, id_to_look, 1);
340 }
341
342 /** @brief Send a "FIND_NODE" to a node
343   * @param node sender node data
344   * @param id node we are querying
345   * @param destination node we are trying to find.
346   */
347 void send_find_node(node_t node, unsigned int id, unsigned int destination)
348 {
349   char mailbox[MAILBOX_NAME_SIZE + 1];
350   /* Gets the mailbox to send to */
351   get_node_mailbox(id, mailbox);
352   /* Build the task */
353   msg_task_t task = task_new_find_node(node->id, destination, node->mailbox, MSG_host_get_name(MSG_host_self()));
354   /* Send the task */
355   xbt_assert((task != NULL), "Trying to send a NULL task.");
356   MSG_task_dsend(task, mailbox, task_free_v);
357   XBT_VERB("Asking %s for its closest nodes", mailbox);
358 }
359
360 /**
361   * Sends to the best "kademlia_alpha" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best nodes
362   */
363 unsigned int send_find_node_to_best(node_t node, answer_t node_list)
364 {
365   unsigned int i = 0, j = 0;
366   unsigned int destination = node_list->destination_id;
367   node_contact_t node_to_query;
368   while (j < kademlia_alpha && i < node_list->size) {
369     /* We need to have at most "kademlia_alpha" requests each time, according to the protocol */
370     /* Gets the node we want to send the query to */
371     node_to_query = xbt_dynar_get_as(node_list->nodes, i, node_contact_t);
372     if (node_to_query->id != node->id) {        /* No need to query ourselves */
373       send_find_node(node, node_to_query->id, destination);
374       j++;
375     }
376     i++;
377   }
378   return i;
379 }
380
381 /** @brief Handles an incoming received task */
382 void handle_task(node_t node, msg_task_t task)
383 {
384   task_data_t data = MSG_task_get_data(task);
385   xbt_assert((data != NULL), "Received NULL data");
386   //Adding/updating the guy to our routing table
387   node_routing_table_update(node, data->sender_id);
388   switch (data->type) {
389   case TASK_FIND_NODE:
390     handle_find_node(node, data);
391     break;
392   case TASK_FIND_NODE_ANSWER:
393     XBT_DEBUG("Received a wrong answer for a FIND_NODE");
394     break;
395   case TASK_PING:
396     handle_ping(node, data);
397     break;
398   default:
399     break;
400   }
401   task_free(task);
402 }
403
404 /** @brief Handles the answer to an incoming "find_node" task */
405 void handle_find_node(node_t node, task_data_t data)
406 {
407   XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %08x",
408            data->answer_to, data->issuer_host_name, data->destination_id);
409   //Building the answer to the request
410   answer_t answer = node_find_closest(node, data->destination_id);
411   //Building the task to send
412   msg_task_t task = task_new_find_node_answer(node->id, data->destination_id, answer, node->mailbox,
413                                               MSG_host_get_name(MSG_host_self()));
414   //Sending the task
415   MSG_task_dsend(task, data->answer_to, task_free_v);
416 }
417
418 /** @brief handles the answer to a ping */
419 void handle_ping(node_t node, task_data_t data)
420 {
421   XBT_VERB("Received a PING request from %s (%s)", data->answer_to, data->issuer_host_name);
422   //Building the answer to the request
423   msg_task_t task = task_new_ping_answer(node->id, data->answer_to, MSG_host_get_name(MSG_host_self()));
424
425   MSG_task_dsend(task, data->answer_to, task_free_v);
426 }
427
428 /** @brief Main function */
429 int main(int argc, char *argv[])
430 {
431   MSG_init(&argc, argv);
432
433   /* Check the arguments */
434   xbt_assert(argc > 2, "Usage: %s platform_file deployment_file\n\tExample: %s msg_platform.xml msg_deployment.xml\n",
435              argv[0], argv[0]);
436
437   const char *platform_file = argv[1];
438   const char *deployment_file = argv[2];
439
440   MSG_create_environment(platform_file);
441   MSG_function_register("node", node);
442   MSG_launch_application(deployment_file);
443
444   msg_error_t res = MSG_main();
445
446   XBT_CRITICAL("Messages created: %ld", smx_total_comms);
447   XBT_INFO("Simulated time: %g", MSG_get_clock());
448
449   return res != MSG_OK;
450 }