Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
yet another cleaning pass
[simgrid.git] / examples / msg / kademlia / kademlia.c
1 /* Copyright (c) 2012, 2014-2016. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "kademlia.h"
8 #include "node.h"
9 #include "task.h"
10
11 #include "simgrid/msg.h"
12 /** @addtogroup MSG_examples
13   * <b>kademlia/kademlia.c: Kademlia protocol</b>
14   * Implements the Kademlia protocol, using 32 bits identifiers.
15   */
16 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia, "Messages specific for this msg example");
17
18 extern long unsigned int smx_total_comms;
19
20 /* Main loop for the process */
21 static void main_loop(node_t node, double deadline)
22 {
23   double next_lookup_time = MSG_get_clock() + random_lookup_interval;
24   XBT_VERB("Main loop start");
25   while (MSG_get_clock() < deadline) {
26
27     if (node->receive_comm == NULL) {
28       node->task_received = NULL;
29       node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
30     }
31     if (node->receive_comm) {
32       if (!MSG_comm_test(node->receive_comm)) {
33         /* We search for a pseudo random node */
34         if (MSG_get_clock() >= next_lookup_time) {
35           random_lookup(node);
36           next_lookup_time += random_lookup_interval;
37         } else {
38           //Didn't get a task: sleep for a while...
39           MSG_process_sleep(1);
40         }
41       } else {
42         //There has been a transfer, we need to handle it !
43         msg_error_t status = MSG_comm_get_status(node->receive_comm);
44         MSG_comm_destroy(node->receive_comm);
45         node->receive_comm = NULL;
46
47         if (status == MSG_OK) {
48           xbt_assert((node->task_received != NULL), "We received an incorrect task");
49           handle_task(node, node->task_received);
50         } else {
51           xbt_assert((MSG_comm_get_task(node->receive_comm) == NULL), "Comm failed but received a task.");
52           XBT_DEBUG("Nevermind, the communication has failed.");
53         }
54       }
55     } else {
56       //Didn't get a comm: sleep.
57       MSG_process_sleep(1);
58     }
59   }
60   //Cleanup the receiving communication.
61   if (node->receive_comm != NULL) {
62     if (MSG_comm_test(node->receive_comm) && MSG_comm_get_status(node->receive_comm) == MSG_OK) {
63       task_free(MSG_comm_get_task(node->receive_comm));
64     }
65     MSG_comm_destroy(node->receive_comm);
66   }
67 }
68
69 /** @brief Node function
70   * @param my node ID
71   * @param the ID of the person I know in the system (or not)
72   * @param Time before I leave the system because I'm bored
73   */
74 static int node(int argc, char *argv[])
75 {
76   unsigned int join_sucess = 1;
77   double deadline;
78   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
79   /* Node initialization */
80   unsigned int id = strtoul(argv[1], NULL, 0);
81   node_t node = node_init(id);
82
83   if (argc == 4) {
84     XBT_INFO("Hi, I'm going to join the network with id %s", node->mailbox);
85     unsigned int id_known = strtoul(argv[2], NULL, 0);
86     join_sucess = join(node, id_known);
87     deadline = strtod(argv[3], NULL);
88   } else {
89     deadline = strtod(argv[2], NULL);
90     XBT_INFO("Hi, I'm going to create the network with id %s", node->mailbox);
91     node_routing_table_update(node, node->id);
92   }
93   if (join_sucess) {
94     XBT_VERB("Ok, I'm joining the network with id %s", node->mailbox);
95     //We start the main loop
96     main_loop(node, deadline);
97   } else {
98     XBT_INFO("I couldn't join the network :(");
99   }
100   XBT_DEBUG("I'm leaving the network");
101   XBT_INFO("%d/%d FIND_NODE have succeeded", node->find_node_success, node->find_node_success + node->find_node_failed);
102   node_free(node);
103
104   return 0;
105 }
106
107 /**
108   * @brief Tries to join the network
109   * @param node node data
110   * @param id_known id of the node I know in the network.
111   */
112 unsigned int join(node_t node, unsigned int id_known)
113 {
114   answer_t node_list;
115   msg_error_t status;
116   unsigned int trial = 0;
117   unsigned int i, answer_got = 0;
118
119   /* Add the guy we know to our routing table and ourselves. */
120   node_routing_table_update(node, node->id);
121   node_routing_table_update(node, id_known);
122
123   /* First step: Send a "FIND_NODE" request to the node we know */
124   send_find_node(node, id_known, node->id);
125   do {
126     if (node->receive_comm == NULL) {
127       node->task_received = NULL;
128       node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
129     }
130     if (node->receive_comm) {
131       if (MSG_comm_test(node->receive_comm)) {
132         status = MSG_comm_get_status(node->receive_comm);
133         MSG_comm_destroy(node->receive_comm);
134         node->receive_comm = NULL;
135         if (status == MSG_OK) {
136           XBT_DEBUG("Received an answer from the node I know.");
137           answer_got = 1;
138           //retrieve the node list and ping them.
139           task_data_t data = MSG_task_get_data(node->task_received);
140           xbt_assert((data != NULL), "Null data received");
141           if (data->type == TASK_FIND_NODE_ANSWER) {
142             node_contact_t contact;
143             node_list = data->answer;
144             xbt_dynar_foreach(node_list->nodes, i, contact) {
145               node_routing_table_update(node, contact->id);
146             }
147             task_free(node->task_received);
148           } else {
149             handle_task(node, node->task_received);
150           }
151         } else {
152           trial++;
153         }
154       } else {
155         MSG_process_sleep(1);
156       }
157     } else {
158       MSG_process_sleep(1);
159     }
160   } while (answer_got == 0 && trial < max_join_trials);
161   /* Second step: Send a FIND_NODE to a a random node in buckets */
162   unsigned int bucket_id = routing_table_find_bucket(node->table, id_known)->id;
163   for (i = 0; ((bucket_id - i) > 0 || (bucket_id + i) <= identifier_size) && i < JOIN_BUCKETS_QUERIES; i++) {
164     if (bucket_id - i > 0) {
165       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id - i);
166       find_node(node, id_in_bucket, 0);
167     }
168     if (bucket_id + i <= identifier_size) {
169       unsigned int id_in_bucket = get_id_in_prefix(node->id, bucket_id + i);
170       find_node(node, id_in_bucket, 0);
171     }
172   }
173   return answer_got;
174 }
175
176 /** @brief Send a request to find a node in the node routing table.
177   * @param node our node data
178   * @param id_to_find the id of the node we are trying to find
179   */
180 unsigned int find_node(node_t node, unsigned int id_to_find, unsigned int count_in_stats)
181 {
182   unsigned int i = 0;
183   unsigned int queries, answers;
184   unsigned int destination_found = 0;
185   unsigned int nodes_added = 0;
186   double time_beginreceive;
187   double timeout, global_timeout = MSG_get_clock() + find_node_global_timeout;
188   unsigned int steps = 0;
189
190   xbt_assert((id_to_find >= 0), "Id supplied incorrect");
191
192   /* First we build a list of who we already know */
193   answer_t node_list = node_find_closest(node, id_to_find);
194   xbt_assert((node_list != NULL), "node_list incorrect");
195
196   XBT_DEBUG("Doing a FIND_NODE on %08x", id_to_find);
197
198   msg_error_t status;
199
200   /* Ask the nodes on our list if they   have information about the node we are trying to find */
201   do {
202     answers = 0;
203     queries = send_find_node_to_best(node, node_list);
204     nodes_added = 0;
205     timeout = MSG_get_clock() + find_node_timeout;
206     steps++;
207     time_beginreceive = MSG_get_clock();
208     do {
209       if (node->receive_comm == NULL) {
210         node->task_received = NULL;
211         node->receive_comm = MSG_task_irecv(&node->task_received, node->mailbox);
212       }
213       if (node->receive_comm) {
214         if (MSG_comm_test(node->receive_comm)) {
215           status = MSG_comm_get_status(node->receive_comm);
216           MSG_comm_destroy(node->receive_comm);
217           node->receive_comm = NULL;
218           if (status == MSG_OK) {
219             xbt_assert((node->task_received != NULL), "Invalid task received");
220             //Figure out if we received an answer or something else
221             task_data_t data = MSG_task_get_data(node->task_received);
222             xbt_assert((data != NULL), "No data in the task");
223
224             //Check if what we have received is what we are looking for.
225             if (data->type == TASK_FIND_NODE_ANSWER && data->answer->destination_id == id_to_find) {
226               //Handle the answer
227               node_routing_table_update(node, data->sender_id);
228               node_contact_t contact;
229               xbt_dynar_foreach(node_list->nodes, i, contact) {
230                 node_routing_table_update(node, contact->id);
231               }
232               answers++;
233
234               nodes_added = answer_merge(node_list, data->answer);
235               XBT_DEBUG("Received an answer from %s (%s) with %ld nodes on it",
236                         data->answer_to, data->issuer_host_name, xbt_dynar_length(data->answer->nodes));
237
238               task_free(node->task_received);
239             } else {
240               handle_task(node, node->task_received);
241               //Update the timeout if we didn't have our answer
242               timeout += MSG_get_clock() - time_beginreceive;
243               time_beginreceive = MSG_get_clock();
244             }
245           }
246         } else {
247           MSG_process_sleep(1);
248         }
249       } else {
250         MSG_process_sleep(1);
251       }
252     } while (MSG_get_clock() < timeout && answers < queries);
253     destination_found = answer_destination_found(node_list);
254   } while (!destination_found && (nodes_added > 0 || answers == 0) && MSG_get_clock() < global_timeout
255             && steps < MAX_STEPS);
256   if (destination_found) {
257     if (count_in_stats)
258       node->find_node_success++;
259     if (queries > 4)
260       XBT_VERB("FIND_NODE on %08x success in %d steps", id_to_find, steps);
261     node_routing_table_update(node, id_to_find);
262   } else {
263     if (count_in_stats) {
264       node->find_node_failed++;
265       XBT_VERB("%08x not found in %d steps", id_to_find, steps);
266     }
267   }
268   answer_free(node_list);
269   return destination_found;
270 }
271
272 /** @brief Pings a node in the system to see if it is online.
273   * @param node Our node data
274   * @param id_to_ping the id of a node we want to see if it is online.
275   * @return if the ping succeded or not.
276   */
277 unsigned int ping(node_t node, unsigned int id_to_ping)
278 {
279   char mailbox[MAILBOX_NAME_SIZE + 1];
280   sprintf(mailbox, "%0*x", MAILBOX_NAME_SIZE, id_to_ping);
281
282   unsigned int destination_found = 0;
283   double timeout = MSG_get_clock() + ping_timeout;
284
285   msg_task_t ping_task = task_new_ping(node->id, node->mailbox, MSG_host_get_name(MSG_host_self()));
286   msg_task_t task_received = NULL;
287
288   XBT_VERB("PING %08x", id_to_ping);
289
290   //Check that we aren't trying to ping ourselves
291   if (id_to_ping == node->id) {
292     return 1;
293   }
294
295   /* Sending the ping task */
296   MSG_task_dsend(ping_task, mailbox, task_free_v);
297   do {
298     task_received = NULL;
299     msg_error_t status =
300         MSG_task_receive_with_timeout(&task_received, node->mailbox, ping_timeout);
301     if (status == MSG_OK) {
302       xbt_assert((task_received != NULL), "Invalid task received");
303       //Checking if it's what we are waiting for or not.
304       task_data_t data = MSG_task_get_data(task_received);
305       xbt_assert((data != NULL), "didn't receive any data...");
306       if (data->type == TASK_PING_ANSWER && id_to_ping == data->sender_id) {
307         XBT_VERB("Ping to %s succeeded", mailbox);
308         node_routing_table_update(node, data->sender_id);
309         destination_found = 1;
310         task_free(task_received);
311       } else {
312         //If it's not our answer, we answer the query anyway.
313         handle_task(node, task_received);
314       }
315     }
316   } while (destination_found == 0 && MSG_get_clock() < timeout);
317
318   if (MSG_get_clock() >= timeout) {
319     XBT_DEBUG("Ping to %s has timeout.", mailbox);
320     return 0;
321   }
322   if (destination_found == -1) {
323     XBT_DEBUG("It seems that %s is offline...", mailbox);
324     return 0;
325   }
326   return 1;
327 }
328
329 /** @brief Does a pseudo-random lookup for someone in the system
330   * @param node caller node data
331   */
332 void random_lookup(node_t node)
333 {
334   unsigned int id_to_look = RANDOM_LOOKUP_NODE; //Totally random.
335   /* TODO: Use some pseudorandom generator like RngStream. */
336   XBT_DEBUG("I'm doing a random lookup");
337   find_node(node, id_to_look, 1);
338 }
339
340 /** @brief Send a "FIND_NODE" to a node
341   * @param node sender node data
342   * @param id node we are querying
343   * @param destination node we are trying to find.
344   */
345 void send_find_node(node_t node, unsigned int id, unsigned int destination)
346 {
347   char mailbox[MAILBOX_NAME_SIZE + 1];
348   /* Gets the mailbox to send to */
349   get_node_mailbox(id, mailbox);
350   /* Build the task */
351   msg_task_t task = task_new_find_node(node->id, destination, node->mailbox, MSG_host_get_name(MSG_host_self()));
352   /* Send the task */
353   xbt_assert((task != NULL), "Trying to send a NULL task.");
354   MSG_task_dsend(task, mailbox, task_free_v);
355   XBT_VERB("Asking %s for its closest nodes", mailbox);
356 }
357
358 /**
359   * Sends to the best "kademlia_alpha" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best nodes
360   */
361 unsigned int send_find_node_to_best(node_t node, answer_t node_list)
362 {
363   unsigned int i = 0, j = 0;
364   unsigned int destination = node_list->destination_id;
365   node_contact_t node_to_query;
366   while (j < kademlia_alpha && i < node_list->size) {
367     /* We need to have at most "kademlia_alpha" requests each time, according to the protocol */
368     /* Gets the node we want to send the query to */
369     node_to_query = xbt_dynar_get_as(node_list->nodes, i, node_contact_t);
370     if (node_to_query->id != node->id) {        /* No need to query ourselves */
371       send_find_node(node, node_to_query->id, destination);
372       j++;
373     }
374     i++;
375   }
376   return i;
377 }
378
379 /** @brief Handles an incoming received task */
380 void handle_task(node_t node, msg_task_t task)
381 {
382   task_data_t data = MSG_task_get_data(task);
383   xbt_assert((data != NULL), "Received NULL data");
384   //Adding/updating the guy to our routing table
385   node_routing_table_update(node, data->sender_id);
386   switch (data->type) {
387   case TASK_FIND_NODE:
388     handle_find_node(node, data);
389     break;
390   case TASK_FIND_NODE_ANSWER:
391     XBT_DEBUG("Received a wrong answer for a FIND_NODE");
392     break;
393   case TASK_PING:
394     handle_ping(node, data);
395     break;
396   default:
397     break;
398   }
399   task_free(task);
400 }
401
402 /** @brief Handles the answer to an incoming "find_node" task */
403 void handle_find_node(node_t node, task_data_t data)
404 {
405   XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %08x",
406            data->answer_to, data->issuer_host_name, data->destination_id);
407   //Building the answer to the request
408   answer_t answer = node_find_closest(node, data->destination_id);
409   //Building the task to send
410   msg_task_t task = task_new_find_node_answer(node->id, data->destination_id, answer, node->mailbox,
411                                               MSG_host_get_name(MSG_host_self()));
412   //Sending the task
413   MSG_task_dsend(task, data->answer_to, task_free_v);
414 }
415
416 /** @brief handles the answer to a ping */
417 void handle_ping(node_t node, task_data_t data)
418 {
419   XBT_VERB("Received a PING request from %s (%s)", data->answer_to, data->issuer_host_name);
420   //Building the answer to the request
421   msg_task_t task = task_new_ping_answer(node->id, data->answer_to, MSG_host_get_name(MSG_host_self()));
422
423   MSG_task_dsend(task, data->answer_to, task_free_v);
424 }
425
426 /** @brief Main function */
427 int main(int argc, char *argv[])
428 {
429   MSG_init(&argc, argv);
430
431   /* Check the arguments */
432   xbt_assert(argc > 2, "Usage: %s platform_file deployment_file\n\tExample: %s msg_platform.xml msg_deployment.xml\n",
433              argv[0], argv[0]);
434
435   const char *platform_file = argv[1];
436   const char *deployment_file = argv[2];
437
438   MSG_create_environment(platform_file);
439   MSG_function_register("node", node);
440   MSG_launch_application(deployment_file);
441
442   msg_error_t res = MSG_main();
443
444   XBT_CRITICAL("Messages created: %ld", smx_total_comms);
445   XBT_INFO("Simulated time: %g", MSG_get_clock());
446
447   return res != MSG_OK;
448 }