Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
9b10609bd2c962adef3adff2d23d9d808ecf39ba
[simgrid.git] / examples / msg / kademlia / kademlia.c
1 /* Copyright (c) 2012. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "kademlia.h"
7 #include "node.h"
8 #include "task.h"
9
10 #include "msg/msg.h"
11 #include "xbt/log.h"
12 #include "xbt/asserts.h"
13 /** @addtogroup MSG_examples
14   * <b>kademlia/kademlia.c: Kademlia protocol</b>
15   * Implements the Kademlia protocol, using 32 bits identifiers.
16   */
17  XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kademlia,
18                              "Messages specific for this msg example");
19
20 extern long unsigned int smx_total_comms;
21
22 /** 
23   * \brief Node function
24   * Arguments :
25   * - my node ID
26   * - the ID of the person I know in the system (or not)
27   * - Time before I leave the system because I'm bored
28   */
29 int node(int argc, char *argv[]) {
30   unsigned int join_sucess = 1, deadline;
31   xbt_assert(argc == 3 || argc == 4, "Wrong number of arguments");
32   /* Node initialization */
33   unsigned int id = atoi(argv[1]);;
34   node_t node = node_init(id);
35   
36   if (argc == 4) {
37     XBT_INFO("Hi, I'm going to join the network !");        
38     unsigned int id_known = (unsigned int)atoi(argv[2]);
39     join_sucess = join(node,id_known);
40     deadline = atoi(argv[3]);
41   }
42   else {
43     deadline = atoi(argv[2]);
44     XBT_INFO("Hi, I'm going to create the network with id %s !",node->mailbox);
45     node_routing_table_update(node,node->id);
46   }
47   if (join_sucess) {
48     XBT_VERB("Ok, I'm joining the network with id %s",node->mailbox);
49     //We start the main loop
50     main_loop(node,deadline);
51   }
52   else {
53     XBT_INFO("I couldn't join the network :(");
54   } 
55   XBT_DEBUG("I'm leaving the network");
56   XBT_INFO("%d/%d FIND_NODE have succeeded",node->find_node_success,node->find_node_success + node->find_node_failed);
57   node_free(node);
58
59   return 0;
60 }
61 /**
62   * Main loop for the process
63   */
64 void main_loop(node_t node, unsigned int deadline) {
65   double next_lookup_time = MSG_get_clock() + random_lookup_interval;
66   XBT_VERB("Main loop start");
67   while (MSG_get_clock() < deadline) {
68
69     if (node->receive_comm == NULL) {
70       node->task_received = NULL;
71       node->receive_comm = MSG_task_irecv(&node->task_received,node->mailbox);
72   }
73     if (node->receive_comm) {
74       if (!MSG_comm_test(node->receive_comm)) {
75         /* We search for a pseudo random node */
76         if (MSG_get_clock() >= next_lookup_time) {
77           random_lookup(node);
78           next_lookup_time += random_lookup_interval;
79         }
80         else {
81           //Didn't get a task: sleep for a while...
82           MSG_process_sleep(1);
83         }      
84       }
85       else {
86         //There has been a transfer, we need to handle it !
87         msg_error_t status = MSG_comm_get_status(node->receive_comm);
88         MSG_comm_destroy(node->receive_comm);
89         node->receive_comm = NULL;
90
91         if (status == MSG_OK) {
92           xbt_assert( (node->task_received != NULL), "We received an incorrect task");
93           handle_task(node,node->task_received);
94         }
95         else {
96           xbt_assert((MSG_comm_get_task(node->receive_comm) == NULL),"Comm failed but received a task.");
97           XBT_DEBUG("Nevermind, the communication has failed.");
98         }
99       }
100     }
101     else {
102       //Didn't get a comm: sleep.
103       MSG_process_sleep(1);
104     }
105   }
106   //Cleanup the receiving communication.
107   if (node->receive_comm != NULL ) {
108     if (MSG_comm_test(node->receive_comm) && MSG_comm_get_status(node->receive_comm) == MSG_OK) {
109       task_free(MSG_comm_get_task(node->receive_comm));
110     }
111     MSG_comm_destroy(node->receive_comm);
112   }
113 }
114 /**
115   * Tries to join the network
116   * @param node node data
117   * @param id_known id of the node I know in the network.
118   */
119 unsigned int join(node_t node, unsigned int id_known) {
120   answer_t node_list;
121   msg_error_t status;
122   unsigned int trial = 0;
123   unsigned int i, answer_got = 0;
124
125   /* Add the guy we know to our routing table and ourselves. */
126   node_routing_table_update(node,node->id);
127   node_routing_table_update(node,id_known);
128
129   /* First step: Send a "FIND_NODE" request to the node we know */
130   send_find_node(node,id_known,node->id);
131   do {
132     if (node->receive_comm == NULL) {
133       node->task_received = NULL;
134       node->receive_comm = MSG_task_irecv(&node->task_received,node->mailbox);
135     }
136     if (node->receive_comm) {
137       if (MSG_comm_test(node->receive_comm)) {
138         status = MSG_comm_get_status(node->receive_comm);
139         MSG_comm_destroy(node->receive_comm);
140         node->receive_comm = NULL;
141         if (status == MSG_OK) {
142           XBT_DEBUG("Received an answer from the node I know.");
143           answer_got = 1;
144           //retrieve the node list and ping them.
145           task_data_t data = MSG_task_get_data(node->task_received);
146           xbt_assert( (data != NULL), "Null data received");
147           if (data->type == TASK_FIND_NODE_ANSWER) {
148             node_contact_t contact;
149             node_list = data->answer;
150             xbt_dynar_foreach(node_list->nodes,i, contact) {
151               node_routing_table_update(node,contact->id);
152               //ping(node,contact->id);
153             }
154             task_free(node->task_received);
155           }
156           else {
157             handle_task(node,node->task_received);
158           }
159         }
160         else {
161           trial++;
162         }
163       }
164       else {
165         MSG_process_sleep(1);
166       }
167     }
168     else {
169       MSG_process_sleep(1);
170     }
171   } while (answer_got == 0 && trial < max_join_trials);
172   /* Second step: Send a FIND_NODE to a a random node in buckets */
173   unsigned int bucket_id = routing_table_find_bucket(node->table,id_known)->id;
174   for (i = 0; ((bucket_id -i) > 0 || (bucket_id + i) <= identifier_size) && i < JOIN_BUCKETS_QUERIES; i++) {
175     if (bucket_id - i > 0) {
176       unsigned int id_in_bucket = get_id_in_prefix(node->id,bucket_id - i);
177       find_node(node,id_in_bucket,0);
178     }
179     if (bucket_id + i <= identifier_size) {
180       unsigned int id_in_bucket = get_id_in_prefix(node->id,bucket_id + i);
181       find_node(node,id_in_bucket,0);
182     }
183   }
184   return answer_got;
185 }
186 /**
187   * Send a request to find a node in the node routing table.
188   * @brief node our node data
189   * @brief id_to_find the id of the node we are trying to find
190   */
191 unsigned int find_node(node_t node, unsigned int id_to_find, unsigned int count_in_stats) {
192
193   unsigned int i = 0;
194   unsigned int queries, answers;
195   unsigned int destination_found = 0;
196   unsigned int nodes_added = 0;
197   double time_beginreceive;
198   double timeout, global_timeout = MSG_get_clock() + find_node_global_timeout;
199   unsigned int steps = 0;
200
201   xbt_assert( (id_to_find >= 0), "Id supplied incorrect");
202   
203   /* First we build a list of who we already know */
204   answer_t node_list = node_find_closest(node,id_to_find);
205   xbt_assert((node_list != NULL),"node_list incorrect");
206   
207   XBT_DEBUG("Doing a FIND_NODE on %d", id_to_find);
208    
209   msg_error_t status;
210
211   /* Ask the nodes on our list if they   have information about
212    * the node we are trying to find */
213
214   do {
215     answers = 0;
216     queries = send_find_node_to_best(node,node_list);
217     nodes_added = 0;
218     timeout = MSG_get_clock() + find_node_timeout;
219     steps++;
220     time_beginreceive = MSG_get_clock();
221     do {
222       if (node->receive_comm == NULL) {
223         node->task_received = NULL;
224         node->receive_comm = MSG_task_irecv(&node->task_received,node->mailbox);
225       }
226       if (node->receive_comm) {
227         if (MSG_comm_test(node->receive_comm)) {
228           status = MSG_comm_get_status(node->receive_comm);
229           MSG_comm_destroy(node->receive_comm);
230           node->receive_comm = NULL;
231           if (status == MSG_OK) {
232             xbt_assert( (node->task_received != NULL), "Invalid task received");
233             //Figure out if we received an answer or something else
234             task_data_t data = MSG_task_get_data(node->task_received);
235             xbt_assert( (data != NULL), "No data in the task");
236
237             //Check if what we have received is what we are looking for.
238             if (data->type == TASK_FIND_NODE_ANSWER && data->answer->destination_id == id_to_find) {
239               //Handle the answer
240               node_routing_table_update(node,data->sender_id);
241               node_contact_t contact;
242               xbt_dynar_foreach(node_list->nodes,i, contact) {
243                   node_routing_table_update(node,contact->id);
244               }
245               answers++;
246
247               nodes_added = answer_merge(node_list,data->answer);
248               XBT_DEBUG("Received an answer from %s (%s) with %ld nodes on it",data->answer_to,data->issuer_host_name,xbt_dynar_length(data->answer->nodes));
249
250               task_free(node->task_received);
251             }
252             else {
253               handle_task(node,node->task_received);
254               //Update the timeout if we didn't have our answer
255               timeout += MSG_get_clock() - time_beginreceive;
256               time_beginreceive = MSG_get_clock();
257             }
258           }
259         }
260         else {
261             MSG_process_sleep(1);
262         }
263       }
264       else {
265         MSG_process_sleep(1);
266       }
267     } while (MSG_get_clock() < timeout && answers < queries) ;
268     destination_found = answer_destination_found(node_list);
269   } while (!destination_found && (nodes_added > 0 || answers == 0) && MSG_get_clock() < global_timeout && steps < MAX_STEPS);
270   if (destination_found) {
271     if (count_in_stats)
272       node->find_node_success++;
273       if (queries > 4)
274         XBT_VERB("FIND_NODE on %d success in %d steps",id_to_find,steps);
275     node_routing_table_update(node,id_to_find);
276   }
277   else {
278     if (count_in_stats) {
279       node->find_node_failed++;
280       XBT_VERB("%d not found in %d steps",id_to_find,steps);
281     }
282   }
283   answer_free(node_list);
284   return destination_found;
285 }
286 /**
287   * Pings a node in the system to see if it is online. 
288   * @param node Our node data
289   * @param id_to_ping the id of a node we want to see if it is online.
290   * @return if the ping succeded or not.
291   */
292 unsigned int ping(node_t node, unsigned int id_to_ping) {
293   char mailbox[MAILBOX_NAME_SIZE];
294   sprintf(mailbox,"%d",id_to_ping);
295
296   unsigned int destination_found = 0;
297   unsigned int timeout = MSG_get_clock() +  ping_timeout;
298
299   msg_task_t ping_task = task_new_ping(node->id,node->mailbox,MSG_host_get_name(MSG_host_self()));
300   msg_task_t task_received = NULL;
301   
302   XBT_VERB("PING %d",id_to_ping);
303
304   //Check that we aren't trying to ping ourselves
305   if (id_to_ping == node->id) {
306     return 1;
307   }
308
309   /* Sending the ping task */
310   MSG_task_dsend(ping_task,mailbox,task_free_v);
311   do
312   { 
313     task_received = NULL;
314     msg_error_t status = MSG_task_receive_with_timeout(&task_received,node->mailbox,ping_timeout);
315     if (status == MSG_OK) {
316       xbt_assert( (task_received != NULL), "Invalid task received");
317       //Checking if it's what we are waiting for or not.
318       task_data_t data = MSG_task_get_data(task_received);
319       xbt_assert( (data != NULL) ,"didn't receive any data...");
320       if (data->type == TASK_PING_ANSWER && id_to_ping == data->sender_id) {
321         XBT_VERB("Ping to %s succeeded",mailbox);
322         node_routing_table_update(node,data->sender_id);
323         destination_found = 1;
324         task_free(task_received);
325       }
326       else {
327         //If it's not our answer, we answer the query anyway.
328         handle_task(node,task_received);
329       }
330     }
331   } while (destination_found == 0 && MSG_get_clock() < timeout);
332   
333   if (MSG_get_clock() >= timeout) {
334     XBT_DEBUG("Ping to %s has timeout.",mailbox);
335     return 0;
336   }
337   if (destination_found == -1) {
338     XBT_DEBUG("It seems that %s is offline...",mailbox);
339     return 0;
340   }
341   return 1;
342 }
343 /**
344   * Does a pseudo-random lookup for someone in the system
345   * @param node caller node data
346   */
347 void random_lookup(node_t node) {
348   unsigned int id_to_look = RANDOM_LOOKUP_NODE; //Totally random.
349   /* TODO: Use some pseudorandom generator like RngStream. */
350   XBT_DEBUG("I'm doing a random lookup");
351   find_node(node,id_to_look,1);
352 }
353 /**
354   * @brief Send a "FIND_NODE" to a node 
355   * @param node sender node data
356   * @param id node we are querying
357   * @param destination node we are trying to find.
358   */
359 void send_find_node(node_t node, unsigned int id, unsigned int destination) {
360   char mailbox[MAILBOX_NAME_SIZE];
361   /* Gets the mailbox to send to */
362   get_node_mailbox(id,mailbox);
363   /* Build the task */
364   msg_task_t task = task_new_find_node(node->id,destination,node->mailbox,MSG_host_get_name(MSG_host_self()));
365   /* Send the task */
366   xbt_assert( (task != NULL), "Trying to send a NULL task.");
367   MSG_task_dsend(task,mailbox,task_free_v);
368   XBT_VERB("Asking %s for its closest nodes",mailbox);
369
370 /**
371   * Sends to the best "kademlia_alpha" nodes in the "node_list" array a "FIND_NODE" request, to ask them for their best nodes
372   */
373 unsigned int send_find_node_to_best(node_t node, answer_t node_list) {
374   unsigned int i = 0, j = 0;
375   unsigned int destination = node_list->destination_id;
376   node_contact_t node_to_query;
377   while (j < kademlia_alpha && i < node_list->size) { /* We need to have at most "kademlia_alpha" requets each time, according to the protocol */
378     /* Gets the node we want to send the query to */
379     node_to_query = xbt_dynar_get_as(node_list->nodes,i,node_contact_t);
380     if (node_to_query->id != node->id) { /* No need to query ourselves */
381       send_find_node(node,node_to_query->id,destination);
382       j++;
383     }
384     i++;
385   }   
386   return i;  
387 }
388 /**
389   * \brief Handles an incomming received task
390   */
391 void handle_task(node_t node, msg_task_t task) {
392   task_data_t data = MSG_task_get_data(task);
393   xbt_assert( (data != NULL), "Received NULL data");
394   //Adding/updating the guy to our routing table
395   node_routing_table_update(node,data->sender_id);
396   switch (data->type) {
397     case TASK_FIND_NODE:
398       handle_find_node(node,data);
399     break;
400     case TASK_FIND_NODE_ANSWER:
401       XBT_DEBUG("Received a wrong answer for a FIND_NODE");
402     break;
403     case TASK_PING:
404       handle_ping(node,data);
405     break;
406     default:
407     
408     break;
409   }
410   task_free(task);
411 }
412 /**
413   * \brief Handles the answer to an incomming "find_node" task
414   */
415 void handle_find_node(node_t node, task_data_t data) {
416   XBT_VERB("Received a FIND_NODE from %s (%s), he's trying to find %d",data->answer_to,data->issuer_host_name,data->destination_id);
417   //Building the answer to the request
418   answer_t answer = node_find_closest(node,data->destination_id);
419   //Building the task to send
420   msg_task_t task = task_new_find_node_answer(node->id,data->destination_id,answer,node->mailbox,MSG_host_get_name(MSG_host_self()));
421   //Sending the task  
422   MSG_task_dsend(task,data->answer_to,task_free_v);
423 }
424 /**
425   * \brief handles the answer to a ping
426   */
427 void handle_ping(node_t node, task_data_t data) {
428   XBT_VERB("Received a PING request from %s (%s)",data->answer_to,data->issuer_host_name);
429   //Building the answer to the request
430   msg_task_t task = task_new_ping_answer(node->id,data->answer_to,MSG_host_get_name(MSG_host_self()));
431   
432   MSG_task_dsend(task,data->answer_to,task_free_v);
433 }
434 /**
435   * \brief Main function
436   */
437 int main(int argc, char *argv[]) {
438
439   MSG_init(&argc, argv);
440
441   /* Check the arguments */
442   if (argc < 3) {
443     printf("Usage: %s platform_file deployment_file \n",argv[0]);
444     return -1;
445   }  
446   
447   const char *platform_file = argv[1];
448   const char *deployment_file = argv[2];
449
450   MSG_create_environment(platform_file);
451   
452   MSG_function_register("node",node);
453   MSG_launch_application(deployment_file);
454   
455   msg_error_t res = MSG_main();
456
457   XBT_CRITICAL("Messages created: %ld", smx_total_comms);
458   XBT_INFO("Simulated time: %g", MSG_get_clock());
459   MSG_clean();
460   
461   if ( res == MSG_OK)
462     return 0;
463   else
464     return 1;
465 }