Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
88987d7aa771c7d3d039c5aa71d82e19dc5fc5f9
[simgrid.git] / examples / msg / chord / chord.c
1
2 /* Copyright (c) 2010. The SimGrid Team.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include <stdio.h>
9 #include "msg/msg.h"
10 #include "xbt/log.h"
11 #include "xbt/asserts.h"
12 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_chord,
13                              "Messages specific for this msg example");
14
15 #define NB_BITS 16
16 #define NB_KEYS 65536
17 #define COMM_SIZE 10
18 #define COMP_SIZE 0
19 #define TIMEOUT 50
20
21 /**
22  * Finger element.
23  */
24 typedef struct finger {
25   int id;
26   char* mailbox;
27 } s_finger_t, *finger_t;
28
29 /**
30  * Node data.
31  */
32 typedef struct node {
33   int id;                                 // my id
34   char* mailbox;                          // my usual mailbox name
35   s_finger_t fingers[NB_BITS];            // finger table (fingers[0] is my successor)
36   int pred_id;                            // predecessor id
37   char* pred_mailbox;                     // predecessor's mailbox name
38   int next_finger_to_fix;                 // index of the next finger to fix in fix_fingers()
39   msg_comm_t comm_receive;                // current communication to receive
40   xbt_dynar_t comms;                      // current communications being sent
41 } s_node_t, *node_t;
42
43 /**
44  * Types of tasks exchanged between nodes.
45  */
46 typedef enum {
47   TASK_FIND_SUCCESSOR,
48   TASK_FIND_SUCCESSOR_ANSWER,
49   TASK_GET_PREDECESSOR,
50   TASK_GET_PREDECESSOR_ANSWER,
51   TASK_NOTIFY,
52   TASK_SUCCESSOR_LEAVING,
53   TASK_PREDECESSOR_LEAVING
54 } e_task_type_t;
55
56 /**
57  * Data attached with the tasks sent and received
58  */
59 typedef struct task_data {
60   e_task_type_t type;                     // type of task
61   int request_id;                         // id paramater (used by some types of tasks)
62   int request_finger;                     // finger parameter (used by some types of tasks)
63   int answer_id;                          // answer (used by some types of tasks)
64   char* answer_to;                        // mailbox to send an answer to (or NULL)
65   const char* issuer_host_name;           // used for logging
66 } s_task_data_t, *task_data_t;
67
68 static int powers2[NB_BITS];
69
70 // utility functions
71 static void chord_initialize(void);
72 static int normalize(int id);
73 static int is_in_interval(int id, int start, int end);
74 static char* get_mailbox(int host_id);
75 static void task_data_destroy(task_data_t task_data);
76 static void print_finger_table(node_t node);
77 static void set_finger(node_t node, int finger_index, int id);
78 static void set_predecessor(node_t node, int predecessor_id);
79
80 // process functions
81 static int node(int argc, char *argv[]);
82 static void handle_task(node_t node, m_task_t task);
83
84 // Chord core
85 static void create(node_t node);
86 static int join(node_t node, int known_id);
87 static void leave(node_t node);
88 static int find_successor(node_t node, int id);
89 static int remote_find_successor(node_t node, int ask_to_id, int id);
90 static int remote_get_predecessor(node_t node, int ask_to_id);
91 static int closest_preceding_node(node_t node, int id);
92 static void stabilize(node_t node);
93 static void notify(node_t node, int predecessor_candidate_id);
94 static void remote_notify(node_t node, int notify_to, int predecessor_candidate_id);
95 static void fix_fingers(node_t node);
96 static void check_predecessor(node_t node);
97 static void quit_notify(node_t node, int to);
98
99 /**
100  * \brief Global initialization of the Chord simulation.
101  */
102 static void chord_initialize(void)
103 {
104   // compute the powers of 2 once for all
105   int pow = 1;
106   int i;
107   for (i = 0; i < NB_BITS; i++) {
108     powers2[i] = pow;
109     pow = pow << 1;
110   }
111 }
112
113 /**
114  * \brief Turns an id into an equivalent id in [0, NB_KEYS).
115  * \param id an id
116  * \return the corresponding normalized id
117  */
118 static int normalize(int id)
119 {
120   // make sure id >= 0
121   while (id < 0) {
122     id += NB_KEYS;
123   }
124   // make sure id < NB_KEYS
125   id = id % NB_KEYS;
126
127   return id;
128 }
129
130 /**
131  * \brief Returns whether a id belongs to the interval [start, end].
132  *
133  * The parameters are noramlized to make sure they are between 0 and CHORD_NB_KEYS - 1).
134  * 1 belongs to [62, 3]
135  * 1 does not belong to [3, 62]
136  * 63 belongs to [62, 3]
137  * 63 does not belong to [3, 62]
138  * 24 belongs to [21, 29]
139  * 24 does not belong to [29, 21]
140  *
141  * \param id id to check
142  * \param start lower bound
143  * \param end upper bound
144  * \return a non-zero value if id in in [start, end]
145  */
146 static int is_in_interval(int id, int start, int end)
147 {
148   id = normalize(id);
149   start = normalize(start);
150   end = normalize(end);
151
152   // make sure end >= start and id >= start
153   if (end < start) {
154     end += NB_KEYS;
155   }
156
157   if (id < start) {
158     id += NB_KEYS;
159   }
160
161   return id <= end;
162 }
163
164 /**
165  * \brief Gets the mailbox name of a host given its chord id.
166  * \param node_id id of a node
167  * \return the name of its mailbox
168  */
169 static char* get_mailbox(int node_id)
170 {
171   return bprintf("mailbox%d", node_id);
172 }
173
174 /**
175  * \brief Frees the memory used by some task data.
176  * \param task_data the task data to destroy
177  */
178 static void task_data_destroy(task_data_t task_data)
179 {
180   xbt_free(task_data->answer_to);
181   xbt_free(task_data);
182 }
183
184 /**
185  * \brief Displays the finger table of a node.
186  * \param node a node
187  */
188 static void print_finger_table(node_t node)
189 {
190   if (XBT_LOG_ISENABLED(msg_chord, xbt_log_priority_verbose)) {
191     int i;
192     int pow = 1;
193     VERB0("My finger table:");
194     VERB0("Start | Succ ");
195     for (i = 0; i < NB_BITS; i++) {
196       VERB2(" %3d  | %3d ", (node->id + pow) % NB_KEYS, node->fingers[i].id);
197       pow = pow << 1;
198     }
199     VERB1("Predecessor: %d", node->pred_id);
200   }
201 }
202
203 /**
204  * \brief Sets a finger of the current node.
205  * \param node the current node
206  * \param finger_index index of the finger to set (0 to NB_BITS - 1)
207  * \param id the id to set for this finger
208  */
209 static void set_finger(node_t node, int finger_index, int id)
210 {
211   node->fingers[finger_index].id = id;
212   xbt_free(node->fingers[finger_index].mailbox);
213   node->fingers[finger_index].mailbox = get_mailbox(id);
214   DEBUG2("My new finger #%d is %d", finger_index, id);
215 }
216
217 /**
218  * \brief Sets the predecessor of the current node.
219  * \param node the current node
220  * \param id the id to predecessor, or -1 to unset the predecessor
221  */
222 static void set_predecessor(node_t node, int predecessor_id)
223 {
224   node->pred_id = predecessor_id;
225   xbt_free(node->pred_mailbox);
226
227   if (predecessor_id != -1) {
228     node->pred_mailbox = get_mailbox(predecessor_id);
229   }
230
231   DEBUG1("My new predecessor is %d", predecessor_id);
232 }
233
234 /**
235  * \brief Node Function
236  * Arguments:
237  * - my id
238  * - the id of a guy I know in the system (except for the first node)
239  * - the time to sleep before I join (except for the first node)
240  */
241 int node(int argc, char *argv[])
242 {
243   double init_time = MSG_get_clock();
244   m_task_t task = NULL;
245   m_task_t task_received = NULL;
246   msg_comm_t comm_send = NULL;
247   int i;
248   int index;
249   int join_success = 0;
250   double deadline;
251   double next_stabilize_date = init_time + 10;
252   double next_fix_fingers_date = init_time + 10;
253   double next_check_predecessor_date = init_time + 10;
254
255   xbt_assert0(argc == 3 || argc == 5, "Wrong number of arguments for this node");
256
257   // initialize my node
258   s_node_t node = {0};
259   node.id = atoi(argv[1]);
260   node.mailbox = get_mailbox(node.id);
261   node.next_finger_to_fix = 0;
262   node.comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
263
264   for (i = 0; i < NB_BITS; i++) {
265     set_finger(&node, i, node.id);
266   }
267
268   if (argc == 3) { // first ring
269     deadline = atof(argv[2]);
270     create(&node);
271     join_success = 1;
272   }
273   else {
274     int known_id = atoi(argv[2]);
275     double sleep_time = atof(argv[3]);
276     deadline = atof(argv[4]);
277
278     // sleep before starting
279     DEBUG1("Let's sleep during %f", sleep_time);
280     MSG_process_sleep(sleep_time);
281     DEBUG0("Hey! Let's join the system.");
282
283     join_success = join(&node, known_id);
284   }
285
286   if (join_success) {
287     while (MSG_get_clock() < init_time + deadline) {
288
289       if (node.comm_receive == NULL) {
290         task_received = NULL;
291         node.comm_receive = MSG_task_irecv(&task_received, node.mailbox);
292         // FIXME: do not make MSG_task_irecv() calls from several functions
293       }
294
295       if (!MSG_comm_test(node.comm_receive)) {
296
297         // no task was received: make some periodic calls
298         if (MSG_get_clock() >= next_stabilize_date) {
299           stabilize(&node);
300           next_stabilize_date = MSG_get_clock() + 10;
301         }
302         else if (MSG_get_clock() >= next_fix_fingers_date) {
303           fix_fingers(&node);
304           next_fix_fingers_date = MSG_get_clock() + 10;
305         }
306         else if (MSG_get_clock() >= next_check_predecessor_date) {
307           check_predecessor(&node);
308           next_check_predecessor_date = MSG_get_clock() + 10;
309         }
310         else {
311           // nothing to do: sleep for a while
312           MSG_process_sleep(5);
313         }
314       }
315       else {
316         // a transfer has occured
317
318         MSG_error_t status = MSG_comm_get_status(node.comm_receive);
319
320         if (status != MSG_OK) {
321           DEBUG0("Failed to receive a task. Nevermind.");
322           node.comm_receive = NULL;
323         }
324         else {
325           // the task was successfully received
326           MSG_comm_destroy(node.comm_receive);
327           node.comm_receive = NULL;
328           handle_task(&node, task_received);
329         }
330       }
331
332       // see if some communications are finished
333       while ((index = MSG_comm_testany(node.comms)) != -1) {
334         comm_send = xbt_dynar_get_as(node.comms, index, msg_comm_t);
335         MSG_error_t status = MSG_comm_get_status(comm_send);
336         xbt_dynar_remove_at(node.comms, index, &comm_send);
337         DEBUG3("Communication %p is finished with status %d, dynar size is now %lu",
338             comm_send, status, xbt_dynar_length(node.comms));
339         MSG_comm_destroy(comm_send);
340       }
341     }
342
343     // clean unfinished comms sent
344     unsigned int cursor;
345     xbt_dynar_foreach(node.comms, cursor, comm_send) {
346       task = MSG_comm_get_task(comm_send);
347       MSG_task_cancel(task);
348       task_data_destroy(MSG_task_get_data(task));
349       MSG_task_destroy(task);
350       MSG_comm_destroy(comm_send);
351       // FIXME: the task is actually not destroyed because MSG thinks that the other side (whose process is dead) is still using it
352     }
353
354     // leave the ring
355     leave(&node);
356   }
357
358   // stop the simulation
359   xbt_dynar_free(&node.comms);
360   xbt_free(node.mailbox);
361   xbt_free(node.pred_mailbox);
362   for (i = 0; i < NB_BITS - 1; i++) {
363     xbt_free(node.fingers[i].mailbox);
364   }
365   return 0;
366 }
367
368 /**
369  * \brief This function is called when the current node receives a task.
370  * \param node the current node
371  * \param task the task to handle (don't touch it then:
372  * it will be destroyed, reused or forwarded)
373  */
374 static void handle_task(node_t node, m_task_t task) {
375
376   DEBUG1("Handling task %p", task);
377   msg_comm_t comm = NULL;
378   char* mailbox = NULL;
379   task_data_t task_data = (task_data_t) MSG_task_get_data(task);
380   e_task_type_t type = task_data->type;
381
382   switch (type) {
383
384     case TASK_FIND_SUCCESSOR:
385       DEBUG2("Receiving a 'Find Successor' request from %s for id %d",
386           task_data->issuer_host_name, task_data->request_id);
387       // is my successor the successor?
388       if (is_in_interval(task_data->request_id, node->id + 1, node->fingers[0].id)) {
389         task_data->type = TASK_FIND_SUCCESSOR_ANSWER;
390         task_data->answer_id = node->fingers[0].id;
391         DEBUG3("Sending back a 'Find Successor Answer' to %s: the successor of %d is %d",
392             task_data->issuer_host_name,
393             task_data->request_id, task_data->answer_id);
394         comm = MSG_task_isend(task, task_data->answer_to);
395         xbt_dynar_push(node->comms, &comm);
396       }
397       else {
398         // otherwise, forward the request to the closest preceding finger in my table
399         int closest = closest_preceding_node(node, task_data->request_id);
400         DEBUG2("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
401             task_data->request_id, closest);
402         mailbox = get_mailbox(closest);
403         comm = MSG_task_isend(task, mailbox);
404         xbt_dynar_push(node->comms, &comm);
405         xbt_free(mailbox);
406       }
407       break;
408
409     case TASK_GET_PREDECESSOR:
410       DEBUG1("Receiving a 'Get Predecessor' request from %s", task_data->issuer_host_name);
411       task_data->type = TASK_GET_PREDECESSOR_ANSWER;
412       task_data->answer_id = node->pred_id;
413       DEBUG3("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d",
414           task_data->issuer_host_name,
415           task_data->answer_to, task_data->answer_id);
416       comm = MSG_task_isend(task, task_data->answer_to);
417       xbt_dynar_push(node->comms, &comm);
418       break;
419
420     case TASK_NOTIFY:
421       // someone is telling me that he may be my new predecessor
422       DEBUG1("Receiving a 'Notify' request from %s", task_data->issuer_host_name);
423       notify(node, task_data->request_id);
424       task_data_destroy(task_data);
425       MSG_task_destroy(task);
426       break;
427
428     case TASK_PREDECESSOR_LEAVING:
429       // my predecessor is about to quit
430       DEBUG1("Receiving a 'Predecessor Leaving' message from %s", task_data->issuer_host_name);
431       // modify my predecessor
432       set_predecessor(node, task_data->request_id);
433       task_data_destroy(task_data);
434       MSG_task_destroy(task);
435       /*TODO :
436       >> notify my new predecessor
437       >> send a notify_predecessors !!
438        */
439       break;
440
441     case TASK_SUCCESSOR_LEAVING:
442       // my successor is about to quit
443       DEBUG1("Receiving a 'Successor Leaving' message from %s", task_data->issuer_host_name);
444       // modify my successor FIXME : this should be implicit ?
445       set_finger(node, 0, task_data->request_id);
446       task_data_destroy(task_data);
447       MSG_task_destroy(task);
448       /* TODO
449       >> notify my new successor
450       >> update my table & predecessors table */
451       break;
452
453     case TASK_FIND_SUCCESSOR_ANSWER:
454     case TASK_GET_PREDECESSOR_ANSWER:
455       DEBUG2("Ignoring unexpected task of type %d (%p)", type, task);
456       break;
457   }
458 }
459
460 /**
461  * \brief Initializes the current node as the first one of the system.
462  * \param node the current node
463  */
464 static void create(node_t node)
465 {
466   DEBUG0("Create a new Chord ring...");
467   set_predecessor(node, -1); // -1 means that I have no predecessor
468   print_finger_table(node);
469 }
470
471 /**
472  * \brief Makes the current node join the ring, knowing the id of a node
473  * already in the ring
474  * \param node the current node
475  * \param known_id id of a node already in the ring
476  * \return 1 if the join operation succeeded, 0 otherwise
477  */
478 static int join(node_t node, int known_id)
479 {
480   INFO2("Joining the ring with id %d, knowing node %d", node->id, known_id);
481   set_predecessor(node, -1); // no predecessor (yet)
482
483   int successor_id = remote_find_successor(node, known_id, node->id);
484   if (successor_id == -1) {
485     INFO0("Cannot join the ring.");
486   }
487   else {
488     set_finger(node, 0, successor_id);
489     print_finger_table(node);
490   }
491
492   return successor_id != -1;
493 }
494
495 /**
496  * \brief Makes the current node quit the system
497  * \param node the current node
498  */
499 static void leave(node_t node)
500 {
501   DEBUG0("Well Guys! I Think it's time for me to quit ;)");
502   quit_notify(node, 1);  // notify to my successor ( >>> 1 );
503   quit_notify(node, -1); // notify my predecessor  ( >>> -1);
504   // TODO ...
505 }
506
507 /*
508  * \brief Notifies the successor or the predecessor of the current node
509  * of the departure
510  * \param node the current node
511  * \param to 1 to notify the successor, -1 to notify the predecessor
512  * FIXME: notify both nodes with only one call
513  */
514 static void quit_notify(node_t node, int to)
515 {
516   /* TODO
517   task_data_t req_data = xbt_new0(s_task_data_t, 1);
518   req_data->request_id = node->id;
519   req_data->successor_id = node->fingers[0].id;
520   req_data->pred_id = node->pred_id;
521   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
522   req_data->answer_to = NULL;
523   const char* task_name = NULL;
524   const char* to_mailbox = NULL;
525   if (to == 1) {    // notify my successor
526     to_mailbox = node->fingers[0].mailbox;
527     INFO2("Telling my Successor %d about my departure via mailbox %s",
528           node->fingers[0].id, to_mailbox);
529     req_data->type = TASK_PREDECESSOR_LEAVING;
530   }
531   else if (to == -1) {    // notify my predecessor
532
533     if (node->pred_id == -1) {
534       return;
535     }
536
537     to_mailbox = node->pred_mailbox;
538     INFO2("Telling my Predecessor %d about my departure via mailbox %s",
539           node->pred_id, to_mailbox);
540     req_data->type = TASK_SUCCESSOR_LEAVING;
541   }
542   m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
543   //char* mailbox = get_mailbox(to_mailbox);
544   msg_comm_t comm = MSG_task_isend(task, to_mailbox);
545   xbt_dynar_push(node->comms, &comm);
546   */
547 }
548
549 /**
550  * \brief Makes the current node find the successor node of an id.
551  * \param node the current node
552  * \param id the id to find
553  * \return the id of the successor node, or -1 if the request failed
554  */
555 static int find_successor(node_t node, int id)
556 {
557   // is my successor the successor?
558   if (is_in_interval(id, node->id + 1, node->fingers[0].id)) {
559     return node->fingers[0].id;
560   }
561
562   // otherwise, ask the closest preceding finger in my table
563   int closest = closest_preceding_node(node, id);
564   return remote_find_successor(node, closest, id);
565 }
566
567 /**
568  * \brief Asks another node the successor node of an id.
569  * \param node the current node
570  * \param ask_to the node to ask to
571  * \param id the id to find
572  * \return the id of the successor node, or -1 if the request failed
573  */
574 static int remote_find_successor(node_t node, int ask_to, int id)
575 {
576   int successor = -1;
577   int stop = 0;
578   char* mailbox = get_mailbox(ask_to);
579   task_data_t req_data = xbt_new0(s_task_data_t, 1);
580   req_data->type = TASK_FIND_SUCCESSOR;
581   req_data->request_id = id;
582   req_data->answer_to = xbt_strdup(node->mailbox);
583   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
584
585   // send a "Find Successor" request to ask_to_id
586   m_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
587   DEBUG3("Sending a 'Find Successor' request (task %p) to %d for id %d", task_sent, ask_to, id);
588   MSG_error_t res = MSG_task_send_with_timeout(task_sent, mailbox, TIMEOUT);
589
590   if (res != MSG_OK) {
591     DEBUG3("Failed to send the 'Find Successor' request (task %p) to %d for id %d",
592         task_sent, ask_to, id);
593     MSG_task_destroy(task_sent);
594     task_data_destroy(req_data);
595   }
596   else {
597
598     // receive the answer
599     DEBUG3("Sent a 'Find Successor' request (task %p) to %d for key %d, waiting for the answer",
600         task_sent, ask_to, id);
601
602     do {
603       if (node->comm_receive == NULL) {
604         m_task_t task_received = NULL;
605         node->comm_receive = MSG_task_irecv(&task_received, node->mailbox);
606       }
607
608       res = MSG_comm_wait(node->comm_receive, TIMEOUT);
609
610       if (res != MSG_OK) {
611         DEBUG2("Failed to receive the answer to my 'Find Successor' request (task %p): %d",
612             task_sent, res);
613         stop = 1;
614         //MSG_comm_destroy(node->comm_receive);
615       }
616       else {
617         m_task_t task_received = MSG_comm_get_task(node->comm_receive);
618         DEBUG1("Received a task (%p)", task_received);
619         task_data_t ans_data = MSG_task_get_data(task_received);
620
621         if (task_received != task_sent) {
622           // this is not the expected answer
623           handle_task(node, task_received);
624         }
625         else {
626           // this is our answer
627           DEBUG4("Received the answer to my 'Find Successor' request for id %d (task %p): the successor of key %d is %d",
628               ans_data->request_id, task_received, id, ans_data->answer_id);
629           successor = ans_data->answer_id;
630           stop = 1;
631           MSG_task_destroy(task_received);
632           task_data_destroy(req_data);
633         }
634       }
635       node->comm_receive = NULL;
636     } while (!stop);
637   }
638
639   xbt_free(mailbox);
640   return successor;
641 }
642
643 /**
644  * \brief Asks another node its predecessor.
645  * \param node the current node
646  * \param ask_to the node to ask to
647  * \return the id of its predecessor node, or -1 if the request failed
648  * (or if the node does not know its predecessor)
649  */
650 static int remote_get_predecessor(node_t node, int ask_to)
651 {
652   int predecessor_id = -1;
653   int stop = 0;
654   char* mailbox = get_mailbox(ask_to);
655   task_data_t req_data = xbt_new0(s_task_data_t, 1);
656   req_data->type = TASK_GET_PREDECESSOR;
657   req_data->answer_to = xbt_strdup(node->mailbox);
658   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
659
660   // send a "Get Predecessor" request to ask_to_id
661   DEBUG1("Sending a 'Get Predecessor' request to %d", ask_to);
662   m_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
663   MSG_error_t res = MSG_task_send_with_timeout(task_sent, mailbox, TIMEOUT);
664
665   if (res != MSG_OK) {
666     DEBUG2("Failed to send the 'Get Predecessor' request (task %p) to %d",
667         task_sent, ask_to);
668     MSG_task_destroy(task_sent);
669     task_data_destroy(req_data);
670   }
671   else {
672
673     // receive the answer
674     DEBUG3("Sent 'Get Predecessor' request (task %p) to %d, waiting for the answer on my mailbox '%s'",
675         task_sent, ask_to, req_data->answer_to);
676
677     do {
678       if (node->comm_receive == NULL) { // FIXME simplify this
679         m_task_t task_received = NULL;
680         node->comm_receive = MSG_task_irecv(&task_received, node->mailbox);
681       }
682
683       res = MSG_comm_wait(node->comm_receive, TIMEOUT);
684
685       if (res != MSG_OK) {
686         DEBUG2("Failed to receive the answer to my 'Get Predecessor' request (task %p): %d",
687             task_sent, res);
688         stop = 1;
689         //MSG_comm_destroy(node->comm_receive);
690       }
691       else {
692         m_task_t task_received = MSG_comm_get_task(node->comm_receive);
693         task_data_t ans_data = MSG_task_get_data(task_received);
694
695         if (task_received != task_sent) {
696           handle_task(node, task_received);
697         }
698         else {
699           DEBUG3("Received the answer to my 'Get Predecessor' request (task %p): the predecessor of node %d is %d",
700               task_received, ask_to, ans_data->answer_id);
701           predecessor_id = ans_data->answer_id;
702           stop = 1;
703           MSG_task_destroy(task_received);
704           task_data_destroy(req_data);
705         }
706       }
707       node->comm_receive = NULL;
708     } while (!stop);
709   }
710
711   xbt_free(mailbox);
712   return predecessor_id;
713 }
714
715 /**
716  * \brief Returns the closest preceding finger of an id
717  * with respect to the finger table of the current node.
718  * \param node the current node
719  * \param id the id to find
720  * \return the closest preceding finger of that id
721  */
722 int closest_preceding_node(node_t node, int id)
723 {
724   int i;
725   for (i = NB_BITS - 1; i >= 0; i--) {
726     if (is_in_interval(node->fingers[i].id, node->id + 1, id - 1)) {
727       return node->fingers[i].id;
728     }
729   }
730   return node->id;
731 }
732
733 /**
734  * \brief This function is called periodically. It checks the immediate
735  * successor of the current node.
736  * \param node the current node
737  */
738 static void stabilize(node_t node)
739 {
740   DEBUG0("Stabilizing node");
741
742   // get the predecessor of my immediate successor
743   int candidate_id;
744   int successor_id = node->fingers[0].id;
745   if (successor_id != node->id) {
746     candidate_id = remote_get_predecessor(node, successor_id);
747   }
748   else {
749     candidate_id = node->pred_id;
750   }
751
752   // this node is a candidate to become my new successor
753   if (candidate_id != -1
754       && is_in_interval(candidate_id, node->id + 1, successor_id - 1)) {
755     set_finger(node, 0, candidate_id);
756   }
757   if (successor_id != node->id) {
758     remote_notify(node, successor_id, node->id);
759   }
760 }
761
762 /**
763  * \brief Notifies the current node that its predecessor may have changed.
764  * \param node the current node
765  * \param candidate_id the possible new predecessor
766  */
767 static void notify(node_t node, int predecessor_candidate_id) {
768
769   if (node->pred_id == -1
770     || is_in_interval(predecessor_candidate_id, node->pred_id + 1, node->id - 1)) {
771
772     set_predecessor(node, predecessor_candidate_id);
773     print_finger_table(node);
774   }
775   else {
776     DEBUG1("I don't have to change my predecessor to %d", predecessor_candidate_id);
777   }
778 }
779
780 /**
781  * \brief Notifies a remote node that its predecessor may have changed.
782  * \param node the current node
783  * \param notify_id id of the node to notify
784  * \param candidate_id the possible new predecessor
785  */
786 static void remote_notify(node_t node, int notify_id, int predecessor_candidate_id) {
787
788   task_data_t req_data = xbt_new0(s_task_data_t, 1);
789   req_data->type = TASK_NOTIFY;
790   req_data->request_id = predecessor_candidate_id;
791   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
792   req_data->answer_to = NULL;
793
794   // send a "Notify" request to notify_id
795   m_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
796   DEBUG2("Sending a 'Notify' request (task %p) to %d", task, notify_id);
797   char* mailbox = get_mailbox(notify_id);
798   msg_comm_t comm = MSG_task_isend(task, mailbox);
799   xbt_dynar_push(node->comms, &comm);
800   xbt_free(mailbox);
801 }
802
803 /**
804  * \brief This function is called periodically.
805  * It refreshes the finger table of the current node.
806  * \param node the current node
807  */
808 static void fix_fingers(node_t node) {
809
810   DEBUG0("Fixing fingers");
811   int i = node->next_finger_to_fix;
812   int id = find_successor(node, node->id + powers2[i]);
813   if (id != -1) {
814
815     if (id != node->fingers[i].id) {
816       set_finger(node, i, id);
817       print_finger_table(node);
818     }
819     node->next_finger_to_fix = (i + 1) % NB_BITS;
820   }
821 }
822
823 /**
824  * \brief This function is called periodically.
825  * It checks whether the predecessor has failed
826  * \param node the current node
827  */
828 static void check_predecessor(node_t node)
829 {
830   DEBUG0("Checking whether my predecessor is alive");
831   // TODO
832 }
833
834 /**
835  * \brief Main function.
836  */
837 int main(int argc, char *argv[])
838 {
839   if (argc < 3) {
840     printf("Usage: %s platform_file deployment_file\n", argv[0]);
841     printf("example: %s ../msg_platform.xml chord.xml\n", argv[0]);
842     exit(1);
843   }
844
845   const char* platform_file = argv[1];
846   const char* application_file = argv[2];
847
848   chord_initialize();
849
850   MSG_global_init(&argc, argv);
851   MSG_set_channel_number(0);
852   MSG_create_environment(platform_file);
853
854   MSG_function_register("node", node);
855   MSG_launch_application(application_file);
856
857   MSG_error_t res = MSG_main();
858   INFO1("Simulation time: %g", MSG_get_clock());
859
860   MSG_clean();
861
862   if (res == MSG_OK)
863     return 0;
864   else
865     return 1;
866 }