Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Modify pipol scripts.
[simgrid.git] / examples / msg / chord / chord.c
1
2 /* Copyright (c) 2010. The SimGrid Team.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include <stdio.h>
9 #include "msg/msg.h"
10 #include "xbt/log.h"
11 #include "xbt/asserts.h"
12 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_chord,
13                              "Messages specific for this msg example");
14
15 #define COMM_SIZE 10
16 #define COMP_SIZE 0
17
18 static int nb_bits = 16;
19 static int nb_keys = 0;
20 static int timeout = 50;
21
22 /**
23  * Finger element.
24  */
25 typedef struct finger {
26   int id;
27   char* mailbox;
28 } s_finger_t, *finger_t;
29
30 /**
31  * Node data.
32  */
33 typedef struct node {
34   int id;                                 // my id
35   char* mailbox;                          // my usual mailbox name
36   s_finger_t *fingers;                    // finger table, of size nb_bits (fingers[0] is my successor)
37   int pred_id;                            // predecessor id
38   char* pred_mailbox;                     // predecessor's mailbox name
39   int next_finger_to_fix;                 // index of the next finger to fix in fix_fingers()
40   msg_comm_t comm_receive;                // current communication to receive
41   xbt_dynar_t comms;                      // current communications being sent
42   double last_change_date;                // last time I changed a finger or my predecessor
43 } s_node_t, *node_t;
44
45 /**
46  * Types of tasks exchanged between nodes.
47  */
48 typedef enum {
49   TASK_FIND_SUCCESSOR,
50   TASK_FIND_SUCCESSOR_ANSWER,
51   TASK_GET_PREDECESSOR,
52   TASK_GET_PREDECESSOR_ANSWER,
53   TASK_NOTIFY,
54   TASK_SUCCESSOR_LEAVING,
55   TASK_PREDECESSOR_LEAVING
56 } e_task_type_t;
57
58 /**
59  * Data attached with the tasks sent and received
60  */
61 typedef struct task_data {
62   e_task_type_t type;                     // type of task
63   int request_id;                         // id paramater (used by some types of tasks)
64   int request_finger;                     // finger parameter (used by some types of tasks)
65   int answer_id;                          // answer (used by some types of tasks)
66   char* answer_to;                        // mailbox to send an answer to (or NULL)
67   const char* issuer_host_name;           // used for logging
68 } s_task_data_t, *task_data_t;
69
70 static int *powers2;
71
72 // utility functions
73 static void chord_initialize(void);
74 static int normalize(int id);
75 static int is_in_interval(int id, int start, int end);
76 static char* get_mailbox(int host_id);
77 static void task_data_destroy(task_data_t task_data);
78 static void print_finger_table(node_t node);
79 static void set_finger(node_t node, int finger_index, int id);
80 static void set_predecessor(node_t node, int predecessor_id);
81
82 // process functions
83 static int node(int argc, char *argv[]);
84 static void handle_task(node_t node, m_task_t task);
85
86 // Chord core
87 static void create(node_t node);
88 static int join(node_t node, int known_id);
89 static void leave(node_t node);
90 static int find_successor(node_t node, int id);
91 static int remote_find_successor(node_t node, int ask_to_id, int id);
92 static int remote_get_predecessor(node_t node, int ask_to_id);
93 static int closest_preceding_node(node_t node, int id);
94 static void stabilize(node_t node);
95 static void notify(node_t node, int predecessor_candidate_id);
96 static void remote_notify(node_t node, int notify_to, int predecessor_candidate_id);
97 static void fix_fingers(node_t node);
98 static void check_predecessor(node_t node);
99 static void quit_notify(node_t node, int to);
100
101 /**
102  * \brief Global initialization of the Chord simulation.
103  */
104 static void chord_initialize(void)
105 {
106   // compute the powers of 2 once for all
107   powers2 = xbt_new(int, nb_bits);
108   int pow = 1;
109   int i;
110   for (i = 0; i < nb_bits; i++) {
111     powers2[i] = pow;
112     pow = pow << 1;
113   }
114   nb_keys = pow;
115   DEBUG1("Sets nb_keys to %d", nb_keys);
116 }
117
118 /**
119  * \brief Turns an id into an equivalent id in [0, nb_keys).
120  * \param id an id
121  * \return the corresponding normalized id
122  */
123 static int normalize(int id)
124 {
125   // make sure id >= 0
126   while (id < 0) {
127     id += nb_keys;
128   }
129   // make sure id < nb_keys
130   id = id % nb_keys;
131
132   return id;
133 }
134
135 /**
136  * \brief Returns whether a id belongs to the interval [start, end].
137  *
138  * The parameters are noramlized to make sure they are between 0 and nb_keys - 1).
139  * 1 belongs to [62, 3]
140  * 1 does not belong to [3, 62]
141  * 63 belongs to [62, 3]
142  * 63 does not belong to [3, 62]
143  * 24 belongs to [21, 29]
144  * 24 does not belong to [29, 21]
145  *
146  * \param id id to check
147  * \param start lower bound
148  * \param end upper bound
149  * \return a non-zero value if id in in [start, end]
150  */
151 static int is_in_interval(int id, int start, int end)
152 {
153   id = normalize(id);
154   start = normalize(start);
155   end = normalize(end);
156
157   // make sure end >= start and id >= start
158   if (end < start) {
159     end += nb_keys;
160   }
161
162   if (id < start) {
163     id += nb_keys;
164   }
165
166   return id <= end;
167 }
168
169 /**
170  * \brief Gets the mailbox name of a host given its chord id.
171  * \param node_id id of a node
172  * \return the name of its mailbox
173  */
174 static char* get_mailbox(int node_id)
175 {
176   return bprintf("mailbox%d", node_id);
177 }
178
179 /**
180  * \brief Frees the memory used by some task data.
181  * \param task_data the task data to destroy
182  */
183 static void task_data_destroy(task_data_t task_data)
184 {
185   xbt_free(task_data->answer_to);
186   xbt_free(task_data);
187 }
188
189 /**
190  * \brief Displays the finger table of a node.
191  * \param node a node
192  */
193 static void print_finger_table(node_t node)
194 {
195   if (XBT_LOG_ISENABLED(msg_chord, xbt_log_priority_verbose)) {
196     int i;
197     int pow = 1;
198     VERB0("My finger table:");
199     VERB0("Start | Succ ");
200     for (i = 0; i < nb_bits; i++) {
201       VERB2(" %3d  | %3d ", (node->id + pow) % nb_keys, node->fingers[i].id);
202       pow = pow << 1;
203     }
204     VERB1("Predecessor: %d", node->pred_id);
205   }
206 }
207
208 /**
209  * \brief Sets a finger of the current node.
210  * \param node the current node
211  * \param finger_index index of the finger to set (0 to nb_bits - 1)
212  * \param id the id to set for this finger
213  */
214 static void set_finger(node_t node, int finger_index, int id)
215 {
216   if (id != node->fingers[finger_index].id) {
217     node->fingers[finger_index].id = id;
218     xbt_free(node->fingers[finger_index].mailbox);
219     node->fingers[finger_index].mailbox = get_mailbox(id);
220     node->last_change_date = MSG_get_clock();
221     DEBUG2("My new finger #%d is %d", finger_index, id);
222   }
223 }
224
225 /**
226  * \brief Sets the predecessor of the current node.
227  * \param node the current node
228  * \param id the id to predecessor, or -1 to unset the predecessor
229  */
230 static void set_predecessor(node_t node, int predecessor_id)
231 {
232   if (predecessor_id != node->pred_id) {
233     node->pred_id = predecessor_id;
234     xbt_free(node->pred_mailbox);
235
236     if (predecessor_id != -1) {
237       node->pred_mailbox = get_mailbox(predecessor_id);
238     }
239     node->last_change_date = MSG_get_clock();
240
241     DEBUG1("My new predecessor is %d", predecessor_id);
242   }
243 }
244
245 /**
246  * \brief Node Function
247  * Arguments:
248  * - my id
249  * - the id of a guy I know in the system (except for the first node)
250  * - the time to sleep before I join (except for the first node)
251  */
252 int node(int argc, char *argv[])
253 {
254   double init_time = MSG_get_clock();
255   m_task_t task = NULL;
256   m_task_t task_received = NULL;
257   msg_comm_t comm_send = NULL;
258   int i;
259   int index;
260   int join_success = 0;
261   double deadline;
262   double next_stabilize_date = init_time + 10;
263   double next_fix_fingers_date = init_time + 10;
264   double next_check_predecessor_date = init_time + 10;
265
266   xbt_assert0(argc == 3 || argc == 5, "Wrong number of arguments for this node");
267
268   // initialize my node
269   s_node_t node = {0};
270   node.id = atoi(argv[1]);
271   node.mailbox = get_mailbox(node.id);
272   node.next_finger_to_fix = 0;
273   node.comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
274   node.fingers = xbt_new0(s_finger_t, nb_bits);
275   node.last_change_date = MSG_get_clock();
276
277   for (i = 0; i < nb_bits; i++) {
278     set_finger(&node, i, node.id);
279   }
280
281   if (argc == 3) { // first ring
282     deadline = atof(argv[2]);
283     create(&node);
284     join_success = 1;
285   }
286   else {
287     int known_id = atoi(argv[2]);
288     double sleep_time = atof(argv[3]);
289     deadline = atof(argv[4]);
290
291     // sleep before starting
292     DEBUG1("Let's sleep during %f", sleep_time);
293     MSG_process_sleep(sleep_time);
294     DEBUG0("Hey! Let's join the system.");
295
296     join_success = join(&node, known_id);
297   }
298
299   if (join_success) {
300     while (MSG_get_clock() < init_time + deadline
301         && MSG_get_clock() < node.last_change_date + 1000) {
302
303       if (node.comm_receive == NULL) {
304         task_received = NULL;
305         node.comm_receive = MSG_task_irecv(&task_received, node.mailbox);
306         // FIXME: do not make MSG_task_irecv() calls from several functions
307       }
308
309       if (!MSG_comm_test(node.comm_receive)) {
310
311         // no task was received: make some periodic calls
312         if (MSG_get_clock() >= next_stabilize_date) {
313           stabilize(&node);
314           next_stabilize_date = MSG_get_clock() + 8;
315         }
316         else if (MSG_get_clock() >= next_fix_fingers_date) {
317           fix_fingers(&node);
318           next_fix_fingers_date = MSG_get_clock() + 8;
319         }
320         else if (MSG_get_clock() >= next_check_predecessor_date) {
321           check_predecessor(&node);
322           next_check_predecessor_date = MSG_get_clock() + 8;
323         }
324         else {
325           // nothing to do: sleep for a while
326           MSG_process_sleep(5);
327         }
328       }
329       else {
330         // a transfer has occured
331
332         MSG_error_t status = MSG_comm_get_status(node.comm_receive);
333
334         if (status != MSG_OK) {
335           DEBUG0("Failed to receive a task. Nevermind.");
336           node.comm_receive = NULL;
337         }
338         else {
339           // the task was successfully received
340           MSG_comm_destroy(node.comm_receive);
341           node.comm_receive = NULL;
342           handle_task(&node, task_received);
343         }
344       }
345
346       // see if some communications are finished
347       while ((index = MSG_comm_testany(node.comms)) != -1) {
348         comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t);
349         MSG_error_t status = MSG_comm_get_status(comm_send);
350         xbt_dynar_remove_at(node.comms, index, &comm_send);
351         DEBUG3("Communication %p is finished with status %d, dynar size is now %lu",
352             comm_send, status, xbt_dynar_length(node.comms));
353         MSG_comm_destroy(comm_send);
354       }
355     }
356
357     // clean unfinished comms sent
358     unsigned int cursor;
359     xbt_dynar_foreach(node.comms, cursor, comm_send) {
360       task = MSG_comm_get_task(comm_send);
361       MSG_task_cancel(task);
362       task_data_destroy(MSG_task_get_data(task));
363       MSG_task_destroy(task);
364       MSG_comm_destroy(comm_send);
365       // FIXME: the task is actually not destroyed because MSG thinks that the other side (whose process is dead) is still using it
366     }
367
368     // leave the ring
369     leave(&node);
370   }
371
372   // stop the simulation
373   xbt_dynar_free(&node.comms);
374   xbt_free(node.mailbox);
375   xbt_free(node.pred_mailbox);
376   for (i = 0; i < nb_bits - 1; i++) {
377     xbt_free(node.fingers[i].mailbox);
378   }
379   xbt_free(node.fingers);
380   return 0;
381 }
382
383 /**
384  * \brief This function is called when the current node receives a task.
385  * \param node the current node
386  * \param task the task to handle (don't touch it then:
387  * it will be destroyed, reused or forwarded)
388  */
389 static void handle_task(node_t node, m_task_t task) {
390
391   DEBUG1("Handling task %p", task);
392   msg_comm_t comm = NULL;
393   char* mailbox = NULL;
394   task_data_t task_data = (task_data_t) MSG_task_get_data(task);
395   e_task_type_t type = task_data->type;
396
397   switch (type) {
398
399     case TASK_FIND_SUCCESSOR:
400       DEBUG2("Receiving a 'Find Successor' request from %s for id %d",
401           task_data->issuer_host_name, task_data->request_id);
402       // is my successor the successor?
403       if (is_in_interval(task_data->request_id, node->id + 1, node->fingers[0].id)) {
404         task_data->type = TASK_FIND_SUCCESSOR_ANSWER;
405         task_data->answer_id = node->fingers[0].id;
406         DEBUG3("Sending back a 'Find Successor Answer' to %s: the successor of %d is %d",
407             task_data->issuer_host_name,
408             task_data->request_id, task_data->answer_id);
409         comm = MSG_task_isend(task, task_data->answer_to);
410         xbt_dynar_push(node->comms, &comm);
411       }
412       else {
413         // otherwise, forward the request to the closest preceding finger in my table
414         int closest = closest_preceding_node(node, task_data->request_id);
415         DEBUG2("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
416             task_data->request_id, closest);
417         mailbox = get_mailbox(closest);
418         comm = MSG_task_isend(task, mailbox);
419         xbt_dynar_push(node->comms, &comm);
420         xbt_free(mailbox);
421       }
422       break;
423
424     case TASK_GET_PREDECESSOR:
425       DEBUG1("Receiving a 'Get Predecessor' request from %s", task_data->issuer_host_name);
426       task_data->type = TASK_GET_PREDECESSOR_ANSWER;
427       task_data->answer_id = node->pred_id;
428       DEBUG3("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d",
429           task_data->issuer_host_name,
430           task_data->answer_to, task_data->answer_id);
431       comm = MSG_task_isend(task, task_data->answer_to);
432       xbt_dynar_push(node->comms, &comm);
433       break;
434
435     case TASK_NOTIFY:
436       // someone is telling me that he may be my new predecessor
437       DEBUG1("Receiving a 'Notify' request from %s", task_data->issuer_host_name);
438       notify(node, task_data->request_id);
439       task_data_destroy(task_data);
440       MSG_task_destroy(task);
441       break;
442
443     case TASK_PREDECESSOR_LEAVING:
444       // my predecessor is about to quit
445       DEBUG1("Receiving a 'Predecessor Leaving' message from %s", task_data->issuer_host_name);
446       // modify my predecessor
447       set_predecessor(node, task_data->request_id);
448       task_data_destroy(task_data);
449       MSG_task_destroy(task);
450       /*TODO :
451       >> notify my new predecessor
452       >> send a notify_predecessors !!
453        */
454       break;
455
456     case TASK_SUCCESSOR_LEAVING:
457       // my successor is about to quit
458       DEBUG1("Receiving a 'Successor Leaving' message from %s", task_data->issuer_host_name);
459       // modify my successor FIXME : this should be implicit ?
460       set_finger(node, 0, task_data->request_id);
461       task_data_destroy(task_data);
462       MSG_task_destroy(task);
463       /* TODO
464       >> notify my new successor
465       >> update my table & predecessors table */
466       break;
467
468     case TASK_FIND_SUCCESSOR_ANSWER:
469     case TASK_GET_PREDECESSOR_ANSWER:
470       DEBUG2("Ignoring unexpected task of type %d (%p)", type, task);
471       break;
472   }
473 }
474
475 /**
476  * \brief Initializes the current node as the first one of the system.
477  * \param node the current node
478  */
479 static void create(node_t node)
480 {
481   DEBUG0("Create a new Chord ring...");
482   set_predecessor(node, -1); // -1 means that I have no predecessor
483   print_finger_table(node);
484 }
485
486 /**
487  * \brief Makes the current node join the ring, knowing the id of a node
488  * already in the ring
489  * \param node the current node
490  * \param known_id id of a node already in the ring
491  * \return 1 if the join operation succeeded, 0 otherwise
492  */
493 static int join(node_t node, int known_id)
494 {
495   INFO2("Joining the ring with id %d, knowing node %d", node->id, known_id);
496   set_predecessor(node, -1); // no predecessor (yet)
497
498   int successor_id = remote_find_successor(node, known_id, node->id);
499   if (successor_id == -1) {
500     INFO0("Cannot join the ring.");
501   }
502   else {
503     set_finger(node, 0, successor_id);
504     print_finger_table(node);
505   }
506
507   return successor_id != -1;
508 }
509
510 /**
511  * \brief Makes the current node quit the system
512  * \param node the current node
513  */
514 static void leave(node_t node)
515 {
516   DEBUG0("Well Guys! I Think it's time for me to quit ;)");
517   quit_notify(node, 1);  // notify to my successor ( >>> 1 );
518   quit_notify(node, -1); // notify my predecessor  ( >>> -1);
519   // TODO ...
520 }
521
522 /*
523  * \brief Notifies the successor or the predecessor of the current node
524  * of the departure
525  * \param node the current node
526  * \param to 1 to notify the successor, -1 to notify the predecessor
527  * FIXME: notify both nodes with only one call
528  */
529 static void quit_notify(node_t node, int to)
530 {
531   /* TODO
532   task_data_t req_data = xbt_new0(s_task_data_t, 1);
533   req_data->request_id = node->id;
534   req_data->successor_id = node->fingers[0].id;
535   req_data->pred_id = node->pred_id;
536   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
537   req_data->answer_to = NULL;
538   const char* task_name = NULL;
539   const char* to_mailbox = NULL;
540   if (to == 1) {    // notify my successor
541     to_mailbox = node->fingers[0].mailbox;
542     INFO2("Telling my Successor %d about my departure via mailbox %s",
543           node->fingers[0].id, to_mailbox);
544     req_data->type = TASK_PREDECESSOR_LEAVING;
545   }
546   else if (to == -1) {    // notify my predecessor
547
548     if (node->pred_id == -1) {
549       return;
550     }
551
552     to_mailbox = node->pred_mailbox;
553     INFO2("Telling my Predecessor %d about my departure via mailbox %s",
554           node->pred_id, to_mailbox);
555     req_data->type = TASK_SUCCESSOR_LEAVING;
556   }
557   m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
558   //char* mailbox = get_mailbox(to_mailbox);
559   msg_comm_t comm = MSG_task_isend(task, to_mailbox);
560   xbt_dynar_push(node->comms, &comm);
561   */
562 }
563
564 /**
565  * \brief Makes the current node find the successor node of an id.
566  * \param node the current node
567  * \param id the id to find
568  * \return the id of the successor node, or -1 if the request failed
569  */
570 static int find_successor(node_t node, int id)
571 {
572   // is my successor the successor?
573   if (is_in_interval(id, node->id + 1, node->fingers[0].id)) {
574     return node->fingers[0].id;
575   }
576
577   // otherwise, ask the closest preceding finger in my table
578   int closest = closest_preceding_node(node, id);
579   return remote_find_successor(node, closest, id);
580 }
581
582 /**
583  * \brief Asks another node the successor node of an id.
584  * \param node the current node
585  * \param ask_to the node to ask to
586  * \param id the id to find
587  * \return the id of the successor node, or -1 if the request failed
588  */
589 static int remote_find_successor(node_t node, int ask_to, int id)
590 {
591   int successor = -1;
592   int stop = 0;
593   char* mailbox = get_mailbox(ask_to);
594   task_data_t req_data = xbt_new0(s_task_data_t, 1);
595   req_data->type = TASK_FIND_SUCCESSOR;
596   req_data->request_id = id;
597   req_data->answer_to = xbt_strdup(node->mailbox);
598   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
599
600   // send a "Find Successor" request to ask_to_id
601   m_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
602   DEBUG3("Sending a 'Find Successor' request (task %p) to %d for id %d", task_sent, ask_to, id);
603   MSG_error_t res = MSG_task_send_with_timeout(task_sent, mailbox, timeout);
604
605   if (res != MSG_OK) {
606     DEBUG3("Failed to send the 'Find Successor' request (task %p) to %d for id %d",
607         task_sent, ask_to, id);
608     MSG_task_destroy(task_sent);
609     task_data_destroy(req_data);
610   }
611   else {
612
613     // receive the answer
614     DEBUG3("Sent a 'Find Successor' request (task %p) to %d for key %d, waiting for the answer",
615         task_sent, ask_to, id);
616
617     do {
618       if (node->comm_receive == NULL) {
619         m_task_t task_received = NULL;
620         node->comm_receive = MSG_task_irecv(&task_received, node->mailbox);
621       }
622
623       res = MSG_comm_wait(node->comm_receive, timeout);
624
625       if (res != MSG_OK) {
626         DEBUG2("Failed to receive the answer to my 'Find Successor' request (task %p): %d",
627             task_sent, res);
628         stop = 1;
629         //MSG_comm_destroy(node->comm_receive);
630       }
631       else {
632         m_task_t task_received = MSG_comm_get_task(node->comm_receive);
633         DEBUG1("Received a task (%p)", task_received);
634         task_data_t ans_data = MSG_task_get_data(task_received);
635
636         if (task_received != task_sent) {
637           // this is not the expected answer
638           handle_task(node, task_received);
639         }
640         else {
641           // this is our answer
642           DEBUG4("Received the answer to my 'Find Successor' request for id %d (task %p): the successor of key %d is %d",
643               ans_data->request_id, task_received, id, ans_data->answer_id);
644           successor = ans_data->answer_id;
645           stop = 1;
646           MSG_task_destroy(task_received);
647           task_data_destroy(req_data);
648         }
649       }
650       node->comm_receive = NULL;
651     } while (!stop);
652   }
653
654   xbt_free(mailbox);
655   return successor;
656 }
657
658 /**
659  * \brief Asks another node its predecessor.
660  * \param node the current node
661  * \param ask_to the node to ask to
662  * \return the id of its predecessor node, or -1 if the request failed
663  * (or if the node does not know its predecessor)
664  */
665 static int remote_get_predecessor(node_t node, int ask_to)
666 {
667   int predecessor_id = -1;
668   int stop = 0;
669   char* mailbox = get_mailbox(ask_to);
670   task_data_t req_data = xbt_new0(s_task_data_t, 1);
671   req_data->type = TASK_GET_PREDECESSOR;
672   req_data->answer_to = xbt_strdup(node->mailbox);
673   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
674
675   // send a "Get Predecessor" request to ask_to_id
676   DEBUG1("Sending a 'Get Predecessor' request to %d", ask_to);
677   m_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
678   MSG_error_t res = MSG_task_send_with_timeout(task_sent, mailbox, timeout);
679
680   if (res != MSG_OK) {
681     DEBUG2("Failed to send the 'Get Predecessor' request (task %p) to %d",
682         task_sent, ask_to);
683     MSG_task_destroy(task_sent);
684     task_data_destroy(req_data);
685   }
686   else {
687
688     // receive the answer
689     DEBUG3("Sent 'Get Predecessor' request (task %p) to %d, waiting for the answer on my mailbox '%s'",
690         task_sent, ask_to, req_data->answer_to);
691
692     do {
693       if (node->comm_receive == NULL) { // FIXME simplify this
694         m_task_t task_received = NULL;
695         node->comm_receive = MSG_task_irecv(&task_received, node->mailbox);
696       }
697
698       res = MSG_comm_wait(node->comm_receive, timeout);
699
700       if (res != MSG_OK) {
701         DEBUG2("Failed to receive the answer to my 'Get Predecessor' request (task %p): %d",
702             task_sent, res);
703         stop = 1;
704         //MSG_comm_destroy(node->comm_receive);
705       }
706       else {
707         m_task_t task_received = MSG_comm_get_task(node->comm_receive);
708         task_data_t ans_data = MSG_task_get_data(task_received);
709
710         if (task_received != task_sent) {
711           handle_task(node, task_received);
712         }
713         else {
714           DEBUG3("Received the answer to my 'Get Predecessor' request (task %p): the predecessor of node %d is %d",
715               task_received, ask_to, ans_data->answer_id);
716           predecessor_id = ans_data->answer_id;
717           stop = 1;
718           MSG_task_destroy(task_received);
719           task_data_destroy(req_data);
720         }
721       }
722       node->comm_receive = NULL;
723     } while (!stop);
724   }
725
726   xbt_free(mailbox);
727   return predecessor_id;
728 }
729
730 /**
731  * \brief Returns the closest preceding finger of an id
732  * with respect to the finger table of the current node.
733  * \param node the current node
734  * \param id the id to find
735  * \return the closest preceding finger of that id
736  */
737 int closest_preceding_node(node_t node, int id)
738 {
739   int i;
740   for (i = nb_bits - 1; i >= 0; i--) {
741     if (is_in_interval(node->fingers[i].id, node->id + 1, id - 1)) {
742       return node->fingers[i].id;
743     }
744   }
745   return node->id;
746 }
747
748 /**
749  * \brief This function is called periodically. It checks the immediate
750  * successor of the current node.
751  * \param node the current node
752  */
753 static void stabilize(node_t node)
754 {
755   DEBUG0("Stabilizing node");
756
757   // get the predecessor of my immediate successor
758   int candidate_id;
759   int successor_id = node->fingers[0].id;
760   if (successor_id != node->id) {
761     candidate_id = remote_get_predecessor(node, successor_id);
762   }
763   else {
764     candidate_id = node->pred_id;
765   }
766
767   // this node is a candidate to become my new successor
768   if (candidate_id != -1
769       && is_in_interval(candidate_id, node->id + 1, successor_id - 1)) {
770     set_finger(node, 0, candidate_id);
771   }
772   if (successor_id != node->id) {
773     remote_notify(node, successor_id, node->id);
774   }
775 }
776
777 /**
778  * \brief Notifies the current node that its predecessor may have changed.
779  * \param node the current node
780  * \param candidate_id the possible new predecessor
781  */
782 static void notify(node_t node, int predecessor_candidate_id) {
783
784   if (node->pred_id == -1
785     || is_in_interval(predecessor_candidate_id, node->pred_id + 1, node->id - 1)) {
786
787     set_predecessor(node, predecessor_candidate_id);
788     print_finger_table(node);
789   }
790   else {
791     DEBUG1("I don't have to change my predecessor to %d", predecessor_candidate_id);
792   }
793 }
794
795 /**
796  * \brief Notifies a remote node that its predecessor may have changed.
797  * \param node the current node
798  * \param notify_id id of the node to notify
799  * \param candidate_id the possible new predecessor
800  */
801 static void remote_notify(node_t node, int notify_id, int predecessor_candidate_id) {
802
803   task_data_t req_data = xbt_new0(s_task_data_t, 1);
804   req_data->type = TASK_NOTIFY;
805   req_data->request_id = predecessor_candidate_id;
806   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
807   req_data->answer_to = NULL;
808
809   // send a "Notify" request to notify_id
810   m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
811   DEBUG2("Sending a 'Notify' request (task %p) to %d", task, notify_id);
812   char* mailbox = get_mailbox(notify_id);
813   msg_comm_t comm = MSG_task_isend(task, mailbox);
814   xbt_dynar_push(node->comms, &comm);
815   xbt_free(mailbox);
816 }
817
818 /**
819  * \brief This function is called periodically.
820  * It refreshes the finger table of the current node.
821  * \param node the current node
822  */
823 static void fix_fingers(node_t node) {
824
825   DEBUG0("Fixing fingers");
826   int i = node->next_finger_to_fix;
827   int id = find_successor(node, node->id + powers2[i]);
828   if (id != -1) {
829
830     if (id != node->fingers[i].id) {
831       set_finger(node, i, id);
832       print_finger_table(node);
833     }
834     node->next_finger_to_fix = (i + 1) % nb_bits;
835   }
836 }
837
838 /**
839  * \brief This function is called periodically.
840  * It checks whether the predecessor has failed
841  * \param node the current node
842  */
843 static void check_predecessor(node_t node)
844 {
845   DEBUG0("Checking whether my predecessor is alive");
846   // TODO
847 }
848
849 /**
850  * \brief Main function.
851  */
852 int main(int argc, char *argv[])
853 {
854   if (argc < 3) {
855     printf("Usage: %s [-nb_bits=n] [-timeout=t] platform_file deployment_file\n", argv[0]);
856     printf("example: %s ../msg_platform.xml chord.xml\n", argv[0]);
857     exit(1);
858   }
859
860   MSG_global_init(&argc, argv);
861
862   char **options = &argv[1];
863   while (!strncmp(options[0], "-", 1)) {
864
865     int length = strlen("-nb_bits=");
866     if (!strncmp(options[0], "-nb_bits=", length) && strlen(options[0]) > length) {
867       nb_bits = atoi(options[0] + length);
868       DEBUG1("Set nb_bits to %d", nb_bits);
869     }
870     else {
871
872       length = strlen("-timeout=");
873       if (!strncmp(options[0], "-timeout=", length) && strlen(options[0]) > length) {
874         timeout = atoi(options[0] + length);
875         DEBUG1("Set timeout to %d", timeout);
876       }
877       else {
878         xbt_assert1(0, "Invalid chord option '%s'", options[0]);
879       }
880     }
881     options++;
882   }
883
884   const char* platform_file = options[0];
885   const char* application_file = options[1];
886
887   chord_initialize();
888
889   MSG_set_channel_number(0);
890   MSG_create_environment(platform_file);
891
892   MSG_function_register("node", node);
893   MSG_launch_application(application_file);
894
895   MSG_error_t res = MSG_main();
896   INFO1("Simulation time: %g", MSG_get_clock());
897
898   MSG_clean();
899
900   if (res == MSG_OK)
901     return 0;
902   else
903     return 1;
904 }