Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Implementing concurrent join in Chord
[simgrid.git] / examples / msg / chord / chord.c
1
2 /* Copyright (c) 2010. The SimGrid Team.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include <stdio.h>
9 #include "msg/msg.h"
10 #include "xbt/log.h"
11 #include "xbt/asserts.h"
12 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_chord,
13                              "Messages specific for this msg example");
14
15 #define NB_BITS 6
16 #define NB_KEYS 64
17
18 /**
19  * Finger element.
20  */
21 typedef struct finger {
22   int id;
23   char* mailbox;
24 } s_finger_t, *finger_t;
25
26 /**
27  * Node data.
28  */
29 typedef struct node {
30   int id;                                 // my id
31   char* mailbox;                          // my usual mailbox name
32   s_finger_t fingers[NB_BITS];            // finger table (fingers[0] is my successor)
33   int pred_id;                            // predecessor id
34   char* pred_mailbox;                     // predecessor's mailbox name
35   xbt_dynar_t comms;                      // current communications pending
36 } s_node_t, *node_t;
37
38 /**
39  * Task data
40  */
41 typedef struct task_data {
42   int request_id;
43   int request_finger;
44   int answer_id;
45   const char* answer_to;
46   const char* issuer_host_name; // used for logging
47   int successor_id;                             // used when quitting
48   int pred_id;                                  // used when quitting
49 } s_task_data_t, *task_data_t;
50
51 // utility functions
52 static int normalize(int id);
53 static int is_in_interval(int id, int start, int end);
54 static char* get_mailbox(int host_id);
55 static void print_finger_table(node_t node);
56 static void set_finger(node_t node, int finger_index, int id);
57 static void set_predecessor(node_t node, int predecessor_id);
58
59 // process functions
60 static int node(int argc, char *argv[]);
61
62 // initialization
63 static void initialize_first_node(node_t node);
64 static void join(node_t node, int known_id);
65 static void bootstrap(node_t node, int known_id);
66 static void leave(node_t node);
67
68 // Chord core
69 static int find_successor(node_t node, int id);
70 static int remote_find_successor(node_t node, int ask_to_id, int id);
71 static int find_predecessor(node_t node, int id);
72 static int remote_find_predecessor(node_t node, int ask_to_id, int id);
73 static int closest_preceding_finger(node_t node, int id);
74 static int remote_closest_preceding_finger(int ask_to_id, int id);
75 static void notify(node_t node, int new_node_id);
76 static void remote_notify(node_t node, int notify_id);
77 static void stabilize(node_t node);
78 static void quit_notify(node_t node, int to);
79
80 // not implemented yet
81 static void remote_move_keys(node_t node, int take_from_id);
82
83 // deprecated
84 static void notify_predecessors(node_t node);
85 static void initialize_finger_table(node_t data, int known_id);
86 static void notify_predecessor_changed(node_t node, int predecessor_candidate_id);
87 static void remote_notify_predecessor_changed(node_t node, int notify_to, int predecessor_candidate_id);
88 static void update_finger_table(node_t node, int candidate_id, int finger_index);
89 static void remote_update_finger_table(node_t node, int ask_to_id, int candidate_id, int finger_index);
90
91 /**
92  * \brief Turns an id into an equivalent id in [0, NB_KEYS[
93  * \param id an id
94  * \return the corresponding normalized id
95  */
96 static int normalize(int id)
97 {
98   // make sure id >= 0
99   while (id < 0) {
100     id += NB_KEYS;
101   }
102   // make sure id < NB_KEYS
103   id = id % NB_KEYS;
104
105   return id;
106 }
107
108 /**
109  * \brief Returns whether a id belongs to the interval [start, end].
110  *
111  * The parameters are noramlized to make sure they are between 0 and CHORD_NB_KEYS - 1).
112  * 1 belongs to [62, 3]
113  * 1 does not belong to [3, 62]
114  * 63 belongs to [62, 3]
115  * 63 does not belong to [3, 62]
116  * 24 belongs to [21, 29]
117  * 24 does not belong to [29, 21]
118  *
119  * \param id id to check
120  * \param start lower bound
121  * \param end upper bound
122  * \return a non-zero value if id in in [start, end]
123  */
124 static int is_in_interval(int id, int start, int end)
125 {
126   id = normalize(id);
127   start = normalize(start);
128   end = normalize(end);
129
130   // make sure end >= start and id >= start
131   if (end < start) {
132     end += NB_KEYS;
133   }
134
135   if (id < start) {
136     id += NB_KEYS;
137   }
138
139   return id <= end;
140 }
141
142 /**
143  * \brief Gets the mailbox name of a host given its chord id.
144  * \param node_id id of a node
145  * \return the name of its mailbox
146  * FIXME: free the memory
147  */
148 static char* get_mailbox(int node_id)
149 {
150   return bprintf("mailbox%d", node_id);
151 }
152
153 /**
154  * \brief Displays the finger table of a node.
155  * \param node a node
156  */
157 static void print_finger_table(node_t node)
158 {
159   int i;
160   int pow = 1;
161   INFO0("My finger table:");
162   INFO0("Start | Succ ");
163   for (i = 0; i < NB_BITS; i++) {
164     INFO2(" %3d  | %3d ", (node->id + pow) % NB_KEYS, node->fingers[i].id);
165     pow = pow << 1;
166   }
167   INFO1("Predecessor: %d", node->pred_id);
168 }
169
170 /**
171  * \brief Sets a finger of the current node.
172  * \param node the current node
173  * \param finger_index index of the finger to set (0 to NB_BITS - 1)
174  * \param id the id to set for this finger
175  */
176 static void set_finger(node_t node, int finger_index, int id)
177 {
178   node->fingers[finger_index].id = id;
179   xbt_free(node->fingers[finger_index].mailbox);
180   node->fingers[finger_index].mailbox = get_mailbox(id);
181   INFO2("My new finger #%d is %d", finger_index, id);
182 }
183
184 /**
185  * \brief Sets the predecessor of the current node.
186  * \param node the current node
187  * \param id the id to predecessor
188  */
189 static void set_predecessor(node_t node, int predecessor_id)
190 {
191   node->pred_id = predecessor_id;
192   xbt_free(node->pred_mailbox);
193   node->pred_mailbox = get_mailbox(predecessor_id);
194   INFO1("My new predecessor is %d", predecessor_id);
195 }
196
197 /**
198  * \brief Node Function
199  * Arguments:
200  * - my id
201  * - the id of a guy I know in the system (except for the first node)
202  * - the time to sleep before I join (except for the first node)
203  */
204 int node(int argc, char *argv[])
205 {
206   double init_time = MSG_get_clock();
207   msg_comm_t comm = NULL;
208   int i;
209   char* mailbox;
210   double deadline;
211   double next_stabilize_date = init_time + 50;
212
213   xbt_assert0(argc == 3 || argc == 5, "Wrong number of arguments for this node");
214
215   // initialize my node
216   s_node_t node = {0};
217   node.id = atoi(argv[1]);
218   node.mailbox = get_mailbox(node.id);
219   node.comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
220
221   if (argc == 3) { // first ring
222     initialize_first_node(&node);
223     deadline = atof(argv[2]);
224   }
225   else {
226     int known_id = atoi(argv[2]);
227     double sleep_time = atof(argv[3]);
228     deadline = atof(argv[4]);
229
230     // sleep before starting
231     INFO1("Let's sleep >>%f", sleep_time);
232     MSG_process_sleep(sleep_time);
233     INFO0("Hey! Let's join the system.");
234
235     join(&node, known_id);
236   }
237
238   while ((MSG_get_clock() - init_time) < deadline) {
239
240     unsigned int cursor;
241     comm = NULL;
242     xbt_dynar_foreach(node.comms, cursor, comm) {
243       if (MSG_comm_test(comm)) { // FIXME: try with MSG_comm_testany instead
244         xbt_dynar_cursor_rm(node.comms, &cursor);
245         MSG_comm_destroy(comm);
246       }
247     }
248
249    if (MSG_get_clock() >= next_stabilize_date) {
250      stabilize(&node);
251      next_stabilize_date = MSG_get_clock() + 50;
252    }
253
254     m_task_t task = NULL;
255     MSG_error_t res = MSG_task_receive_with_timeout(&task, node.mailbox, 45); // FIXME >> find the right timeout !!
256
257     if (res == MSG_OK)                           // else check deadline condition and keep waiting for a task
258     {
259
260     // get data
261     const char* task_name = MSG_task_get_name(task);
262     task_data_t task_data = (task_data_t) MSG_task_get_data(task);
263
264     if (!strcmp(task_name, "Find Successor")) {
265       INFO2("Receiving a 'Find Successor' Request from %s for id %d", task_data->issuer_host_name, task_data->request_id);
266       // is my successor the successor?
267       if (is_in_interval(task_data->request_id, node.id + 1, node.fingers[0].id)) {
268         task_data->answer_id = node.fingers[0].id;
269         MSG_task_set_name(task, "Find Successor Answer");
270         INFO3("Sending back a 'Find Successor' Answer to %s: the successor of %d is %d", task_data->issuer_host_name, task_data->request_id, task_data->answer_id);
271         comm = MSG_task_isend(task, task_data->answer_to);
272         xbt_dynar_push(node.comms, &comm);
273       }
274       else {
275         // otherwise, forward the request to the closest preceding finger in my table
276         int closest = closest_preceding_finger(&node, task_data->request_id);
277         INFO2("Forwarding 'Find Successor' request for id %d to my closest preceding finger %d", task_data->request_id, closest);
278         mailbox = get_mailbox(closest);
279         comm = MSG_task_isend(task, mailbox);
280         xbt_dynar_push(node.comms, &comm);
281         xbt_free(mailbox);
282       }
283     }
284
285     else if (!strcmp(task_name, "Find Predecessor")) {
286       INFO2("Receiving a 'Find Predecessor' Request from %s for id %d", task_data->issuer_host_name, task_data->request_id);
287       // am I the predecessor?
288       if (is_in_interval(task_data->request_id, node.id + 1, node.fingers[0].id)) {
289         task_data->answer_id = node.id;
290         MSG_task_set_name(task, "Find Predecessor Answer");
291         INFO3("Sending back a 'Find Predecessor' Answer to %s: the predecessor of %d is %d", task_data->issuer_host_name, task_data->request_id, task_data->answer_id);
292         comm = MSG_task_isend(task, task_data->answer_to);
293         xbt_dynar_push(node.comms, &comm);
294       }
295       else {
296         // otherwise, forward the request to the closest preceding finger in my table
297         int closest = closest_preceding_finger(&node, task_data->request_id);
298         INFO2("Forwarding 'Find Predecessor' request for id %d to my closest preceding finger %d", task_data->request_id, closest);
299         mailbox = get_mailbox(closest);
300         comm = MSG_task_isend(task, mailbox);
301         xbt_dynar_push(node.comms, &comm);
302         xbt_free(mailbox);
303       }
304     }
305
306     else if (!strcmp(task_name, "Notify")) {
307       // someone may be my new neighboor
308       INFO1("Receiving a 'Notify' request from %s", task_data->issuer_host_name);
309       notify(&node, task_data->request_id);
310     }
311     else if (!strcmp(task_name, "Update Finger")) {
312       // someone is telling me that he may be my new finger
313       INFO1("Receiving an 'Update Finger' request from %s", task_data->issuer_host_name);
314       update_finger_table(&node, task_data->request_id, task_data->request_finger);
315     }
316     else if (!strcmp(task_name, "Notify Predecessor Changed")) {
317       // someone is telling me that he may be my new predecessor
318       INFO1("Receiving a 'Notify Predecessor Changed' request from %s", task_data->issuer_host_name);
319       notify_predecessor_changed(&node, task_data->request_id);
320     }
321     else if (!strcmp(task_name, "Predecessor Leaving")) {
322           // my predecessor is about quitting
323           INFO1("Receiving a 'quite notify' from %s", task_data->issuer_host_name);
324           // modify my predecessor
325           node.pred_id = task_data->pred_id;
326           node.pred_mailbox = get_mailbox(node.id);
327           /*TODO :
328           >> notify my new predecessor
329           >> send a notify_predecessors !!
330           */
331
332         }
333     else if (!strcmp(task_name, "Successor Leaving")) {
334           // my successor is about quitting
335           INFO1("Receiving a 'quite notify' from %s", task_data->issuer_host_name);
336           // modify my successor FIXME : this should be implicit ?
337           node.fingers[0].id = task_data->successor_id;
338           node.fingers[0].mailbox = get_mailbox(node.fingers[0].id);
339           /* TODO
340           >> notify my new successor
341           >> update my table & predecessors table */
342         }
343     }
344   }
345   // leave the ring and quit simulation
346   leave(&node);
347   xbt_dynar_free(&node.comms);
348   xbt_free(node.mailbox);
349   xbt_free(node.pred_mailbox);
350   for (i = 0; i < NB_BITS - 1; i++) {
351     xbt_free(node.fingers[i].mailbox);
352   }
353   return 1;
354 }
355
356 /**
357  * \brief Initializes the current node as the first one of the system.
358  * \param node the current node
359  */
360 static void initialize_first_node(node_t node)
361 {
362   INFO0("Create a new Chord ring...");
363
364   // I am my own successor, predecessor and fingers
365   int i;
366   for (i = 0; i < NB_BITS; i++) {
367     set_finger(node, i, node->id);
368   }
369   set_predecessor(node, node->id);
370   print_finger_table(node);
371 }
372
373 /**
374  * \brief Makes the current node join the system, knowing the id of a node
375  * already in the system
376  * \param node the current node
377  * \param known_id id of a node already in the system
378  */
379 static void join(node_t node, int known_id)
380 {
381   INFO0("Joining the system");
382   // find my predecessor and successor
383   int pred;
384   int suc = remote_find_predecessor(node, known_id, node->id);
385   do {
386     pred = suc;
387     suc = remote_find_successor(node, pred, pred);
388   } while (!is_in_interval(node->id, pred + 1, suc));
389
390   set_finger(node, 0, suc);
391   set_predecessor(node, pred);
392   remote_notify(node, pred);
393   remote_notify(node, suc);
394   bootstrap(node, suc);
395 }
396
397 /**
398  * \brief Notifies the current node that a node has just joined the network.
399  * \param node the current node
400  * \param new_node_id id of the new node in the network
401  */
402 static void notify(node_t node, int new_node_id)
403 {
404   INFO1("A new node %d has joined the network, let's see if it is a neighboor", new_node_id);
405   if (is_in_interval(new_node_id, node->id + 1, node->fingers[0].id - 1)) {
406     // new_node_id is my new successor
407     set_finger(node, 0, new_node_id);
408     bootstrap(node, new_node_id);
409   }
410
411   if (is_in_interval(new_node_id, node->pred_id + 1, node->id - 1)) {
412     // new_node_id is my new predecessor
413     set_predecessor(node, new_node_id);
414     bootstrap(node, new_node_id);
415   }
416 }
417
418 /**
419  * \brief Notifies a remote node that the current node has just joined
420  * the network.
421  * \param node the current node
422  * \param notify_id id of the remote node to notify
423  */
424 static void remote_notify(node_t node, int notify_id)
425 {
426   task_data_t req_data = xbt_new0(s_task_data_t, 1);
427   req_data->request_id = node->id;
428   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
429
430   // send a "Notify" request to notify_id
431   INFO2("Sending a 'Notify' request to %d because I have joined the network with id %d", notify_id, node->id);
432   m_task_t task = MSG_task_create("Notify", 1000, 5000, req_data);
433   char* mailbox = get_mailbox(notify_id);
434   msg_comm_t comm = MSG_task_isend(task, mailbox);
435   xbt_dynar_push(node->comms, &comm);
436   xbt_free(mailbox);
437 }
438
439 /**
440 * \brief Let the current node send queries to fill in its own finger table.
441 * \param node the current node
442 * \param ask_to_id id of a node to send queries to
443 */
444 static void bootstrap(node_t node, int ask_to_id)
445 {
446   INFO0("Filling my finger table");
447   int i, pred_id, succ_id;
448   int pow = 1;
449
450   for (i = 0; i < NB_BITS; i++)
451   {
452     pred_id = remote_find_successor(node, ask_to_id, pow);
453     do {
454       succ_id = pred_id;
455       pred_id = remote_find_predecessor(node, pred_id, pred_id);
456     } while (pred_id >= pow);
457
458     pow = pow << 1;
459     set_finger(node, i, succ_id);
460   }
461 }
462
463 /**
464  * \brief Makes the current node quit the system
465  * \param node the current node
466  */
467 static void leave(node_t node)
468 {
469         INFO0("Well Guys! I Think it's time for me to quit ;)");
470         quit_notify(node,1);            // notify to my successor ( >>> 1 );
471         quit_notify(node,-1);           // notify my predecessor  ( >>> -1);
472     // TODO ...
473 }
474 /*
475  * \brief Notify msg to tell my successor||predecessor about my departure
476  * \param node the current node
477  */
478 static void quit_notify(node_t node, int to)
479 {
480         task_data_t req_data = xbt_new0(s_task_data_t, 1);
481         req_data->request_id = node->id;
482         req_data->successor_id = node->fingers[0].id;
483         req_data->pred_id = node->pred_id;
484         req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
485         const char *task_name = NULL;
486         const char* to_mailbox = NULL;
487         if( to == 1)    // notify my successor
488         {
489                 to_mailbox = node->fingers[0].mailbox;
490             INFO1("Telling my Successor about my departure via mailbox %s", to_mailbox);
491                 task_name = "Predecessor Leaving";
492
493         }
494         else if(to == -1)    // notify my predecessor
495         {
496                 to_mailbox = node->pred_mailbox;
497             INFO1("Telling my Predecessor about my departure via mailbox %s",to_mailbox);
498                 task_name = "Predecessor Leaving";
499         }
500     m_task_t task = MSG_task_create(task_name, 1000, 5000, req_data);
501     //char* mailbox = get_mailbox(to_mailbox);
502         msg_comm_t comm = MSG_task_isend(task, to_mailbox);
503         xbt_dynar_push(node->comms, &comm);
504 }
505
506 /*
507  * \brief Initializes my finger table, knowing the id of a node already in the system.
508  * \param node the current node
509  * \param known_id id of a node already in the system
510  */
511 static void initialize_finger_table(node_t node, int known_id)
512 {
513   int my_id = node->id;
514   int i;
515   int pow = 1; // 2^i
516
517   INFO0("Initializing my finger table...");
518
519   // ask known_id who is my immediate successor
520   node->fingers[0].id = remote_find_successor(node, known_id, my_id + 1);
521   node->fingers[0].mailbox = get_mailbox(node->fingers[0].id);
522
523   // find all other fingers
524   for (i = 0; i < NB_BITS - 1; i++) {
525
526     pow = pow << 1; // equivalent to pow = pow * 2
527     if (is_in_interval(my_id + pow, my_id, node->fingers[i].id - 1)) {
528       // I already have the info for this finger
529       node->fingers[i + 1].id = node->fingers[i].id;
530     }
531     else {
532       // I don't have the info, ask the only guy I know
533       node->fingers[i + 1].id = remote_find_successor(node, known_id, my_id + pow);
534     }
535     node->fingers[i + 1].mailbox = get_mailbox(node->fingers[i + 1].id);
536   }
537
538   node->pred_id = find_predecessor(node, node->id);
539   node->pred_mailbox = get_mailbox(node->pred_id);
540
541   INFO0("Finger table initialized!");
542   print_finger_table(node);
543 }
544
545 /**
546  * \brief Notifies some nodes that the current node may have became their finger.
547  * \param node the current node, which has just joined the system
548  */
549 static void notify_predecessors(node_t node)
550 {
551   int i, pred_id;
552   int pow = 1;
553   for (i = 0; i < NB_BITS; i++) {
554     // find the closest node whose finger #i can be me
555     pred_id = find_predecessor(node, node->id - pow + 1); // note: no "+1" in the article!
556     if (pred_id != node->id) {
557       remote_update_finger_table(node, pred_id, node->id, i);
558     }
559     pow = pow << 1; // pow = pow * 2
560   }
561 }
562
563 /**
564  * \brief Tells the current node that a node may have became its new finger.
565  * \param node the current node
566  * \param candidate_id id of the node that may be a new finger of the current node
567  * \param finger_index index of the finger to update
568  */
569 static void update_finger_table(node_t node, int candidate_id, int finger_index)
570 {
571   int pow = 1;
572   int i;
573   for (i = 0; i < finger_index; i++) {
574     pow = pow << 1;
575   }
576
577   //  if (is_in_interval(candidate_id, node->id + pow, node->fingers[finger_index].id - 1)) {
578   if (is_in_interval(candidate_id, node->id, node->fingers[finger_index].id - 1)) {
579 //    INFO3("Candidate %d is between %d and %d!", candidate_id, node->id + pow, node->fingers[finger_index].id - 1);
580     // candidate_id is my new finger
581     set_finger(node, finger_index, candidate_id);
582     print_finger_table(node);
583
584     if (node->pred_id != node->id) { // FIXME: is this necessary?
585       // my predecessor may be concerned too
586       remote_update_finger_table(node, node->pred_id, candidate_id, finger_index);
587     }
588   }
589 }
590
591 /**
592  * \brief Tells a remote node that a node may have became its new finger.
593  * \param ask_to_id id of the remote node to update
594  * \param candidate_id id of the node that may be a new finger of the remote node
595  * \param finger_index index of the finger to update
596  */
597 static void remote_update_finger_table(node_t node, int ask_to_id, int candidate_id, int finger_index)
598 {
599   task_data_t req_data = xbt_new0(s_task_data_t, 1);
600   req_data->request_id = candidate_id;
601   req_data->request_finger = finger_index;
602   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
603
604   // send a "Update Finger" request to ask_to_id
605   INFO3("Sending an 'Update Finger' request to %d: his finger #%d may be %d now", ask_to_id, finger_index, candidate_id);
606   m_task_t task = MSG_task_create("Update Finger", 1000, 5000, req_data);
607   char* mailbox = get_mailbox(ask_to_id);
608   msg_comm_t comm = MSG_task_isend(task, mailbox);
609   xbt_dynar_push(node->comms, &comm);
610   xbt_free(mailbox);
611 }
612
613 /**
614  * \brief Makes the current node find the successor node of an id.
615  * \param node the current node
616  * \param id the id to find
617  * \return the id of the successor node
618  */
619 static int find_successor(node_t node, int id)
620 {
621   // is my successor the successor?
622   if (is_in_interval(id, node->id + 1, node->fingers[0].id)) {
623     return node->fingers[0].id;
624   }
625
626   // otherwise, ask the closest preceding finger in my table
627   int closest = closest_preceding_finger(node, id);
628   return remote_find_successor(node, closest, id);
629 }
630
631 /**
632  * \brief Asks another node the successor node of an id.
633  * \param node the current node
634  * \param ask_to the node to ask to
635  * \param id the id to find
636  * \return the id of the successor node
637  */
638 static int remote_find_successor(node_t node, int ask_to, int id)
639 {
640   s_task_data_t req_data;
641   char* mailbox = bprintf("%s Find Successor", node->mailbox);
642   req_data.request_id = id;
643   req_data.answer_to = mailbox;
644   req_data.issuer_host_name = MSG_host_get_name(MSG_host_self());
645
646   // send a "Find Successor" request to ask_to_id
647   INFO2("Sending a 'Find Successor' request to %d for key %d", ask_to, id);
648   m_task_t task = MSG_task_create("Find Successor", 1000, 5000, &req_data);
649   MSG_task_send(task, get_mailbox(ask_to));
650
651   // receive the answer
652   task = NULL;
653   MSG_task_receive(&task, req_data.answer_to);
654   task_data_t ans_data;
655   ans_data = MSG_task_get_data(task);
656   int successor = ans_data->answer_id;
657   xbt_free(mailbox);
658   INFO2("Received the answer to my Find Successor request: the successor of key %d is %d", id, successor);
659
660   return successor;
661 }
662
663 /**
664  * \brief Makes the current node find the predecessor node of an id.
665  * \param node the current node
666  * \param id the id to find
667  * \return the id of the predecessor node
668  */
669 static int find_predecessor(node_t node, int id)
670 {
671   if (node->id == node->fingers[0].id) {
672     // I am the only node in the system
673     return node->id;
674   }
675
676   if (is_in_interval(id, node->id + 1, node->fingers[0].id)) {
677     return node->id;
678   }
679   int ask_to = closest_preceding_finger(node, id);
680   return remote_find_predecessor(node, ask_to, id);
681 }
682
683 /**
684  * \brief Asks another node the predecessor node of an id.
685  * \param node the current node
686  * \param ask_to the node to ask to
687  * \param id the id to find
688  * \return the id of the predecessor node
689  */
690 static int remote_find_predecessor(node_t node, int ask_to, int id)
691 {
692   s_task_data_t req_data;
693   char* mailbox = bprintf("%s Find Predecessor", node->mailbox);
694   req_data.request_id = id;
695   req_data.answer_to = mailbox;
696   req_data.issuer_host_name = MSG_host_get_name(MSG_host_self());
697
698   // send a "Find Predecessor" request to ask_to
699   INFO2("Sending a 'Find Predecessor' request to %d for key %d", ask_to, id);
700   m_task_t task = MSG_task_create("Find Predecessor", 1000, 5000, &req_data);
701   MSG_task_send(task, get_mailbox(ask_to));
702
703   // receive the answer
704   task = NULL;
705   MSG_task_receive(&task, req_data.answer_to);
706   task_data_t ans_data;
707   ans_data = MSG_task_get_data(task);
708   int predecessor = ans_data->answer_id;
709   xbt_free(mailbox);
710   INFO2("Received the answer to my 'Find Predecessor' request: the predecessor of key %d is %d", id, predecessor);
711
712   return predecessor;
713 }
714
715 /**
716  * \brief Returns the closest preceding finger of an id
717  * with respect to the finger table of the current node.
718  * \param node the current node
719  * \param id the id to find
720  * \return the closest preceding finger of that id
721  */
722 int closest_preceding_finger(node_t node, int id)
723 {
724   int i;
725   for (i = NB_BITS - 1; i >= 0; i--) {
726     if (is_in_interval(node->fingers[i].id, node->id + 1, id - 1)) {
727       return node->fingers[i].id;
728     }
729   }
730   return node->id;
731 }
732
733 /**
734  * \brief This function is called periodically. It checks the immediate
735  * successor and predecessor of the current node.
736  * \param node the current node
737  */
738 static void stabilize(node_t node)
739 {
740   INFO0("Stabilizing node");
741   int succ_id;
742   succ_id = node->pred_id;
743   succ_id = remote_find_successor(node, succ_id, succ_id);
744   if (is_in_interval(succ_id, node->pred_id + 1, node->id - 1)) {
745     set_predecessor(node, succ_id);
746   }
747   succ_id = node->fingers[0].id;
748   succ_id = remote_find_predecessor(node, succ_id, succ_id);
749   if (is_in_interval(succ_id, node->id + 1, node->fingers[0].id - 1)) {
750     set_finger(node, 0, succ_id);
751   }
752 }
753
754 /**
755  * \brief Notifies the current node that its predecessor may have changed.
756  * \param node the current node
757  * \param candidate_id the possible new predecessor
758  */
759 static void notify_predecessor_changed(node_t node, int predecessor_candidate_id) {
760
761   if (node->pred_id == node->id
762     || is_in_interval(predecessor_candidate_id, node->pred_id, node->id)) {
763
764     set_predecessor(node, predecessor_candidate_id);
765     print_finger_table(node);
766   }
767   else {
768     INFO1("I don't have to change my predecessor to %d", predecessor_candidate_id);
769   }
770 }
771
772 /**
773  * \brief Notifies a remote node that its predecessor may have changed.
774  * \param node the current node
775  * \param notify_id id of the node to notify
776  * \param candidate_id the possible new predecessor
777  */
778 static void remote_notify_predecessor_changed(node_t node, int notify_id, int predecessor_candidate_id) {
779
780   task_data_t req_data = xbt_new0(s_task_data_t, 1);
781   req_data->request_id = predecessor_candidate_id;
782   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
783
784   // send a "Notify" request to notify_id
785   INFO1("Sending a 'Notify Predecessor Changed' request to %d", notify_id);
786   m_task_t task = MSG_task_create("Notify Predecessor Changed", 1000, 5000, req_data);
787   char* mailbox = get_mailbox(notify_id);
788   msg_comm_t comm = MSG_task_isend(task, mailbox);
789   xbt_dynar_push(node->comms, &comm);
790   xbt_free(mailbox);
791 }
792
793 /**
794  * \brief Asks a node to take some of its keys.
795  * \param node the current node, which has just joined the system
796  * \param take_from_id id of a node who may have keys to give to the current node
797  */
798 static void remote_move_keys(node_t node, int take_from_id) {
799   // TODO
800 }
801
802 /**
803  * \brief Main function.
804  */
805 int main(int argc, char *argv[])
806 {
807   if (argc < 3) {
808     printf("Usage: %s platform_file deployment_file\n", argv[0]);
809     printf("example: %s ../msg_platform.xml chord.xml\n", argv[0]);
810     exit(1);
811   }
812
813   MSG_global_init(&argc, argv);
814
815   const char* platform_file = argv[1];
816   const char* application_file = argv[2];
817
818   /* MSG_config("workstation/model","KCCFLN05"); */
819   MSG_set_channel_number(0);
820   MSG_create_environment(platform_file);
821
822   MSG_function_register("node", node);
823   MSG_launch_application(application_file);
824
825   MSG_error_t res = MSG_main();
826   INFO1("Simulation time: %g", MSG_get_clock());
827
828   MSG_clean();
829
830   if (res == MSG_OK)
831     return 0;
832   else
833     return 1;
834 }