Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
9129794454f450e38047dd57dcc580a37a80f6b7
[simgrid.git] / examples / deprecated / msg / dht-kademlia / dht-kademlia.c
1 /* Copyright (c) 2012-2020. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "dht-kademlia.h"
7 #include "node.h"
8 #include "task.h"
9
10 #include "simgrid/msg.h"
11
12 #include <stdio.h> /* snprintf */
13
14 /** @addtogroup MSG_examples
15   * <b>kademlia/kademlia.c: Kademlia protocol</b>
16   * Implements the Kademlia protocol, using 32 bits identifiers.
17   */
18 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia, "Messages specific for this msg example");
19
20 /* Main loop for the process */
21 static void main_loop(node_t node, double deadline)
22 {
23   double next_lookup_time = MSG_get_clock() + RANDOM_LOOKUP_INTERVAL;
24   XBT_VERB("Main loop start");
25   while (MSG_get_clock() < deadline) {
26
27     if (node->receive_comm == NULL) {
28       node->task_received = NULL;
29       node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
30     }
31     if (node->receive_comm) {
32       if (!MSG_comm_test(node->receive_comm)) {
33         /* We search for a pseudo random node */
34         if (MSG_get_clock() >= next_lookup_time) {
35           random_lookup(node);
36           next_lookup_time += RANDOM_LOOKUP_INTERVAL;
37         } else {
38           //Didn't get a task: sleep for a while...
39           MSG_process_sleep(1);
40         }
41       } else {
42         //There has been a transfer, we need to handle it !
43         msg_error_t status = MSG_comm_get_status(node->receive_comm);
44         MSG_comm_destroy(node->receive_comm);
45         node->receive_comm = NULL;
46
47         if (status == MSG_OK) {
48           xbt_assert((node->task_received != NULL), "We received an incorrect task");
49           handle_task(node, node->task_received);
50         } else {
51           xbt_assert((MSG_comm_get_task(node->receive_comm) == NULL), "Comm failed but received a task.");
52           XBT_DEBUG("Nevermind, the communication has failed.");
53         }
54       }
55     } else {
56       //Didn't get a comm: sleep.
57       MSG_process_sleep(1);
58     }
59   }
60   //Cleanup the receiving communication.
61   if (node->receive_comm != NULL) {
62     if (MSG_comm_test(node->receive_comm) && MSG_comm_get_status(node->receive_comm) == MSG_OK) {
63       task_free(MSG_comm_get_task(node->receive_comm));
64     }
65     MSG_comm_destroy(node->receive_comm);
66   }
67 }
68
69 /** @brief Node function
70   * @param my node ID
71   * @param the ID of the person I know in the system (or not)
72   * @param Time before I leave the system because I'm bored
73   */
74 static int node(int argc, char *argv[])
75 {
76   unsigned int join_sucess = 1;
77   double deadline;
78   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
79   /* Node initialization */
80   unsigned int id = strtoul(argv[1], NULL, 0);
81   node_t node = node_init(id);
82
83   if (argc == 4) {
84     XBT_INFO("Hi, I'm going to join the network with id %s", node->mailbox);
85     unsigned int id_known = strtoul(argv[2], NULL, 0);
86     join_sucess = join(node, id_known);
87     deadline = strtod(argv[3], NULL);
88   } else {
89     deadline = strtod(argv[2], NULL);
90     XBT_INFO("Hi, I'm going to create the network with id %s", node->mailbox);
91     node_routing_table_update(node, node->id);
92   }
93   if (join_sucess) {
94     XBT_VERB("Ok, I'm joining the network with id %s", node->mailbox);
95     //We start the main loop
96     main_loop(node, deadline);
97   } else {
98     XBT_INFO("I couldn't join the network :(");
99   }
100   XBT_DEBUG("I'm leaving the network");
101   XBT_INFO("%u/%u FIND_NODE have succeeded", node->find_node_success, node->find_node_success + node->find_node_failed);
102   node_free(node);
103
104   return 0;
105 }
106
107 /**
108   * @brief Tries to join the network
109   * @param node node data
110   * @param id_known id of the node I know in the network.
111   */
112 unsigned int join(node_t node, unsigned int id_known)
113 {
114   const s_answer_t* node_list;
115   msg_error_t status;
116   unsigned int trial = 0;
117   unsigned int i;
118   unsigned int answer_got = 0;
119
120   /* Add the guy we know to our routing table and ourselves. */
121   node_routing_table_update(node, node->id);
122   node_routing_table_update(node, id_known);
123
124   /* First step: Send a "FIND_NODE" request to the node we know */
125   send_find_node(node, id_known, node->id);
126   do {
127     if (node->receive_comm == NULL) {
128       node->task_received = NULL;
129       node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
130     }
131     if (node->receive_comm) {
132       if (MSG_comm_test(node->receive_comm)) {
133         status = MSG_comm_get_status(node->receive_comm);
134         MSG_comm_destroy(node->receive_comm);
135         node->receive_comm = NULL;
136         if (status == MSG_OK) {
137           XBT_DEBUG("Received an answer from the node I know.");
138           answer_got = 1;
139           //retrieve the node list and ping them.
140           const s_task_data_t* data = MSG_task_get_data(node->task_received);
141           xbt_assert((data != NULL), "Null data received");
142           if (data->type == TASK_FIND_NODE_ANSWER) {
143             node_contact_t contact;
144             node_list = data->answer;
145             xbt_dynar_foreach(node_list->nodes, i, contact) {
146               node_routing_table_update(node, contact->id);
147             }
148             task_free(node->task_received);
149           } else {
150             handle_task(node, node->task_received);
151           }
152         } else {
153           trial++;
154         }
155       } else {
156         MSG_process_sleep(1);
157       }
158     } else {
159       MSG_process_sleep(1);
160     }
161   } while (answer_got == 0 && trial < MAX_JOIN_TRIALS);
162   /* Second step: Send a FIND_NODE to a a random node in buckets */
163   unsigned int bucket_id = routing_table_find_bucket(node->table, id_known)->id;
164   for (i = 0; ((bucket_id > i) || (bucket_id + i) <= IDENTIFIER_SIZE) && i < JOIN_BUCKETS_QUERIES; i++) {
165     if (bucket_id  > i) {
166       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id - i);
167       find_node(node, id_in_bucket, 0);
168     }
169     if (bucket_id + i <= IDENTIFIER_SIZE) {
170       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id + i);
171       find_node(node, id_in_bucket, 0);
172     }
173   }
174   return answer_got;
175 }
176
177 /** @brief Send a request to find a node in the node routing table.
178   * @param node our node data
179   * @param id_to_find the id of the node we are trying to find
180   */
181 unsigned int find_node(node_t node, unsigned int id_to_find, unsigned int count_in_stats)
182 {
183   unsigned int i = 0;
184   unsigned int queries;
185   unsigned int answers;
186   unsigned int destination_found = 0;
187   unsigned int nodes_added = 0;
188   double global_timeout          = MSG_get_clock() + FIND_NODE_GLOBAL_TIMEOUT;
189   unsigned int steps = 0;
190
191   /* First we build a list of who we already know */
192   answer_t node_list = node_find_closest(node, id_to_find);
193   xbt_assert((node_list != NULL), "node_list incorrect");
194
195   XBT_DEBUG("Doing a FIND_NODE on %08x", id_to_find);
196
197   msg_error_t status;
198
199   /* Ask the nodes on our list if they   have information about the node we are trying to find */
200   do {
201     answers = 0;
202     queries = send_find_node_to_best(node, node_list);
203     nodes_added = 0;
204     double timeout = MSG_get_clock() + FIND_NODE_TIMEOUT;
205     steps++;
206     double time_beginreceive = MSG_get_clock();
207     do {
208       if (node->receive_comm == NULL) {
209         node->task_received = NULL;
210         node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
211       }
212       if (node->receive_comm) {
213         if (MSG_comm_test(node->receive_comm)) {
214           status = MSG_comm_get_status(node->receive_comm);
215           MSG_comm_destroy(node->receive_comm);
216           node->receive_comm = NULL;
217           if (status == MSG_OK) {
218             xbt_assert((node->task_received != NULL), "Invalid task received");
219             //Figure out if we received an answer or something else
220             const s_task_data_t* data = MSG_task_get_data(node->task_received);
221             xbt_assert((data != NULL), "No data in the task");
222
223             //Check if what we have received is what we are looking for.
224             if (data->type == TASK_FIND_NODE_ANSWER && data->answer->destination_id == id_to_find) {
225               //Handle the answer
226               node_routing_table_update(node, data->sender_id);
227               node_contact_t contact;
228               xbt_dynar_foreach(node_list->nodes, i, contact) {
229                 node_routing_table_update(node, contact->id);
230               }
231               answers++;
232
233               nodes_added = answer_merge(node_list, data->answer);
234               XBT_DEBUG("Received an answer from %s (%s) with %lu nodes on it", data->answer_to, data->issuer_host_name,
235                         xbt_dynar_length(data->answer->nodes));
236
237               task_free(node->task_received);
238             } else {
239               handle_task(node, node->task_received);
240               //Update the timeout if we didn't have our answer
241               timeout += MSG_get_clock() - time_beginreceive;
242               time_beginreceive = MSG_get_clock();
243             }
244           }
245         } else {
246           MSG_process_sleep(1);
247         }
248       } else {
249         MSG_process_sleep(1);
250       }
251     } while (MSG_get_clock() < timeout && answers < queries);
252     destination_found = answer_destination_found(node_list);
253   } while (!destination_found && (nodes_added > 0 || answers == 0) && MSG_get_clock() < global_timeout
254             && steps < MAX_STEPS);
255   if (destination_found) {
256     if (count_in_stats)
257       node->find_node_success++;
258     if (queries > 4)
259       XBT_VERB("FIND_NODE on %08x success in %u steps", id_to_find, steps);
260     node_routing_table_update(node, id_to_find);
261   } else {
262     if (count_in_stats) {
263       node->find_node_failed++;
264       XBT_VERB("%08x not found in %u steps", id_to_find, steps);
265     }
266   }
267   answer_free(node_list);
268   return destination_found;
269 }
270
271 /** @brief Does a pseudo-random lookup for someone in the system
272   * @param node caller node data
273   */
274 void random_lookup(node_t node)
275 {
276   unsigned int id_to_look = RANDOM_LOOKUP_NODE; //Totally random.
277   /* TODO: Use some pseudorandom generator. */
278   XBT_DEBUG("I'm doing a random lookup");
279   find_node(node, id_to_look, 1);
280 }
281
282 /** @brief Send a "FIND_NODE" to a node
283   * @param node sender node data
284   * @param id node we are querying
285   * @param destination node we are trying to find.
286   */
287 void send_find_node(node_t node, unsigned int id, unsigned int destination)
288 {
289   char mailbox[MAILBOX_NAME_SIZE];
290   /* Gets the mailbox to send to */
291   get_node_mailbox(id, mailbox);
292   /* Build the task */
293   msg_task_t task = task_new_find_node(node->id, destination, node->mailbox, MSG_host_get_name(MSG_host_self()));
294   /* Send the task */
295   xbt_assert((task != NULL), "Trying to send a NULL task.");
296   MSG_task_dsend(task, mailbox, task_free_v);
297   XBT_VERB("Asking %s for its closest nodes", mailbox);
298 }
299
300 /**
301   * Sends to the best "KADEMLIA_ALPHA" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best
302  * nodes
303   */
304 unsigned int send_find_node_to_best(node_t node, const_answer_t node_list)
305 {
306   unsigned int i = 0;
307   unsigned int j = 0;
308   unsigned int destination = node_list->destination_id;
309   while (j < KADEMLIA_ALPHA && i < node_list->size) {
310     /* We need to have at most "KADEMLIA_ALPHA" requests each time, according to the protocol */
311     /* Gets the node we want to send the query to */
312     const s_node_contact_t* node_to_query = xbt_dynar_get_as(node_list->nodes, i, node_contact_t);
313     if (node_to_query->id != node->id) {        /* No need to query ourselves */
314       send_find_node(node, node_to_query->id, destination);
315       j++;
316     }
317     i++;
318   }
319   return i;
320 }
321
322 /** @brief Handles an incoming received task */
323 void handle_task(node_t node, msg_task_t task)
324 {
325   const_task_data_t data = MSG_task_get_data(task);
326   xbt_assert((data != NULL), "Received NULL data");
327   //Adding/updating the guy to our routing table
328   node_routing_table_update(node, data->sender_id);
329   switch (data->type) {
330   case TASK_FIND_NODE:
331     handle_find_node(node, data);
332     break;
333   case TASK_FIND_NODE_ANSWER:
334     XBT_DEBUG("Received a wrong answer for a FIND_NODE");
335     break;
336   default:
337     break;
338   }
339   task_free(task);
340 }
341
342 /** @brief Handles the answer to an incoming "find_node" task */
343 void handle_find_node(node_t node, const_task_data_t data)
344 {
345   XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %08x",
346            data->answer_to, data->issuer_host_name, data->destination_id);
347   //Building the answer to the request
348   answer_t answer = node_find_closest(node, data->destination_id);
349   //Building the task to send
350   msg_task_t task = task_new_find_node_answer(node->id, data->destination_id, answer, node->mailbox,
351                                               MSG_host_get_name(MSG_host_self()));
352   //Sending the task
353   MSG_task_dsend(task, data->answer_to, task_free_v);
354 }
355
356 /** @brief Main function */
357 int main(int argc, char *argv[])
358 {
359   MSG_init(&argc, argv);
360
361   /* Check the arguments */
362   xbt_assert(argc > 2, "Usage: %s platform_file deployment_file\n\tExample: %s msg_platform.xml msg_deployment.xml\n",
363              argv[0], argv[0]);
364
365   const char *platform_file = argv[1];
366   const char *deployment_file = argv[2];
367
368   MSG_create_environment(platform_file);
369   MSG_function_register("node", node);
370   MSG_launch_application(deployment_file);
371
372   msg_error_t res = MSG_main();
373
374   XBT_INFO("Simulated time: %g", MSG_get_clock());
375
376   return res != MSG_OK;
377 }