Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
woopsy, forgot to commit that file
[simgrid.git] / examples / msg / dht-kademlia / dht-kademlia.c
1 /* Copyright (c) 2012, 2014-2016. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "dht-kademlia.h"
8 #include "node.h"
9 #include "task.h"
10
11 #include "simgrid/msg.h"
12 /** @addtogroup MSG_examples
13   * <b>kademlia/kademlia.c: Kademlia protocol</b>
14   * Implements the Kademlia protocol, using 32 bits identifiers.
15   */
16 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia, "Messages specific for this msg example");
17
18 /* Main loop for the process */
19 static void main_loop(node_t node, double deadline)
20 {
21   double next_lookup_time = MSG_get_clock() + random_lookup_interval;
22   XBT_VERB("Main loop start");
23   while (MSG_get_clock() < deadline) {
24
25     if (node->receive_comm == NULL) {
26       node->task_received = NULL;
27       node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
28     }
29     if (node->receive_comm) {
30       if (!MSG_comm_test(node->receive_comm)) {
31         /* We search for a pseudo random node */
32         if (MSG_get_clock() >= next_lookup_time) {
33           random_lookup(node);
34           next_lookup_time += random_lookup_interval;
35         } else {
36           //Didn't get a task: sleep for a while...
37           MSG_process_sleep(1);
38         }
39       } else {
40         //There has been a transfer, we need to handle it !
41         msg_error_t status = MSG_comm_get_status(node->receive_comm);
42         MSG_comm_destroy(node->receive_comm);
43         node->receive_comm = NULL;
44
45         if (status == MSG_OK) {
46           xbt_assert((node->task_received != NULL), "We received an incorrect task");
47           handle_task(node, node->task_received);
48         } else {
49           xbt_assert((MSG_comm_get_task(node->receive_comm) == NULL), "Comm failed but received a task.");
50           XBT_DEBUG("Nevermind, the communication has failed.");
51         }
52       }
53     } else {
54       //Didn't get a comm: sleep.
55       MSG_process_sleep(1);
56     }
57   }
58   //Cleanup the receiving communication.
59   if (node->receive_comm != NULL) {
60     if (MSG_comm_test(node->receive_comm) && MSG_comm_get_status(node->receive_comm) == MSG_OK) {
61       task_free(MSG_comm_get_task(node->receive_comm));
62     }
63     MSG_comm_destroy(node->receive_comm);
64   }
65 }
66
67 /** @brief Node function
68   * @param my node ID
69   * @param the ID of the person I know in the system (or not)
70   * @param Time before I leave the system because I'm bored
71   */
72 static int node(int argc, char *argv[])
73 {
74   unsigned int join_sucess = 1;
75   double deadline;
76   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
77   /* Node initialization */
78   unsigned int id = strtoul(argv[1], NULL, 0);
79   node_t node = node_init(id);
80
81   if (argc == 4) {
82     XBT_INFO("Hi, I'm going to join the network with id %s", node->mailbox);
83     unsigned int id_known = strtoul(argv[2], NULL, 0);
84     join_sucess = join(node, id_known);
85     deadline = strtod(argv[3], NULL);
86   } else {
87     deadline = strtod(argv[2], NULL);
88     XBT_INFO("Hi, I'm going to create the network with id %s", node->mailbox);
89     node_routing_table_update(node, node->id);
90   }
91   if (join_sucess) {
92     XBT_VERB("Ok, I'm joining the network with id %s", node->mailbox);
93     //We start the main loop
94     main_loop(node, deadline);
95   } else {
96     XBT_INFO("I couldn't join the network :(");
97   }
98   XBT_DEBUG("I'm leaving the network");
99   XBT_INFO("%d/%d FIND_NODE have succeeded", node->find_node_success, node->find_node_success + node->find_node_failed);
100   node_free(node);
101
102   return 0;
103 }
104
105 /**
106   * @brief Tries to join the network
107   * @param node node data
108   * @param id_known id of the node I know in the network.
109   */
110 unsigned int join(node_t node, unsigned int id_known)
111 {
112   answer_t node_list;
113   msg_error_t status;
114   unsigned int trial = 0;
115   unsigned int i, answer_got = 0;
116
117   /* Add the guy we know to our routing table and ourselves. */
118   node_routing_table_update(node, node->id);
119   node_routing_table_update(node, id_known);
120
121   /* First step: Send a "FIND_NODE" request to the node we know */
122   send_find_node(node, id_known, node->id);
123   do {
124     if (node->receive_comm == NULL) {
125       node->task_received = NULL;
126       node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
127     }
128     if (node->receive_comm) {
129       if (MSG_comm_test(node->receive_comm)) {
130         status = MSG_comm_get_status(node->receive_comm);
131         MSG_comm_destroy(node->receive_comm);
132         node->receive_comm = NULL;
133         if (status == MSG_OK) {
134           XBT_DEBUG("Received an answer from the node I know.");
135           answer_got = 1;
136           //retrieve the node list and ping them.
137           task_data_t data = MSG_task_get_data(node->task_received);
138           xbt_assert((data != NULL), "Null data received");
139           if (data->type == TASK_FIND_NODE_ANSWER) {
140             node_contact_t contact;
141             node_list = data->answer;
142             xbt_dynar_foreach(node_list->nodes, i, contact) {
143               node_routing_table_update(node, contact->id);
144             }
145             task_free(node->task_received);
146           } else {
147             handle_task(node, node->task_received);
148           }
149         } else {
150           trial++;
151         }
152       } else {
153         MSG_process_sleep(1);
154       }
155     } else {
156       MSG_process_sleep(1);
157     }
158   } while (answer_got == 0 && trial < max_join_trials);
159   /* Second step: Send a FIND_NODE to a a random node in buckets */
160   unsigned int bucket_id = routing_table_find_bucket(node->table, id_known)->id;
161   for (i = 0; ((bucket_id - i) > 0 || (bucket_id + i) <= identifier_size) && i < JOIN_BUCKETS_QUERIES; i++) {
162     if (bucket_id - i > 0) {
163       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id - i);
164       find_node(node, id_in_bucket, 0);
165     }
166     if (bucket_id + i <= identifier_size) {
167       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id + i);
168       find_node(node, id_in_bucket, 0);
169     }
170   }
171   return answer_got;
172 }
173
174 /** @brief Send a request to find a node in the node routing table.
175   * @param node our node data
176   * @param id_to_find the id of the node we are trying to find
177   */
178 unsigned int find_node(node_t node, unsigned int id_to_find, unsigned int count_in_stats)
179 {
180   unsigned int i = 0;
181   unsigned int queries, answers;
182   unsigned int destination_found = 0;
183   unsigned int nodes_added = 0;
184   double time_beginreceive;
185   double timeout, global_timeout = MSG_get_clock() + find_node_global_timeout;
186   unsigned int steps = 0;
187
188   xbt_assert((id_to_find >= 0), "Id supplied incorrect");
189
190   /* First we build a list of who we already know */
191   answer_t node_list = node_find_closest(node, id_to_find);
192   xbt_assert((node_list != NULL), "node_list incorrect");
193
194   XBT_DEBUG("Doing a FIND_NODE on %08x", id_to_find);
195
196   msg_error_t status;
197
198   /* Ask the nodes on our list if they   have information about the node we are trying to find */
199   do {
200     answers = 0;
201     queries = send_find_node_to_best(node, node_list);
202     nodes_added = 0;
203     timeout = MSG_get_clock() + find_node_timeout;
204     steps++;
205     time_beginreceive = MSG_get_clock();
206     do {
207       if (node->receive_comm == NULL) {
208         node->task_received = NULL;
209         node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
210       }
211       if (node->receive_comm) {
212         if (MSG_comm_test(node->receive_comm)) {
213           status = MSG_comm_get_status(node->receive_comm);
214           MSG_comm_destroy(node->receive_comm);
215           node->receive_comm = NULL;
216           if (status == MSG_OK) {
217             xbt_assert((node->task_received != NULL), "Invalid task received");
218             //Figure out if we received an answer or something else
219             task_data_t data = MSG_task_get_data(node->task_received);
220             xbt_assert((data != NULL), "No data in the task");
221
222             //Check if what we have received is what we are looking for.
223             if (data->type == TASK_FIND_NODE_ANSWER && data->answer->destination_id == id_to_find) {
224               //Handle the answer
225               node_routing_table_update(node, data->sender_id);
226               node_contact_t contact;
227               xbt_dynar_foreach(node_list->nodes, i, contact) {
228                 node_routing_table_update(node, contact->id);
229               }
230               answers++;
231
232               nodes_added = answer_merge(node_list, data->answer);
233               XBT_DEBUG("Received an answer from %s (%s) with %ld nodes on it",
234                         data->answer_to, data->issuer_host_name, xbt_dynar_length(data->answer->nodes));
235
236               task_free(node->task_received);
237             } else {
238               handle_task(node, node->task_received);
239               //Update the timeout if we didn't have our answer
240               timeout += MSG_get_clock() - time_beginreceive;
241               time_beginreceive = MSG_get_clock();
242             }
243           }
244         } else {
245           MSG_process_sleep(1);
246         }
247       } else {
248         MSG_process_sleep(1);
249       }
250     } while (MSG_get_clock() < timeout && answers < queries);
251     destination_found = answer_destination_found(node_list);
252   } while (!destination_found && (nodes_added > 0 || answers == 0) && MSG_get_clock() < global_timeout
253             && steps < MAX_STEPS);
254   if (destination_found) {
255     if (count_in_stats)
256       node->find_node_success++;
257     if (queries > 4)
258       XBT_VERB("FIND_NODE on %08x success in %d steps", id_to_find, steps);
259     node_routing_table_update(node, id_to_find);
260   } else {
261     if (count_in_stats) {
262       node->find_node_failed++;
263       XBT_VERB("%08x not found in %d steps", id_to_find, steps);
264     }
265   }
266   answer_free(node_list);
267   return destination_found;
268 }
269
270 /** @brief Pings a node in the system to see if it is online.
271   * @param node Our node data
272   * @param id_to_ping the id of a node we want to see if it is online.
273   * @return if the ping succeded or not.
274   */
275 unsigned int ping(node_t node, unsigned int id_to_ping)
276 {
277   char mailbox[MAILBOX_NAME_SIZE];
278   snprintf(mailbox,MAILBOX_NAME_SIZE, "%d", id_to_ping);
279
280   unsigned int destination_found = 0;
281   double timeout = MSG_get_clock() + ping_timeout;
282
283   msg_task_t ping_task = task_new_ping(node->id, node->mailbox, MSG_host_get_name(MSG_host_self()));
284   msg_task_t task_received = NULL;
285
286   XBT_VERB("PING %08x", id_to_ping);
287
288   //Check that we aren't trying to ping ourselves
289   if (id_to_ping == node->id) {
290     return 1;
291   }
292
293   /* Sending the ping task */
294   MSG_task_dsend(ping_task, mailbox, task_free_v);
295   do {
296     task_received = NULL;
297     msg_error_t status =
298         MSG_task_receive_with_timeout(&task_received, node->mailbox, ping_timeout);
299     if (status == MSG_OK) {
300       xbt_assert((task_received != NULL), "Invalid task received");
301       //Checking if it's what we are waiting for or not.
302       task_data_t data = MSG_task_get_data(task_received);
303       xbt_assert((data != NULL), "didn't receive any data...");
304       if (data->type == TASK_PING_ANSWER && id_to_ping == data->sender_id) {
305         XBT_VERB("Ping to %s succeeded", mailbox);
306         node_routing_table_update(node, data->sender_id);
307         destination_found = 1;
308         task_free(task_received);
309       } else {
310         //If it's not our answer, we answer the query anyway.
311         handle_task(node, task_received);
312       }
313     }
314   } while (destination_found == 0 && MSG_get_clock() < timeout);
315
316   if (MSG_get_clock() >= timeout) {
317     XBT_DEBUG("Ping to %s has timeout.", mailbox);
318     return 0;
319   }
320   if (destination_found == -1) {
321     XBT_DEBUG("It seems that %s is offline...", mailbox);
322     return 0;
323   }
324   return 1;
325 }
326
327 /** @brief Does a pseudo-random lookup for someone in the system
328   * @param node caller node data
329   */
330 void random_lookup(node_t node)
331 {
332   unsigned int id_to_look = RANDOM_LOOKUP_NODE; //Totally random.
333   /* TODO: Use some pseudorandom generator like RngStream. */
334   XBT_DEBUG("I'm doing a random lookup");
335   find_node(node, id_to_look, 1);
336 }
337
338 /** @brief Send a "FIND_NODE" to a node
339   * @param node sender node data
340   * @param id node we are querying
341   * @param destination node we are trying to find.
342   */
343 void send_find_node(node_t node, unsigned int id, unsigned int destination)
344 {
345   char mailbox[MAILBOX_NAME_SIZE];
346   /* Gets the mailbox to send to */
347   get_node_mailbox(id, mailbox);
348   /* Build the task */
349   msg_task_t task = task_new_find_node(node->id, destination, node->mailbox, MSG_host_get_name(MSG_host_self()));
350   /* Send the task */
351   xbt_assert((task != NULL), "Trying to send a NULL task.");
352   MSG_task_dsend(task, mailbox, task_free_v);
353   XBT_VERB("Asking %s for its closest nodes", mailbox);
354 }
355
356 /**
357   * Sends to the best "kademlia_alpha" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best nodes
358   */
359 unsigned int send_find_node_to_best(node_t node, answer_t node_list)
360 {
361   unsigned int i = 0, j = 0;
362   unsigned int destination = node_list->destination_id;
363   node_contact_t node_to_query;
364   while (j < kademlia_alpha && i < node_list->size) {
365     /* We need to have at most "kademlia_alpha" requests each time, according to the protocol */
366     /* Gets the node we want to send the query to */
367     node_to_query = xbt_dynar_get_as(node_list->nodes, i, node_contact_t);
368     if (node_to_query->id != node->id) {        /* No need to query ourselves */
369       send_find_node(node, node_to_query->id, destination);
370       j++;
371     }
372     i++;
373   }
374   return i;
375 }
376
377 /** @brief Handles an incoming received task */
378 void handle_task(node_t node, msg_task_t task)
379 {
380   task_data_t data = MSG_task_get_data(task);
381   xbt_assert((data != NULL), "Received NULL data");
382   //Adding/updating the guy to our routing table
383   node_routing_table_update(node, data->sender_id);
384   switch (data->type) {
385   case TASK_FIND_NODE:
386     handle_find_node(node, data);
387     break;
388   case TASK_FIND_NODE_ANSWER:
389     XBT_DEBUG("Received a wrong answer for a FIND_NODE");
390     break;
391   case TASK_PING:
392     handle_ping(node, data);
393     break;
394   default:
395     break;
396   }
397   task_free(task);
398 }
399
400 /** @brief Handles the answer to an incoming "find_node" task */
401 void handle_find_node(node_t node, task_data_t data)
402 {
403   XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %08x",
404            data->answer_to, data->issuer_host_name, data->destination_id);
405   //Building the answer to the request
406   answer_t answer = node_find_closest(node, data->destination_id);
407   //Building the task to send
408   msg_task_t task = task_new_find_node_answer(node->id, data->destination_id, answer, node->mailbox,
409                                               MSG_host_get_name(MSG_host_self()));
410   //Sending the task
411   MSG_task_dsend(task, data->answer_to, task_free_v);
412 }
413
414 /** @brief handles the answer to a ping */
415 void handle_ping(node_t node, task_data_t data)
416 {
417   XBT_VERB("Received a PING request from %s (%s)", data->answer_to, data->issuer_host_name);
418   //Building the answer to the request
419   msg_task_t task = task_new_ping_answer(node->id, data->answer_to, MSG_host_get_name(MSG_host_self()));
420
421   MSG_task_dsend(task, data->answer_to, task_free_v);
422 }
423
424 /** @brief Main function */
425 int main(int argc, char *argv[])
426 {
427   MSG_init(&argc, argv);
428
429   /* Check the arguments */
430   xbt_assert(argc > 2, "Usage: %s platform_file deployment_file\n\tExample: %s msg_platform.xml msg_deployment.xml\n",
431              argv[0], argv[0]);
432
433   const char *platform_file = argv[1];
434   const char *deployment_file = argv[2];
435
436   MSG_create_environment(platform_file);
437   MSG_function_register("node", node);
438   MSG_launch_application(deployment_file);
439
440   msg_error_t res = MSG_main();
441
442   XBT_INFO("Simulated time: %g", MSG_get_clock());
443
444   return res != MSG_OK;
445 }