Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
58aa2987cb964f30051ec0b9ca02577ea7db63a2
[simgrid.git] / examples / msg / chord / chord.c
1
2 /* Copyright (c) 2010. The SimGrid Team.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include <stdio.h>
9 #include "msg/msg.h"
10 #include "xbt/log.h"
11 #include "xbt/asserts.h"
12 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_chord,
13                              "Messages specific for this msg example");
14
15 #define COMM_SIZE 10
16 #define COMP_SIZE 0
17
18 static int nb_bits = 16;
19 static int nb_keys = 0;
20 static int timeout = 50;
21 static int max_simulation_time = 1000;
22 static int periodic_stabilize_delay = 20;
23 static int periodic_fix_fingers_delay = 120;
24 static int periodic_check_predecessor_delay = 120;
25
26 /**
27  * Finger element.
28  */
29 typedef struct finger {
30   int id;
31   char* mailbox;
32 } s_finger_t, *finger_t;
33
34 /**
35  * Node data.
36  */
37 typedef struct node {
38   int id;                                 // my id
39   char* mailbox;                          // my usual mailbox name
40   s_finger_t *fingers;                    // finger table, of size nb_bits (fingers[0] is my successor)
41   int pred_id;                            // predecessor id
42   char* pred_mailbox;                     // predecessor's mailbox name
43   int next_finger_to_fix;                 // index of the next finger to fix in fix_fingers()
44   msg_comm_t comm_receive;                // current communication to receive
45   xbt_dynar_t comms;                      // current communications being sent
46   double last_change_date;                // last time I changed a finger or my predecessor
47 } s_node_t, *node_t;
48
49 /**
50  * Types of tasks exchanged between nodes.
51  */
52 typedef enum {
53   TASK_FIND_SUCCESSOR,
54   TASK_FIND_SUCCESSOR_ANSWER,
55   TASK_GET_PREDECESSOR,
56   TASK_GET_PREDECESSOR_ANSWER,
57   TASK_NOTIFY,
58   TASK_SUCCESSOR_LEAVING,
59   TASK_PREDECESSOR_LEAVING
60 } e_task_type_t;
61
62 /**
63  * Data attached with the tasks sent and received
64  */
65 typedef struct task_data {
66   e_task_type_t type;                     // type of task
67   int request_id;                         // id paramater (used by some types of tasks)
68   int request_finger;                     // finger parameter (used by some types of tasks)
69   int answer_id;                          // answer (used by some types of tasks)
70   char* answer_to;                        // mailbox to send an answer to (or NULL)
71   const char* issuer_host_name;           // used for logging
72 } s_task_data_t, *task_data_t;
73
74 static int *powers2;
75
76 // utility functions
77 static void chord_initialize(void);
78 static int normalize(int id);
79 static int is_in_interval(int id, int start, int end);
80 static char* get_mailbox(int host_id);
81 static void task_data_destroy(task_data_t task_data);
82 static void print_finger_table(node_t node);
83 static void set_finger(node_t node, int finger_index, int id);
84 static void set_predecessor(node_t node, int predecessor_id);
85
86 // process functions
87 static int node(int argc, char *argv[]);
88 static void handle_task(node_t node, m_task_t task);
89
90 // Chord core
91 static void create(node_t node);
92 static int join(node_t node, int known_id);
93 static void leave(node_t node);
94 static int find_successor(node_t node, int id);
95 static int remote_find_successor(node_t node, int ask_to_id, int id);
96 static int remote_get_predecessor(node_t node, int ask_to_id);
97 static int closest_preceding_node(node_t node, int id);
98 static void stabilize(node_t node);
99 static void notify(node_t node, int predecessor_candidate_id);
100 static void remote_notify(node_t node, int notify_to, int predecessor_candidate_id);
101 static void fix_fingers(node_t node);
102 static void check_predecessor(node_t node);
103 static void quit_notify(node_t node, int to);
104
105 /**
106  * \brief Global initialization of the Chord simulation.
107  */
108 static void chord_initialize(void)
109 {
110   // compute the powers of 2 once for all
111   powers2 = xbt_new(int, nb_bits);
112   int pow = 1;
113   int i;
114   for (i = 0; i < nb_bits; i++) {
115     powers2[i] = pow;
116     pow = pow << 1;
117   }
118   nb_keys = pow;
119   DEBUG1("Sets nb_keys to %d", nb_keys);
120 }
121
122 /**
123  * \brief Turns an id into an equivalent id in [0, nb_keys).
124  * \param id an id
125  * \return the corresponding normalized id
126  */
127 static int normalize(int id)
128 {
129   // make sure id >= 0
130   while (id < 0) {
131     id += nb_keys;
132   }
133   // make sure id < nb_keys
134   id = id % nb_keys;
135
136   return id;
137 }
138
139 /**
140  * \brief Returns whether a id belongs to the interval [start, end].
141  *
142  * The parameters are noramlized to make sure they are between 0 and nb_keys - 1).
143  * 1 belongs to [62, 3]
144  * 1 does not belong to [3, 62]
145  * 63 belongs to [62, 3]
146  * 63 does not belong to [3, 62]
147  * 24 belongs to [21, 29]
148  * 24 does not belong to [29, 21]
149  *
150  * \param id id to check
151  * \param start lower bound
152  * \param end upper bound
153  * \return a non-zero value if id in in [start, end]
154  */
155 static int is_in_interval(int id, int start, int end)
156 {
157   id = normalize(id);
158   start = normalize(start);
159   end = normalize(end);
160
161   // make sure end >= start and id >= start
162   if (end < start) {
163     end += nb_keys;
164   }
165
166   if (id < start) {
167     id += nb_keys;
168   }
169
170   return id <= end;
171 }
172
173 /**
174  * \brief Gets the mailbox name of a host given its chord id.
175  * \param node_id id of a node
176  * \return the name of its mailbox
177  */
178 static char* get_mailbox(int node_id)
179 {
180   return bprintf("mailbox%d", node_id);
181 }
182
183 /**
184  * \brief Frees the memory used by some task data.
185  * \param task_data the task data to destroy
186  */
187 static void task_data_destroy(task_data_t task_data)
188 {
189   xbt_free(task_data->answer_to);
190   xbt_free(task_data);
191 }
192
193 /**
194  * \brief Displays the finger table of a node.
195  * \param node a node
196  */
197 static void print_finger_table(node_t node)
198 {
199   if (XBT_LOG_ISENABLED(msg_chord, xbt_log_priority_verbose)) {
200     int i;
201     int pow = 1;
202     VERB0("My finger table:");
203     VERB0("Start | Succ ");
204     for (i = 0; i < nb_bits; i++) {
205       VERB2(" %3d  | %3d ", (node->id + pow) % nb_keys, node->fingers[i].id);
206       pow = pow << 1;
207     }
208     VERB1("Predecessor: %d", node->pred_id);
209   }
210 }
211
212 /**
213  * \brief Sets a finger of the current node.
214  * \param node the current node
215  * \param finger_index index of the finger to set (0 to nb_bits - 1)
216  * \param id the id to set for this finger
217  */
218 static void set_finger(node_t node, int finger_index, int id)
219 {
220   if (id != node->fingers[finger_index].id) {
221     node->fingers[finger_index].id = id;
222     xbt_free(node->fingers[finger_index].mailbox);
223     node->fingers[finger_index].mailbox = get_mailbox(id);
224     node->last_change_date = MSG_get_clock();
225     DEBUG2("My new finger #%d is %d", finger_index, id);
226   }
227 }
228
229 /**
230  * \brief Sets the predecessor of the current node.
231  * \param node the current node
232  * \param id the id to predecessor, or -1 to unset the predecessor
233  */
234 static void set_predecessor(node_t node, int predecessor_id)
235 {
236   if (predecessor_id != node->pred_id) {
237     node->pred_id = predecessor_id;
238     xbt_free(node->pred_mailbox);
239
240     if (predecessor_id != -1) {
241       node->pred_mailbox = get_mailbox(predecessor_id);
242     }
243     node->last_change_date = MSG_get_clock();
244
245     DEBUG1("My new predecessor is %d", predecessor_id);
246   }
247 }
248
249 /**
250  * \brief Node Function
251  * Arguments:
252  * - my id
253  * - the id of a guy I know in the system (except for the first node)
254  * - the time to sleep before I join (except for the first node)
255  */
256 int node(int argc, char *argv[])
257 {
258   double init_time = MSG_get_clock();
259   m_task_t task = NULL;
260   m_task_t task_received = NULL;
261   msg_comm_t comm_send = NULL;
262   int i;
263   int index;
264   int join_success = 0;
265   double deadline;
266   double next_stabilize_date = init_time + 10;
267   double next_fix_fingers_date = init_time + 10;
268   double next_check_predecessor_date = init_time + 10;
269
270   xbt_assert0(argc == 3 || argc == 5, "Wrong number of arguments for this node");
271
272   // initialize my node
273   s_node_t node = {0};
274   node.id = atoi(argv[1]);
275   node.mailbox = get_mailbox(node.id);
276   node.next_finger_to_fix = 0;
277   node.comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
278   node.fingers = xbt_new0(s_finger_t, nb_bits);
279   node.last_change_date = MSG_get_clock();
280
281   for (i = 0; i < nb_bits; i++) {
282     set_finger(&node, i, node.id);
283   }
284
285   if (argc == 3) { // first ring
286     deadline = atof(argv[2]);
287     create(&node);
288     join_success = 1;
289   }
290   else {
291     int known_id = atoi(argv[2]);
292     //double sleep_time = atof(argv[3]);
293     deadline = atof(argv[4]);
294
295     /*
296     // sleep before starting
297     DEBUG1("Let's sleep during %f", sleep_time);
298     MSG_process_sleep(sleep_time);
299     */
300     DEBUG0("Hey! Let's join the system.");
301
302     join_success = join(&node, known_id);
303   }
304
305   if (join_success) {
306     while (MSG_get_clock() < init_time + deadline
307 //      && MSG_get_clock() < node.last_change_date + 1000
308         && MSG_get_clock() < max_simulation_time) {
309
310       if (node.comm_receive == NULL) {
311         task_received = NULL;
312         node.comm_receive = MSG_task_irecv(&task_received, node.mailbox);
313         // FIXME: do not make MSG_task_irecv() calls from several functions
314       }
315
316       if (!MSG_comm_test(node.comm_receive)) {
317
318         // no task was received: make some periodic calls
319         if (MSG_get_clock() >= next_stabilize_date) {
320           stabilize(&node);
321           next_stabilize_date = MSG_get_clock() + periodic_stabilize_delay;
322         }
323         else if (MSG_get_clock() >= next_fix_fingers_date) {
324           fix_fingers(&node);
325           next_fix_fingers_date = MSG_get_clock() + periodic_fix_fingers_delay;
326         }
327         else if (MSG_get_clock() >= next_check_predecessor_date) {
328           check_predecessor(&node);
329           next_check_predecessor_date = MSG_get_clock() + periodic_check_predecessor_delay;
330         }
331         else {
332           // nothing to do: sleep for a while
333           MSG_process_sleep(5);
334         }
335       }
336       else {
337         // a transfer has occured
338
339         MSG_error_t status = MSG_comm_get_status(node.comm_receive);
340
341         if (status != MSG_OK) {
342           DEBUG0("Failed to receive a task. Nevermind.");
343           node.comm_receive = NULL;
344         }
345         else {
346           // the task was successfully received
347           MSG_comm_destroy(node.comm_receive);
348           node.comm_receive = NULL;
349           handle_task(&node, task_received);
350         }
351       }
352
353       // see if some communications are finished
354       while ((index = MSG_comm_testany(node.comms)) != -1) {
355         comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t);
356         MSG_error_t status = MSG_comm_get_status(comm_send);
357         xbt_dynar_remove_at(node.comms, index, &comm_send);
358         DEBUG3("Communication %p is finished with status %d, dynar size is now %lu",
359             comm_send, status, xbt_dynar_length(node.comms));
360         MSG_comm_destroy(comm_send);
361       }
362     }
363
364     // clean unfinished comms sent
365     unsigned int cursor;
366     xbt_dynar_foreach(node.comms, cursor, comm_send) {
367       task = MSG_comm_get_task(comm_send);
368       MSG_task_cancel(task);
369       task_data_destroy(MSG_task_get_data(task));
370       MSG_task_destroy(task);
371       MSG_comm_destroy(comm_send);
372       // FIXME: the task is actually not destroyed because MSG thinks that the other side (whose process is dead) is still using it
373     }
374
375     // leave the ring
376     leave(&node);
377   }
378
379   // stop the simulation
380   xbt_dynar_free(&node.comms);
381   xbt_free(node.mailbox);
382   xbt_free(node.pred_mailbox);
383   for (i = 0; i < nb_bits - 1; i++) {
384     xbt_free(node.fingers[i].mailbox);
385   }
386   xbt_free(node.fingers);
387   return 0;
388 }
389
390 /**
391  * \brief This function is called when the current node receives a task.
392  * \param node the current node
393  * \param task the task to handle (don't touch it then:
394  * it will be destroyed, reused or forwarded)
395  */
396 static void handle_task(node_t node, m_task_t task) {
397
398   DEBUG1("Handling task %p", task);
399   msg_comm_t comm = NULL;
400   char* mailbox = NULL;
401   task_data_t task_data = (task_data_t) MSG_task_get_data(task);
402   e_task_type_t type = task_data->type;
403
404   switch (type) {
405
406     case TASK_FIND_SUCCESSOR:
407       DEBUG2("Receiving a 'Find Successor' request from %s for id %d",
408           task_data->issuer_host_name, task_data->request_id);
409       // is my successor the successor?
410       if (is_in_interval(task_data->request_id, node->id + 1, node->fingers[0].id)) {
411         task_data->type = TASK_FIND_SUCCESSOR_ANSWER;
412         task_data->answer_id = node->fingers[0].id;
413         DEBUG3("Sending back a 'Find Successor Answer' to %s: the successor of %d is %d",
414             task_data->issuer_host_name,
415             task_data->request_id, task_data->answer_id);
416         comm = MSG_task_isend(task, task_data->answer_to);
417         xbt_dynar_push(node->comms, &comm);
418       }
419       else {
420         // otherwise, forward the request to the closest preceding finger in my table
421         int closest = closest_preceding_node(node, task_data->request_id);
422         DEBUG2("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
423             task_data->request_id, closest);
424         mailbox = get_mailbox(closest);
425         comm = MSG_task_isend(task, mailbox);
426         xbt_dynar_push(node->comms, &comm);
427         xbt_free(mailbox);
428       }
429       break;
430
431     case TASK_GET_PREDECESSOR:
432       DEBUG1("Receiving a 'Get Predecessor' request from %s", task_data->issuer_host_name);
433       task_data->type = TASK_GET_PREDECESSOR_ANSWER;
434       task_data->answer_id = node->pred_id;
435       DEBUG3("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d",
436           task_data->issuer_host_name,
437           task_data->answer_to, task_data->answer_id);
438       comm = MSG_task_isend(task, task_data->answer_to);
439       xbt_dynar_push(node->comms, &comm);
440       break;
441
442     case TASK_NOTIFY:
443       // someone is telling me that he may be my new predecessor
444       DEBUG1("Receiving a 'Notify' request from %s", task_data->issuer_host_name);
445       notify(node, task_data->request_id);
446       task_data_destroy(task_data);
447       MSG_task_destroy(task);
448       break;
449
450     case TASK_PREDECESSOR_LEAVING:
451       // my predecessor is about to quit
452       DEBUG1("Receiving a 'Predecessor Leaving' message from %s", task_data->issuer_host_name);
453       // modify my predecessor
454       set_predecessor(node, task_data->request_id);
455       task_data_destroy(task_data);
456       MSG_task_destroy(task);
457       /*TODO :
458       >> notify my new predecessor
459       >> send a notify_predecessors !!
460        */
461       break;
462
463     case TASK_SUCCESSOR_LEAVING:
464       // my successor is about to quit
465       DEBUG1("Receiving a 'Successor Leaving' message from %s", task_data->issuer_host_name);
466       // modify my successor FIXME : this should be implicit ?
467       set_finger(node, 0, task_data->request_id);
468       task_data_destroy(task_data);
469       MSG_task_destroy(task);
470       /* TODO
471       >> notify my new successor
472       >> update my table & predecessors table */
473       break;
474
475     case TASK_FIND_SUCCESSOR_ANSWER:
476     case TASK_GET_PREDECESSOR_ANSWER:
477       DEBUG2("Ignoring unexpected task of type %d (%p)", type, task);
478       break;
479   }
480 }
481
482 /**
483  * \brief Initializes the current node as the first one of the system.
484  * \param node the current node
485  */
486 static void create(node_t node)
487 {
488   DEBUG0("Create a new Chord ring...");
489   set_predecessor(node, -1); // -1 means that I have no predecessor
490   print_finger_table(node);
491 }
492
493 /**
494  * \brief Makes the current node join the ring, knowing the id of a node
495  * already in the ring
496  * \param node the current node
497  * \param known_id id of a node already in the ring
498  * \return 1 if the join operation succeeded, 0 otherwise
499  */
500 static int join(node_t node, int known_id)
501 {
502   INFO2("Joining the ring with id %d, knowing node %d", node->id, known_id);
503   set_predecessor(node, -1); // no predecessor (yet)
504
505   int successor_id = remote_find_successor(node, known_id, node->id);
506   if (successor_id == -1) {
507     INFO0("Cannot join the ring.");
508   }
509   else {
510     set_finger(node, 0, successor_id);
511     print_finger_table(node);
512   }
513
514   return successor_id != -1;
515 }
516
517 /**
518  * \brief Makes the current node quit the system
519  * \param node the current node
520  */
521 static void leave(node_t node)
522 {
523   DEBUG0("Well Guys! I Think it's time for me to quit ;)");
524   quit_notify(node, 1);  // notify to my successor ( >>> 1 );
525   quit_notify(node, -1); // notify my predecessor  ( >>> -1);
526   // TODO ...
527 }
528
529 /*
530  * \brief Notifies the successor or the predecessor of the current node
531  * of the departure
532  * \param node the current node
533  * \param to 1 to notify the successor, -1 to notify the predecessor
534  * FIXME: notify both nodes with only one call
535  */
536 static void quit_notify(node_t node, int to)
537 {
538   /* TODO
539   task_data_t req_data = xbt_new0(s_task_data_t, 1);
540   req_data->request_id = node->id;
541   req_data->successor_id = node->fingers[0].id;
542   req_data->pred_id = node->pred_id;
543   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
544   req_data->answer_to = NULL;
545   const char* task_name = NULL;
546   const char* to_mailbox = NULL;
547   if (to == 1) {    // notify my successor
548     to_mailbox = node->fingers[0].mailbox;
549     INFO2("Telling my Successor %d about my departure via mailbox %s",
550           node->fingers[0].id, to_mailbox);
551     req_data->type = TASK_PREDECESSOR_LEAVING;
552   }
553   else if (to == -1) {    // notify my predecessor
554
555     if (node->pred_id == -1) {
556       return;
557     }
558
559     to_mailbox = node->pred_mailbox;
560     INFO2("Telling my Predecessor %d about my departure via mailbox %s",
561           node->pred_id, to_mailbox);
562     req_data->type = TASK_SUCCESSOR_LEAVING;
563   }
564   m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
565   //char* mailbox = get_mailbox(to_mailbox);
566   msg_comm_t comm = MSG_task_isend(task, to_mailbox);
567   xbt_dynar_push(node->comms, &comm);
568   */
569 }
570
571 /**
572  * \brief Makes the current node find the successor node of an id.
573  * \param node the current node
574  * \param id the id to find
575  * \return the id of the successor node, or -1 if the request failed
576  */
577 static int find_successor(node_t node, int id)
578 {
579   // is my successor the successor?
580   if (is_in_interval(id, node->id + 1, node->fingers[0].id)) {
581     return node->fingers[0].id;
582   }
583
584   // otherwise, ask the closest preceding finger in my table
585   int closest = closest_preceding_node(node, id);
586   return remote_find_successor(node, closest, id);
587 }
588
589 /**
590  * \brief Asks another node the successor node of an id.
591  * \param node the current node
592  * \param ask_to the node to ask to
593  * \param id the id to find
594  * \return the id of the successor node, or -1 if the request failed
595  */
596 static int remote_find_successor(node_t node, int ask_to, int id)
597 {
598   int successor = -1;
599   int stop = 0;
600   char* mailbox = get_mailbox(ask_to);
601   task_data_t req_data = xbt_new0(s_task_data_t, 1);
602   req_data->type = TASK_FIND_SUCCESSOR;
603   req_data->request_id = id;
604   req_data->answer_to = xbt_strdup(node->mailbox);
605   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
606
607   // send a "Find Successor" request to ask_to_id
608   m_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
609   DEBUG3("Sending a 'Find Successor' request (task %p) to %d for id %d", task_sent, ask_to, id);
610   MSG_error_t res = MSG_task_send_with_timeout(task_sent, mailbox, timeout);
611
612   if (res != MSG_OK) {
613     DEBUG3("Failed to send the 'Find Successor' request (task %p) to %d for id %d",
614         task_sent, ask_to, id);
615     MSG_task_destroy(task_sent);
616     task_data_destroy(req_data);
617   }
618   else {
619
620     // receive the answer
621     DEBUG3("Sent a 'Find Successor' request (task %p) to %d for key %d, waiting for the answer",
622         task_sent, ask_to, id);
623
624     do {
625       if (node->comm_receive == NULL) {
626         m_task_t task_received = NULL;
627         node->comm_receive = MSG_task_irecv(&task_received, node->mailbox);
628       }
629
630       res = MSG_comm_wait(node->comm_receive, timeout);
631
632       if (res != MSG_OK) {
633         DEBUG2("Failed to receive the answer to my 'Find Successor' request (task %p): %d",
634             task_sent, res);
635         stop = 1;
636         //MSG_comm_destroy(node->comm_receive);
637       }
638       else {
639         m_task_t task_received = MSG_comm_get_task(node->comm_receive);
640         DEBUG1("Received a task (%p)", task_received);
641         task_data_t ans_data = MSG_task_get_data(task_received);
642
643         if (task_received != task_sent) {
644           // this is not the expected answer
645           handle_task(node, task_received);
646         }
647         else {
648           // this is our answer
649           DEBUG4("Received the answer to my 'Find Successor' request for id %d (task %p): the successor of key %d is %d",
650               ans_data->request_id, task_received, id, ans_data->answer_id);
651           successor = ans_data->answer_id;
652           stop = 1;
653           MSG_task_destroy(task_received);
654           task_data_destroy(req_data);
655         }
656       }
657       node->comm_receive = NULL;
658     } while (!stop);
659   }
660
661   xbt_free(mailbox);
662   return successor;
663 }
664
665 /**
666  * \brief Asks another node its predecessor.
667  * \param node the current node
668  * \param ask_to the node to ask to
669  * \return the id of its predecessor node, or -1 if the request failed
670  * (or if the node does not know its predecessor)
671  */
672 static int remote_get_predecessor(node_t node, int ask_to)
673 {
674   int predecessor_id = -1;
675   int stop = 0;
676   char* mailbox = get_mailbox(ask_to);
677   task_data_t req_data = xbt_new0(s_task_data_t, 1);
678   req_data->type = TASK_GET_PREDECESSOR;
679   req_data->answer_to = xbt_strdup(node->mailbox);
680   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
681
682   // send a "Get Predecessor" request to ask_to_id
683   DEBUG1("Sending a 'Get Predecessor' request to %d", ask_to);
684   m_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
685   MSG_error_t res = MSG_task_send_with_timeout(task_sent, mailbox, timeout);
686
687   if (res != MSG_OK) {
688     DEBUG2("Failed to send the 'Get Predecessor' request (task %p) to %d",
689         task_sent, ask_to);
690     MSG_task_destroy(task_sent);
691     task_data_destroy(req_data);
692   }
693   else {
694
695     // receive the answer
696     DEBUG3("Sent 'Get Predecessor' request (task %p) to %d, waiting for the answer on my mailbox '%s'",
697         task_sent, ask_to, req_data->answer_to);
698
699     do {
700       if (node->comm_receive == NULL) { // FIXME simplify this
701         m_task_t task_received = NULL;
702         node->comm_receive = MSG_task_irecv(&task_received, node->mailbox);
703       }
704
705       res = MSG_comm_wait(node->comm_receive, timeout);
706
707       if (res != MSG_OK) {
708         DEBUG2("Failed to receive the answer to my 'Get Predecessor' request (task %p): %d",
709             task_sent, res);
710         stop = 1;
711         //MSG_comm_destroy(node->comm_receive);
712       }
713       else {
714         m_task_t task_received = MSG_comm_get_task(node->comm_receive);
715         task_data_t ans_data = MSG_task_get_data(task_received);
716
717         if (task_received != task_sent) {
718           handle_task(node, task_received);
719         }
720         else {
721           DEBUG3("Received the answer to my 'Get Predecessor' request (task %p): the predecessor of node %d is %d",
722               task_received, ask_to, ans_data->answer_id);
723           predecessor_id = ans_data->answer_id;
724           stop = 1;
725           MSG_task_destroy(task_received);
726           task_data_destroy(req_data);
727         }
728       }
729       node->comm_receive = NULL;
730     } while (!stop);
731   }
732
733   xbt_free(mailbox);
734   return predecessor_id;
735 }
736
737 /**
738  * \brief Returns the closest preceding finger of an id
739  * with respect to the finger table of the current node.
740  * \param node the current node
741  * \param id the id to find
742  * \return the closest preceding finger of that id
743  */
744 int closest_preceding_node(node_t node, int id)
745 {
746   int i;
747   for (i = nb_bits - 1; i >= 0; i--) {
748     if (is_in_interval(node->fingers[i].id, node->id + 1, id - 1)) {
749       return node->fingers[i].id;
750     }
751   }
752   return node->id;
753 }
754
755 /**
756  * \brief This function is called periodically. It checks the immediate
757  * successor of the current node.
758  * \param node the current node
759  */
760 static void stabilize(node_t node)
761 {
762   DEBUG0("Stabilizing node");
763
764   // get the predecessor of my immediate successor
765   int candidate_id;
766   int successor_id = node->fingers[0].id;
767   if (successor_id != node->id) {
768     candidate_id = remote_get_predecessor(node, successor_id);
769   }
770   else {
771     candidate_id = node->pred_id;
772   }
773
774   // this node is a candidate to become my new successor
775   if (candidate_id != -1
776       && is_in_interval(candidate_id, node->id + 1, successor_id - 1)) {
777     set_finger(node, 0, candidate_id);
778   }
779   if (successor_id != node->id) {
780     remote_notify(node, successor_id, node->id);
781   }
782 }
783
784 /**
785  * \brief Notifies the current node that its predecessor may have changed.
786  * \param node the current node
787  * \param candidate_id the possible new predecessor
788  */
789 static void notify(node_t node, int predecessor_candidate_id) {
790
791   if (node->pred_id == -1
792     || is_in_interval(predecessor_candidate_id, node->pred_id + 1, node->id - 1)) {
793
794     set_predecessor(node, predecessor_candidate_id);
795     print_finger_table(node);
796   }
797   else {
798     DEBUG1("I don't have to change my predecessor to %d", predecessor_candidate_id);
799   }
800 }
801
802 /**
803  * \brief Notifies a remote node that its predecessor may have changed.
804  * \param node the current node
805  * \param notify_id id of the node to notify
806  * \param candidate_id the possible new predecessor
807  */
808 static void remote_notify(node_t node, int notify_id, int predecessor_candidate_id) {
809
810   task_data_t req_data = xbt_new0(s_task_data_t, 1);
811   req_data->type = TASK_NOTIFY;
812   req_data->request_id = predecessor_candidate_id;
813   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
814   req_data->answer_to = NULL;
815
816   // send a "Notify" request to notify_id
817   m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
818   DEBUG2("Sending a 'Notify' request (task %p) to %d", task, notify_id);
819   char* mailbox = get_mailbox(notify_id);
820   msg_comm_t comm = MSG_task_isend(task, mailbox);
821   xbt_dynar_push(node->comms, &comm);
822   xbt_free(mailbox);
823 }
824
825 /**
826  * \brief This function is called periodically.
827  * It refreshes the finger table of the current node.
828  * \param node the current node
829  */
830 static void fix_fingers(node_t node) {
831
832   DEBUG0("Fixing fingers");
833   int i = node->next_finger_to_fix;
834   int id = find_successor(node, node->id + powers2[i]);
835   if (id != -1) {
836
837     if (id != node->fingers[i].id) {
838       set_finger(node, i, id);
839       print_finger_table(node);
840     }
841     node->next_finger_to_fix = (i + 1) % nb_bits;
842   }
843 }
844
845 /**
846  * \brief This function is called periodically.
847  * It checks whether the predecessor has failed
848  * \param node the current node
849  */
850 static void check_predecessor(node_t node)
851 {
852   DEBUG0("Checking whether my predecessor is alive");
853   // TODO
854 }
855
856 /**
857  * \brief Main function.
858  */
859 int main(int argc, char *argv[])
860 {
861   if (argc < 3) {
862     printf("Usage: %s [-nb_bits=n] [-timeout=t] platform_file deployment_file\n", argv[0]);
863     printf("example: %s ../msg_platform.xml chord.xml\n", argv[0]);
864     exit(1);
865   }
866
867   MSG_global_init(&argc, argv);
868
869   char **options = &argv[1];
870   while (!strncmp(options[0], "-", 1)) {
871
872     int length = strlen("-nb_bits=");
873     if (!strncmp(options[0], "-nb_bits=", length) && strlen(options[0]) > length) {
874       nb_bits = atoi(options[0] + length);
875       DEBUG1("Set nb_bits to %d", nb_bits);
876     }
877     else {
878
879       length = strlen("-timeout=");
880       if (!strncmp(options[0], "-timeout=", length) && strlen(options[0]) > length) {
881         timeout = atoi(options[0] + length);
882         DEBUG1("Set timeout to %d", timeout);
883       }
884       else {
885         xbt_assert1(0, "Invalid chord option '%s'", options[0]);
886       }
887     }
888     options++;
889   }
890
891   const char* platform_file = options[0];
892   const char* application_file = options[1];
893
894   chord_initialize();
895
896   MSG_set_channel_number(0);
897   MSG_create_environment(platform_file);
898
899   MSG_function_register("node", node);
900   MSG_launch_application(application_file);
901
902   MSG_error_t res = MSG_main();
903   INFO1("Simulation time: %g", MSG_get_clock());
904
905   MSG_clean();
906
907   if (res == MSG_OK)
908     return 0;
909   else
910     return 1;
911 }