Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
prepare MSG_storages_as_dynar function
[simgrid.git] / examples / msg / mc / chord / chord.c
1
2 /* Copyright (c) 2010. The SimGrid Team.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include <stdio.h>
9 #include "msg/msg.h"
10 #include "xbt/log.h"
11 #include "xbt/asserts.h"
12 #include "simgrid/modelchecker.h"
13 #include "mc/mc.h"
14
15 /** @addtogroup MSG_examples
16  *
17  *  - <b>chord/chord.c: Classical Chord P2P protocol</b>
18  *    This example implements the well known Chord P2P protocol. Its
19  *    main advantage is that it constitute a fully working non-trivial
20  *    example. In addition, its implementation is rather efficient, as 
21  *    demonstrated in http://hal.inria.fr/inria-00602216/
22  */
23
24  
25 XBT_LOG_NEW_DEFAULT_CATEGORY(mc_chord,
26                              "Messages specific for this example");
27
28 #define COMM_SIZE 10
29 #define COMP_SIZE 0
30 #define MAILBOX_NAME_SIZE 10
31
32 static int nb_bits = 24;
33 static int nb_keys = 0;
34 static int timeout = 50;
35 //static int max_simulation_time = 1000;
36 static int periodic_stabilize_delay = 20;
37 static int periodic_fix_fingers_delay = 120;
38 static int periodic_check_predecessor_delay = 120;
39 static int periodic_lookup_delay = 10;
40
41 // Liveness properties verification
42 static int j = 0;
43
44 static int predJ(){
45   return j;
46 }
47
48 //extern long int smx_total_comms;
49
50 /*
51  * Finger element.
52  */
53 typedef struct s_finger {
54   int id;
55   char mailbox[MAILBOX_NAME_SIZE]; // string representation of the id
56 } s_finger_t, *finger_t;
57
58 /*
59  * Node data.
60  */
61 typedef struct s_node {
62   int id;                                 // my id
63   char mailbox[MAILBOX_NAME_SIZE];        // my mailbox name (string representation of the id)
64   s_finger_t *fingers;                    // finger table, of size nb_bits (fingers[0] is my successor)
65   int pred_id;                            // predecessor id
66   char pred_mailbox[MAILBOX_NAME_SIZE];   // predecessor's mailbox name
67   int next_finger_to_fix;                 // index of the next finger to fix in fix_fingers()
68   msg_comm_t comm_receive;                // current communication to receive
69   double last_change_date;                // last time I changed a finger or my predecessor
70 } s_node_t, *node_t;
71
72 /**
73  * Types of tasks exchanged between nodes.
74  */
75 typedef enum {
76   TASK_FIND_SUCCESSOR,
77   TASK_FIND_SUCCESSOR_ANSWER,
78   TASK_GET_PREDECESSOR,
79   TASK_GET_PREDECESSOR_ANSWER,
80   TASK_NOTIFY,
81   TASK_SUCCESSOR_LEAVING,
82   TASK_PREDECESSOR_LEAVING
83 } e_task_type_t;
84
85 /*
86  * Data attached with the tasks sent and received
87  */
88 typedef struct s_task_data {
89   e_task_type_t type;                     // type of task
90   int request_id;                         // id paramater (used by some types of tasks)
91   int request_finger;                     // finger parameter (used by some types of tasks)
92   int answer_id;                          // answer (used by some types of tasks)
93   char answer_to[MAILBOX_NAME_SIZE];      // mailbox to send an answer to (if any)
94   const char* issuer_host_name;           // used for logging
95 } s_task_data_t, *task_data_t;
96
97 static int *powers2;
98
99 // utility functions
100 static void chord_initialize(void);
101 static void chord_exit(void);
102 static int normalize(int id);
103 static int is_in_interval(int id, int start, int end);
104 static void get_mailbox(int host_id, char* mailbox);
105 static void task_free(void* task);
106 static void print_finger_table(node_t node);
107 static void set_finger(node_t node, int finger_index, int id);
108 static void set_predecessor(node_t node, int predecessor_id);
109
110 // process functions
111 static int node(int argc, char *argv[]);
112 static void handle_task(node_t node, msg_task_t task);
113
114 // Chord core
115 static void create(node_t node);
116 static int join(node_t node, int known_id);
117 static void leave(node_t node);
118 static int find_successor(node_t node, int id);
119 static int remote_find_successor(node_t node, int ask_to_id, int id);
120 static int remote_get_predecessor(node_t node, int ask_to_id);
121 static int closest_preceding_node(node_t node, int id);
122 static void stabilize(node_t node);
123 static void notify(node_t node, int predecessor_candidate_id);
124 static void remote_notify(node_t node, int notify_to, int predecessor_candidate_id);
125 static void fix_fingers(node_t node);
126 static void check_predecessor(node_t node);
127 static void random_lookup(node_t);
128 static void quit_notify(node_t node);
129
130
131 /**
132  * \brief Global initialization of the Chord simulation.
133  */
134 static void chord_initialize(void)
135 {
136   // compute the powers of 2 once for all
137   powers2 = xbt_new(int, nb_bits);
138   int pow = 1;
139   int i;
140   for (i = 0; i < nb_bits; i++) {
141     powers2[i] = pow;
142     pow = pow << 1;
143   }
144   nb_keys = pow;
145   XBT_DEBUG("Sets nb_keys to %d", nb_keys);
146 }
147
148 static void chord_exit(void)
149 {
150   xbt_free(powers2);
151 }
152
153 /**
154  * \brief Turns an id into an equivalent id in [0, nb_keys).
155  * \param id an id
156  * \return the corresponding normalized id
157  */
158 static int normalize(int id)
159 {
160   // like id % nb_keys, but works with negatives numbers (and faster)
161   return id & (nb_keys - 1);
162 }
163
164 /**
165  * \brief Returns whether an id belongs to the interval [start, end].
166  *
167  * The parameters are noramlized to make sure they are between 0 and nb_keys - 1).
168  * 1 belongs to [62, 3]
169  * 1 does not belong to [3, 62]
170  * 63 belongs to [62, 3]
171  * 63 does not belong to [3, 62]
172  * 24 belongs to [21, 29]
173  * 24 does not belong to [29, 21]
174  *
175  * \param id id to check
176  * \param start lower bound
177  * \param end upper bound
178  * \return a non-zero value if id in in [start, end]
179  */
180 static int is_in_interval(int id, int start, int end)
181 {
182   id = normalize(id);
183   start = normalize(start);
184   end = normalize(end);
185
186   // make sure end >= start and id >= start
187   if (end < start) {
188     end += nb_keys;
189   }
190
191   if (id < start) {
192     id += nb_keys;
193   }
194
195   return id <= end;
196 }
197
198 /**
199  * \brief Gets the mailbox name of a host given its chord id.
200  * \param node_id id of a node
201  * \param mailbox pointer to where the mailbox name should be written
202  * (there must be enough space)
203  */
204 static void get_mailbox(int node_id, char* mailbox)
205 {
206   snprintf(mailbox, MAILBOX_NAME_SIZE - 1, "%d", node_id);
207 }
208
209 /**
210  * \brief Frees the memory used by a task.
211  * \param task the MSG task to destroy
212  */
213 static void task_free(void* task)
214 {
215   // TODO add a parameter data_free_function to MSG_task_create?
216   if(task != NULL){
217     xbt_free(MSG_task_get_data(task));
218     MSG_task_destroy(task);
219     task = NULL;
220   }
221 }
222
223 /**
224  * \brief Displays the finger table of a node.
225  * \param node a node
226  */
227 static void print_finger_table(node_t node)
228 {
229   if (XBT_LOG_ISENABLED(mc_chord, xbt_log_priority_verbose)) {
230     int i;
231     XBT_VERB("My finger table:");
232     XBT_VERB("Start | Succ ");
233     for (i = 0; i < nb_bits; i++) {
234       XBT_VERB(" %3d  | %3d ", (node->id + powers2[i]) % nb_keys, node->fingers[i].id);
235     }
236     XBT_VERB("Predecessor: %d", node->pred_id);
237   }
238 }
239
240 /**
241  * \brief Sets a finger of the current node.
242  * \param node the current node
243  * \param finger_index index of the finger to set (0 to nb_bits - 1)
244  * \param id the id to set for this finger
245  */
246 static void set_finger(node_t node, int finger_index, int id)
247 {
248   if (id != node->fingers[finger_index].id) {
249     node->fingers[finger_index].id = id;
250     get_mailbox(id, node->fingers[finger_index].mailbox);
251     node->last_change_date = MSG_get_clock();
252     XBT_DEBUG("My new finger #%d is %d", finger_index, id);
253   }
254 }
255
256 /**
257  * \brief Sets the predecessor of the current node.
258  * \param node the current node
259  * \param id the id to predecessor, or -1 to unset the predecessor
260  */
261 static void set_predecessor(node_t node, int predecessor_id)
262 {
263   if (predecessor_id != node->pred_id) {
264     node->pred_id = predecessor_id;
265
266     if (predecessor_id != -1) {
267       get_mailbox(predecessor_id, node->pred_mailbox);
268     }
269     node->last_change_date = MSG_get_clock();
270
271     XBT_DEBUG("My new predecessor is %d", predecessor_id);
272   }
273 }
274
275 /**
276  * \brief Node Function
277  * Arguments:
278  * - my id
279  * - the id of a guy I know in the system (except for the first node)
280  * - the time to sleep before I join (except for the first node)
281  */
282 int node(int argc, char *argv[])
283 {
284   /* Reduce the run size for the MC */
285   if(MC_is_active()){
286     periodic_stabilize_delay = 8;
287     periodic_fix_fingers_delay = 8;
288     periodic_check_predecessor_delay = 8;
289   }
290
291   double init_time = MSG_get_clock();
292   msg_task_t task_received = NULL;
293   int i;
294   int join_success = 0;
295   //double deadline;
296   double next_stabilize_date = init_time + periodic_stabilize_delay;
297   double next_fix_fingers_date = init_time + periodic_fix_fingers_delay;
298   double next_check_predecessor_date = init_time + periodic_check_predecessor_delay;
299   double next_lookup_date = init_time + periodic_lookup_delay;
300
301   //int stab = 0, fix = 0, check = 0, rand_look = 0;
302   int end = 0;
303
304   xbt_assert(argc == 3 || argc == 5, "Wrong number of arguments for this node");
305
306   // initialize my node
307   s_node_t node = {0};
308   node.id = atoi(argv[1]);
309   get_mailbox(node.id, node.mailbox);
310   node.next_finger_to_fix = 0;
311   node.fingers = xbt_new0(s_finger_t, nb_bits);
312   node.last_change_date = init_time;
313
314   for (i = 0; i < nb_bits; i++) {
315     node.fingers[i].id = -1;
316     set_finger(&node, i, node.id);
317   }
318
319   if (argc == 3) { // first ring
320     //deadline = atof(argv[2]);
321     create(&node);
322     join_success = 1;
323   }
324   else {
325     int known_id = atoi(argv[2]);
326     //double sleep_time = atof(argv[3]);
327     //deadline = atof(argv[4]);
328
329     /*
330     // sleep before starting
331     XBT_DEBUG("Let's sleep during %f", sleep_time);
332     MSG_process_sleep(sleep_time);
333     */
334     XBT_DEBUG("Hey! Let's join the system.");
335
336     join_success = join(&node, known_id);
337
338     if(join_success)
339       j = 1;
340
341     //MC_assert(!join_success);
342
343   }
344
345   if (join_success) {
346
347     XBT_INFO("Node %d joined the ring", node.id);
348
349     //while (MSG_get_clock() < init_time + deadline
350     //      && MSG_get_clock() < node.last_change_date + 1000
351     //&& MSG_get_clock() < max_simulation_time) {
352
353     //while (!(stab == 1 || fix == 1 || check == 1 || rand_look == 1)) {
354
355     while(1){
356
357       if (node.comm_receive == NULL) {
358         task_received = NULL;
359         node.comm_receive = MSG_task_irecv(&task_received, node.mailbox);
360         // FIXME: do not make MSG_task_irecv() calls from several functions
361       }
362
363       if (!MSG_comm_test(node.comm_receive)) {
364
365         #ifdef HAVE_MC
366           if(MC_is_active()){
367             if(end == 0 && MC_random()){
368               stabilize(&node);
369               end = 1;
370             }else if(end == 0 && MC_random()){
371               fix_fingers(&node);
372               end = 1;
373             }else if(end == 0 && MC_random()){
374               check_predecessor(&node);
375               end = 1;
376             }else if(end == 0 && MC_random()){
377               random_lookup(&node);
378               end = 1;
379             }else{
380               MSG_process_sleep(5);
381             }
382           }else{
383             if (MSG_get_clock() >= next_stabilize_date) {
384               stabilize(&node);
385               next_stabilize_date = MSG_get_clock() + periodic_stabilize_delay;
386             }
387             else if (MSG_get_clock() >= next_fix_fingers_date) {
388               fix_fingers(&node);
389               next_fix_fingers_date = MSG_get_clock() + periodic_fix_fingers_delay;
390             }
391             else if (MSG_get_clock() >= next_check_predecessor_date) {
392               check_predecessor(&node);
393               next_check_predecessor_date = MSG_get_clock() + periodic_check_predecessor_delay;
394             }
395             else if (MSG_get_clock() >= next_lookup_date) {
396               random_lookup(&node);
397               next_lookup_date = MSG_get_clock() + periodic_lookup_delay;
398             }
399             else {
400               // nothing to do: sleep for a while
401               MSG_process_sleep(5);
402             }
403           }
404         #else
405           if (MSG_get_clock() >= next_stabilize_date) {
406             stabilize(&node);
407             next_stabilize_date = MSG_get_clock() + periodic_stabilize_delay;
408           }
409           else if (MSG_get_clock() >= next_fix_fingers_date) {
410             fix_fingers(&node);
411             next_fix_fingers_date = MSG_get_clock() + periodic_fix_fingers_delay;
412           }
413           else if (MSG_get_clock() >= next_check_predecessor_date) {
414             check_predecessor(&node);
415             next_check_predecessor_date = MSG_get_clock() + periodic_check_predecessor_delay;
416           }
417           else if (MSG_get_clock() >= next_lookup_date) {
418             random_lookup(&node);
419             next_lookup_date = MSG_get_clock() + periodic_lookup_delay;
420           }
421           else {
422             // nothing to do: sleep for a while
423             MSG_process_sleep(5);
424           }
425         #endif
426
427       }else{
428
429         if (node.comm_receive) {
430
431           XBT_INFO("A transfer has occured");
432
433           // a transfer has occured
434
435           msg_error_t status = MSG_comm_get_status(node.comm_receive);
436
437           if (status != MSG_OK) {
438             XBT_INFO("Failed to receive a task. Nevermind.");
439             MSG_comm_destroy(node.comm_receive);
440             node.comm_receive = NULL;
441           }
442           else {
443             // the task was successfully received
444             XBT_INFO("The task was successfully received by node %d", node.id);
445             MSG_comm_destroy(node.comm_receive);
446             node.comm_receive = NULL;
447             handle_task(&node, task_received);
448           }
449         }
450       }
451     }
452
453     if (node.comm_receive) {
454       MSG_comm_destroy(node.comm_receive);
455       node.comm_receive = NULL;
456     }
457
458     // leave the ring
459     leave(&node);
460
461     XBT_INFO("Node %d leave the ring", node.id);
462     //sleep(15);
463   }
464
465   // stop the simulation
466   xbt_free(node.fingers);
467   return 0;
468 }
469
470 /**
471  * \brief This function is called when the current node receives a task.
472  * \param node the current node
473  * \param task the task to handle (don't touch it then:
474  * it will be destroyed, reused or forwarded)
475  */
476 static void handle_task(node_t node, msg_task_t task) {
477
478   XBT_DEBUG("Handling task %p", task);
479   char mailbox[MAILBOX_NAME_SIZE];
480   task_data_t task_data = (task_data_t) MSG_task_get_data(task);
481   e_task_type_t type = task_data->type;
482
483   switch (type) {
484
485     case TASK_FIND_SUCCESSOR:
486       XBT_DEBUG("Receiving a 'Find Successor' request from %s for id %d",
487           task_data->issuer_host_name, task_data->request_id);
488       // is my successor the successor?
489       if (is_in_interval(task_data->request_id, node->id + 1, node->fingers[0].id)) {
490         task_data->type = TASK_FIND_SUCCESSOR_ANSWER;
491         task_data->answer_id = node->fingers[0].id;
492         XBT_DEBUG("Sending back a 'Find Successor Answer' to %s (mailbox %s): the successor of %d is %d",
493             task_data->issuer_host_name,
494             task_data->answer_to,
495             task_data->request_id, task_data->answer_id);
496         MSG_task_dsend(task, task_data->answer_to, task_free);
497       }
498       else {
499         // otherwise, forward the request to the closest preceding finger in my table
500         int closest = closest_preceding_node(node, task_data->request_id);
501         XBT_DEBUG("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
502             task_data->request_id, closest);
503         get_mailbox(closest, mailbox);
504         MSG_task_dsend(task, mailbox, task_free);
505       }
506       break;
507
508     case TASK_GET_PREDECESSOR:
509       XBT_DEBUG("Receiving a 'Get Predecessor' request from %s", task_data->issuer_host_name);
510       task_data->type = TASK_GET_PREDECESSOR_ANSWER;
511       task_data->answer_id = node->pred_id;
512       XBT_DEBUG("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d",
513           task_data->issuer_host_name,
514           task_data->answer_to, task_data->answer_id);
515       MSG_task_dsend(task, task_data->answer_to, task_free);
516       break;
517
518     case TASK_NOTIFY:
519       // someone is telling me that he may be my new predecessor
520       XBT_DEBUG("Receiving a 'Notify' request from %s", task_data->issuer_host_name);
521       notify(node, task_data->request_id);
522       task_free(task);
523       break;
524
525     case TASK_PREDECESSOR_LEAVING:
526       // my predecessor is about to quit
527       XBT_DEBUG("Receiving a 'Predecessor Leaving' message from %s", task_data->issuer_host_name);
528       // modify my predecessor
529       set_predecessor(node, task_data->request_id);
530       task_free(task);
531       /*TODO :
532       >> notify my new predecessor
533       >> send a notify_predecessors !!
534        */
535       break;
536
537     case TASK_SUCCESSOR_LEAVING:
538       // my successor is about to quit
539       XBT_DEBUG("Receiving a 'Successor Leaving' message from %s", task_data->issuer_host_name);
540       // modify my successor FIXME : this should be implicit ?
541       set_finger(node, 0, task_data->request_id);
542       task_free(task);
543       /* TODO
544       >> notify my new successor
545       >> update my table & predecessors table */
546       break;
547
548     case TASK_FIND_SUCCESSOR_ANSWER:
549     case TASK_GET_PREDECESSOR_ANSWER:
550       XBT_DEBUG("Ignoring unexpected task of type %d (%p)", (int)type, task);
551       task_free(task);
552       break;
553   }
554 }
555
556 /**
557  * \brief Initializes the current node as the first one of the system.
558  * \param node the current node
559  */
560 static void create(node_t node)
561 {
562   XBT_DEBUG("Create a new Chord ring...");
563   set_predecessor(node, -1); // -1 means that I have no predecessor
564   print_finger_table(node);
565 }
566
567 /**
568  * \brief Makes the current node join the ring, knowing the id of a node
569  * already in the ring
570  * \param node the current node
571  * \param known_id id of a node already in the ring
572  * \return 1 if the join operation succeeded, 0 otherwise
573  */
574 static int join(node_t node, int known_id)
575 {
576   XBT_INFO("Joining the ring with id %d, knowing node %d", node->id, known_id);
577   set_predecessor(node, -1); // no predecessor (yet)
578
579   /*
580   int i;
581   for (i = 0; i < nb_bits; i++) {
582     set_finger(node, i, known_id);
583   }
584   */
585
586   int successor_id = remote_find_successor(node, known_id, node->id);
587   if (successor_id == -1) {
588     XBT_INFO("Cannot join the ring.");
589   }
590   else {
591     set_finger(node, 0, successor_id);
592     print_finger_table(node);
593   }
594
595   return successor_id != -1;
596 }
597
598 /**
599  * \brief Makes the current node quit the system
600  * \param node the current node
601  */
602 static void leave(node_t node)
603 {
604   XBT_DEBUG("Well Guys! I Think it's time for me to quit ;)");
605   quit_notify(node);
606 }
607
608 /*
609  * \brief Notifies the successor and the predecessor of the current node
610  * of the departure
611  * \param node the current node
612  */
613 static void quit_notify(node_t node)
614 {
615   char mailbox[MAILBOX_NAME_SIZE];
616   //send the PREDECESSOR_LEAVING to our successor
617   task_data_t req_data = xbt_new0(s_task_data_t,1);
618   req_data->type = TASK_PREDECESSOR_LEAVING;
619   req_data->request_id = node->pred_id;
620   get_mailbox(node->id, req_data->answer_to);
621   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
622
623   msg_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
624   XBT_DEBUG("Sending a 'PREDECESSOR_LEAVING' to my successor %d",node->fingers[0].id);
625   MSG_task_send_with_timeout(task_sent, node->fingers[0].mailbox, timeout);
626
627   //send the SUCCESSOR_LEAVING to our predecessor
628   get_mailbox(node->pred_id, mailbox);
629   task_data_t req_data_s = xbt_new0(s_task_data_t,1);
630   req_data_s->type = TASK_SUCCESSOR_LEAVING;
631   req_data_s->request_id = node->fingers[0].id;
632   req_data_s->request_id = node->pred_id;
633   get_mailbox(node->id, req_data_s->answer_to);
634   req_data_s->issuer_host_name = MSG_host_get_name(MSG_host_self());
635
636   msg_task_t task_sent_s = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data_s);
637   XBT_DEBUG("Sending a 'SUCCESSOR_LEAVING' to my predecessor %d",node->pred_id);
638   MSG_task_send_with_timeout(task_sent_s, mailbox, timeout);
639
640 }
641
642 /**
643  * \brief Makes the current node find the successor node of an id.
644  * \param node the current node
645  * \param id the id to find
646  * \return the id of the successor node, or -1 if the request failed
647  */
648 static int find_successor(node_t node, int id)
649 {
650   // is my successor the successor?
651   if (is_in_interval(id, node->id + 1, node->fingers[0].id)) {
652     return node->fingers[0].id;
653   }
654
655   // otherwise, ask the closest preceding finger in my table
656   int closest = closest_preceding_node(node, id);
657   return remote_find_successor(node, closest, id);
658 }
659
660 /**
661  * \brief Asks another node the successor node of an id.
662  * \param node the current node
663  * \param ask_to the node to ask to
664  * \param id the id to find
665  * \return the id of the successor node, or -1 if the request failed
666  */
667 static int remote_find_successor(node_t node, int ask_to, int id)
668 {
669   int successor = -1;
670   int stop = 0;
671   char mailbox[MAILBOX_NAME_SIZE];
672   get_mailbox(ask_to, mailbox);
673   task_data_t req_data = xbt_new0(s_task_data_t, 1);
674   req_data->type = TASK_FIND_SUCCESSOR;
675   req_data->request_id = id;
676   get_mailbox(node->id, req_data->answer_to);
677   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
678
679   // send a "Find Successor" request to ask_to_id
680   msg_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
681   XBT_DEBUG("Sending a 'Find Successor' request (task %p) to %d for id %d", task_sent, ask_to, id);
682   msg_error_t res = MSG_task_send_with_timeout(task_sent, mailbox, timeout);
683
684   if (res != MSG_OK) {
685     XBT_DEBUG("Failed to send the 'Find Successor' request (task %p) to %d for id %d",
686         task_sent, ask_to, id);
687     task_free(task_sent);
688   }
689   else {
690
691     // receive the answer
692     XBT_DEBUG("Sent a 'Find Successor' request (task %p) to %d for key %d, waiting for the answer",
693         task_sent, ask_to, id);
694
695     do {
696       if (node->comm_receive == NULL) {
697         msg_task_t task_received = NULL;
698         node->comm_receive = MSG_task_irecv(&task_received, node->mailbox);
699       }
700
701       res = MSG_comm_wait(node->comm_receive, timeout);
702
703       if (res != MSG_OK) {
704         XBT_DEBUG("Failed to receive the answer to my 'Find Successor' request (task %p): %d",
705                   task_sent, (int)res);
706         stop = 1;
707         MSG_comm_destroy(node->comm_receive);
708         node->comm_receive = NULL;
709       }
710       else {
711         msg_task_t task_received = MSG_comm_get_task(node->comm_receive);
712         XBT_DEBUG("Received a task (%p)", task_received);
713         task_data_t ans_data = MSG_task_get_data(task_received);
714
715         if (MC_is_active()) {
716           // the model-checker is expected to find a counter-example here. 
717           // 
718           // As you can see in the test right below, task_received is not always equal to task_sent 
719           // (as messages from differing round can interleave). But the previous version of this code 
720           // wrongly assumed that, leading to problems. But this only occured on large platforms,
721           // leading to hardly usable traces. So we used the model-checker to track down the issue, 
722           // and we came down to this test, that explained the bug in a snap.
723           //MC_assert(task_received == task_sent);
724         }
725
726         if (task_received != task_sent) {
727           // this is not the expected answer
728           MSG_comm_destroy(node->comm_receive);
729           node->comm_receive = NULL;
730           handle_task(node, task_received);
731         }
732         else {
733           // this is our answer
734           XBT_DEBUG("Received the answer to my 'Find Successor' request for id %d (task %p): the successor of key %d is %d",
735               ans_data->request_id, task_received, id, ans_data->answer_id);
736           successor = ans_data->answer_id;
737           stop = 1;
738           MSG_comm_destroy(node->comm_receive);
739           node->comm_receive = NULL;
740           task_free(task_received);
741         }
742       }
743     } while (!stop);
744   }
745
746   return successor;
747 }
748
749 /**
750  * \brief Asks another node its predecessor.
751  * \param node the current node
752  * \param ask_to the node to ask to
753  * \return the id of its predecessor node, or -1 if the request failed
754  * (or if the node does not know its predecessor)
755  */
756 static int remote_get_predecessor(node_t node, int ask_to)
757 {
758   int predecessor_id = -1;
759   int stop = 0;
760   char mailbox[MAILBOX_NAME_SIZE];
761   get_mailbox(ask_to, mailbox);
762   task_data_t req_data = xbt_new0(s_task_data_t, 1);
763   req_data->type = TASK_GET_PREDECESSOR;
764   get_mailbox(node->id, req_data->answer_to);
765   req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
766
767   // send a "Get Predecessor" request to ask_to_id
768   XBT_DEBUG("Sending a 'Get Predecessor' request to %d", ask_to);
769   msg_task_t task_sent = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
770   msg_error_t res = MSG_task_send_with_timeout(task_sent, mailbox, timeout);
771
772   if (res != MSG_OK) {
773     XBT_DEBUG("Failed to send the 'Get Predecessor' request (task %p) to %d",
774         task_sent, ask_to);
775     task_free(task_sent);
776   }
777   else {
778
779     // receive the answer
780     XBT_DEBUG("Sent 'Get Predecessor' request (task %p) to %d, waiting for the answer on my mailbox '%s'",
781         task_sent, ask_to, req_data->answer_to);
782
783     do {
784       if (node->comm_receive == NULL) { // FIXME simplify this
785         msg_task_t task_received = NULL;
786         node->comm_receive = MSG_task_irecv(&task_received, node->mailbox);
787       }
788
789       res = MSG_comm_wait(node->comm_receive, timeout);
790
791       if (res != MSG_OK) {
792         XBT_DEBUG("Failed to receive the answer to my 'Get Predecessor' request (task %p): %d",
793             task_sent, (int)res);
794         stop = 1;
795         MSG_comm_destroy(node->comm_receive);
796         node->comm_receive = NULL;
797       }
798       else {
799         msg_task_t task_received = MSG_comm_get_task(node->comm_receive);
800         task_data_t ans_data = MSG_task_get_data(task_received);
801
802         /*if (MC_is_active()) {
803           MC_assert(task_received == task_sent);
804           }*/
805
806         if (task_received != task_sent) {
807           MSG_comm_destroy(node->comm_receive);
808           node->comm_receive = NULL;
809           handle_task(node, task_received);
810         }
811         else {
812           XBT_DEBUG("Received the answer to my 'Get Predecessor' request (task %p): the predecessor of node %d is %d",
813               task_received, ask_to, ans_data->answer_id);
814           predecessor_id = ans_data->answer_id;
815           stop = 1;
816           MSG_comm_destroy(node->comm_receive);
817           node->comm_receive = NULL;
818           task_free(task_received);
819         }
820       }
821     } while (!stop);
822   }
823
824   return predecessor_id;
825 }
826
827 /**
828  * \brief Returns the closest preceding finger of an id
829  * with respect to the finger table of the current node.
830  * \param node the current node
831  * \param id the id to find
832  * \return the closest preceding finger of that id
833  */
834 int closest_preceding_node(node_t node, int id)
835 {
836   int i;
837   for (i = nb_bits - 1; i >= 0; i--) {
838     if (is_in_interval(node->fingers[i].id, node->id + 1, id - 1)) {
839       return node->fingers[i].id;
840     }
841   }
842   return node->id;
843 }
844
845 /**
846  * \brief This function is called periodically. It checks the immediate
847  * successor of the current node.
848  * \param node the current node
849  */
850 static void stabilize(node_t node)
851 {
852   XBT_DEBUG("Stabilizing node %d", node->id);
853
854   // get the predecessor of my immediate successor
855   int candidate_id;
856   int successor_id = node->fingers[0].id;
857   if (successor_id != node->id) {
858     candidate_id = remote_get_predecessor(node, successor_id);
859   }
860   else {
861     candidate_id = node->pred_id;
862   }
863
864   // this node is a candidate to become my new successor
865   if (candidate_id != -1
866       && is_in_interval(candidate_id, node->id + 1, successor_id - 1)) {
867     set_finger(node, 0, candidate_id);
868   }
869   if (successor_id != node->id) {
870     remote_notify(node, successor_id, node->id);
871   }
872 }
873
874 /**
875  * \brief Notifies the current node that its predecessor may have changed.
876  * \param node the current node
877  * \param candidate_id the possible new predecessor
878  */
879 static void notify(node_t node, int predecessor_candidate_id) {
880
881   XBT_DEBUG("Notifying node %d", node->id);
882
883   if (node->pred_id == -1
884     || is_in_interval(predecessor_candidate_id, node->pred_id + 1, node->id - 1)) {
885
886     set_predecessor(node, predecessor_candidate_id);
887     print_finger_table(node);
888   }
889   else {
890     XBT_DEBUG("I don't have to change my predecessor to %d", predecessor_candidate_id);
891   }
892 }
893
894 /**
895  * \brief Notifies a remote node that its predecessor may have changed.
896  * \param node the current node
897  * \param notify_id id of the node to notify
898  * \param candidate_id the possible new predecessor
899  */
900 static void remote_notify(node_t node, int notify_id, int predecessor_candidate_id) {
901
902       task_data_t req_data = xbt_new0(s_task_data_t, 1);
903       req_data->type = TASK_NOTIFY;
904       req_data->request_id = predecessor_candidate_id;
905       req_data->issuer_host_name = MSG_host_get_name(MSG_host_self());
906
907       // send a "Notify" request to notify_id
908       msg_task_t task = MSG_task_create(NULL, COMP_SIZE, COMM_SIZE, req_data);
909       XBT_DEBUG("Sending a 'Notify' request (task %p) to %d", task, notify_id);
910       char mailbox[MAILBOX_NAME_SIZE];
911       get_mailbox(notify_id, mailbox);
912       MSG_task_dsend(task, mailbox, task_free);
913     }
914
915 /**
916  * \brief This function is called periodically.
917  * It refreshes the finger table of the current node.
918  * \param node the current node
919  */
920 static void fix_fingers(node_t node) {
921
922   XBT_DEBUG("Fixing fingers");
923   int i = node->next_finger_to_fix;
924   int id = find_successor(node, node->id + powers2[i]);
925   if (id != -1) {
926
927     if (id != node->fingers[i].id) {
928       set_finger(node, i, id);
929       print_finger_table(node);
930     }
931     node->next_finger_to_fix = (i + 1) % nb_bits;
932   }
933 }
934
935 /**
936  * \brief This function is called periodically.
937  * It checks whether the predecessor has failed
938  * \param node the current node
939  */
940 static void check_predecessor(node_t node)
941 {
942   XBT_DEBUG("Checking whether my predecessor is alive");
943   // TODO
944 }
945
946 /**
947  * \brief Performs a find successor request to a random id.
948  * \param node the current node
949  */
950 static void random_lookup(node_t node)
951 {
952   int id = 1337; // TODO pick a pseudorandom id
953   XBT_DEBUG("Making a lookup request for id %d", id);
954   find_successor(node, id);
955 }
956
957 /**
958  * \brief Main function.
959  */
960 int main(int argc, char *argv[])
961 {
962   MSG_init(&argc, argv);
963   if (argc < 3) {
964     printf("Usage: %s [-nb_bits=n] [-timeout=t] platform_file deployment_file\n", argv[0]);
965     printf("example: %s ../../msg_platform.xml deploy_chord1.xml\n", argv[0]);
966     exit(1);
967   }
968
969   char **options = &argv[1];
970   while (!strncmp(options[0], "-", 1)) {
971
972     int length = strlen("-nb_bits=");
973     if (!strncmp(options[0], "-nb_bits=", length) && strlen(options[0]) > length) {
974       nb_bits = atoi(options[0] + length);
975       XBT_DEBUG("Set nb_bits to %d", nb_bits);
976     }
977     else {
978
979       length = strlen("-timeout=");
980       if (!strncmp(options[0], "-timeout=", length) && strlen(options[0]) > length) {
981         timeout = atoi(options[0] + length);
982         XBT_DEBUG("Set timeout to %d", timeout);
983       }
984       else {
985         xbt_die("Invalid chord option '%s'", options[0]);
986       }
987     }
988     options++;
989   }
990
991   const char* platform_file = options[0];
992   const char* application_file = options[1];
993
994   chord_initialize();
995
996   //MSG_config("model-check/property","promela_join");
997   MC_automaton_new_propositional_symbol("j", &predJ);
998
999   MSG_create_environment(platform_file);
1000   
1001   MSG_function_register("node", node);
1002   MSG_launch_application(application_file);
1003
1004   msg_error_t res = MSG_main();
1005   XBT_INFO("Simulated time: %g", MSG_get_clock());
1006
1007   chord_exit();
1008
1009   if (res == MSG_OK)
1010     return 0;
1011   else
1012     return 1;
1013 }