Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines with new year.
[simgrid.git] / examples / s4u / dht-chord / s4u-dht-chord-node.cpp
1 /* Copyright (c) 2010-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "s4u-dht-chord.hpp"
7
8 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(s4u_chord);
9
10 /* Returns whether an id belongs to the interval [start, end].
11  *
12  * The parameters are normalized to make sure they are between 0 and nb_keys - 1).
13  * 1 belongs to [62, 3]
14  * 1 does not belong to [3, 62]
15  * 63 belongs to [62, 3]
16  * 63 does not belong to [3, 62]
17  * 24 belongs to [21, 29]
18  * 24 does not belong to [29, 21]
19  *
20  * @param id id to check
21  * @param start lower bound
22  * @param end upper bound
23  * @return a non-zero value if id in in [start, end]
24  */
25 static int is_in_interval(int id, int start, int end)
26 {
27   int i = id % nb_keys;
28   int s = start % nb_keys;
29   int e = end % nb_keys;
30
31   // make sure end >= start and id >= start
32   if (e < s) {
33     e += nb_keys;
34   }
35
36   if (i < s) {
37     i += nb_keys;
38   }
39
40   return i <= e;
41 }
42
43 void ChordMessage::destroy(void* message)
44 {
45   delete static_cast<ChordMessage*>(message);
46 }
47
48 /* Initializes the current node as the first one of the system */
49 Node::Node(std::vector<std::string> args)
50 {
51   xbt_assert(args.size() == 3 || args.size() == 5, "Wrong number of arguments for this node");
52
53   // initialize my node
54   id_                = std::stoi(args[1]);
55   stream             = simgrid::s4u::this_actor::get_host()->extension<HostChord>()->getStream();
56   mailbox_           = simgrid::s4u::Mailbox::by_name(std::to_string(id_));
57   next_finger_to_fix = 0;
58   fingers_           = new int[nb_bits];
59
60   for (int i = 0; i < nb_bits; i++) {
61     fingers_[i] = id_;
62   }
63
64   if (args.size() == 3) { // first ring
65     deadline_   = std::stod(args[2]);
66     start_time_ = simgrid::s4u::Engine::get_clock();
67     XBT_DEBUG("Create a new Chord ring...");
68   } else {
69     known_id_   = std::stoi(args[2]);
70     start_time_ = std::stod(args[3]);
71     deadline_   = std::stod(args[4]);
72     XBT_DEBUG("Hey! Let's join the system in %f seconds (shall leave at time %f)", start_time_,
73               start_time_ + deadline_);
74   }
75 }
76
77 Node::~Node()
78 {
79   delete[] fingers_;
80 }
81 /* Makes the current node join the ring, knowing the id of a node already in the ring
82  *
83  * @param known_id id of a node already in the ring
84  * @return true if the join operation succeeded
85  *  */
86
87 void Node::join(int known_id)
88 {
89   XBT_INFO("Joining the ring with id %d, knowing node %d", id_, known_id);
90   setPredecessor(-1); // no predecessor (yet)
91
92   int successor_id = remoteFindSuccessor(known_id, id_);
93   if (successor_id == -1) {
94     XBT_INFO("Cannot join the ring.");
95   } else {
96     setFinger(0, successor_id);
97     printFingerTable();
98     joined = true;
99   }
100 }
101
102 /* Makes the current node quit the system */
103 void Node::leave()
104 {
105   XBT_INFO("Well Guys! I Think it's time for me to leave ;)");
106   notifyAndQuit();
107   joined = false;
108 }
109
110 /* Notifies the successor and the predecessor of the current node before leaving */
111 void Node::notifyAndQuit()
112 {
113   // send the PREDECESSOR_LEAVING to our successor
114   ChordMessage* pred_msg = new ChordMessage(PREDECESSOR_LEAVING);
115   pred_msg->request_id   = pred_id_;
116   pred_msg->answer_to    = mailbox_;
117
118   XBT_DEBUG("Sending a 'PREDECESSOR_LEAVING' to my successor %d", fingers_[0]);
119   try {
120     simgrid::s4u::Mailbox::by_name(std::to_string(fingers_[0]))->put(pred_msg, 10, timeout);
121   } catch (simgrid::TimeoutError& e) {
122     XBT_DEBUG("Timeout expired when sending a 'PREDECESSOR_LEAVING' to my successor %d", fingers_[0]);
123     delete pred_msg;
124   }
125
126   if (pred_id_ != -1 && pred_id_ != id_) {
127     // send the SUCCESSOR_LEAVING to our predecessor (only if I have one that is not me)
128     ChordMessage* succ_msg = new ChordMessage(SUCCESSOR_LEAVING);
129     succ_msg->request_id   = fingers_[0];
130     succ_msg->answer_to    = mailbox_;
131     XBT_DEBUG("Sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
132
133     try {
134       simgrid::s4u::Mailbox::by_name(std::to_string(pred_id_))->put(succ_msg, 10, timeout);
135     } catch (simgrid::TimeoutError& e) {
136       XBT_DEBUG("Timeout expired when sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
137       delete succ_msg;
138     }
139   }
140 }
141
142 /* Performs a find successor request to a random id */
143 void Node::randomLookup()
144 {
145   int res          = id_;
146   int random_index = RngStream_RandInt(stream, 0, nb_bits - 1);
147   int random_id    = fingers_[random_index];
148   XBT_DEBUG("Making a lookup request for id %d", random_id);
149   if (random_id != id_)
150     res = findSuccessor(random_id);
151   XBT_DEBUG("The successor of node %d is %d", random_id, res);
152 }
153
154 /* Sets a finger of the current node.
155  *
156  * @param node the current node
157  * @param finger_index index of the finger to set (0 to nb_bits - 1)
158  * @param id the id to set for this finger
159  */
160 void Node::setFinger(int finger_index, int id)
161 {
162   if (id != fingers_[finger_index]) {
163     fingers_[finger_index] = id;
164     XBT_VERB("My new finger #%d is %d", finger_index, id);
165   }
166 }
167
168 /* Sets the predecessor of the current node.
169  * @param id the id to predecessor, or -1 to unset the predecessor
170  */
171 void Node::setPredecessor(int predecessor_id)
172 {
173   if (predecessor_id != pred_id_) {
174     pred_id_ = predecessor_id;
175     XBT_VERB("My new predecessor is %d", predecessor_id);
176   }
177 }
178
179 /** refreshes the finger table of the current node (called periodically) */
180 void Node::fixFingers()
181 {
182   XBT_DEBUG("Fixing fingers");
183   int id = findSuccessor(id_ + (1U << next_finger_to_fix));
184   if (id != -1) {
185     if (id != fingers_[next_finger_to_fix]) {
186       setFinger(next_finger_to_fix, id);
187       printFingerTable();
188     }
189     next_finger_to_fix = (next_finger_to_fix + 1) % nb_bits;
190   }
191 }
192
193 /** Displays the finger table of a node. */
194 void Node::printFingerTable()
195 {
196   if (XBT_LOG_ISENABLED(s4u_chord, xbt_log_priority_verbose)) {
197     XBT_VERB("My finger table:");
198     XBT_VERB("Start | Succ");
199     for (int i = 0; i < nb_bits; i++) {
200       XBT_VERB(" %3u  | %3d", (id_ + (1U << i)) % nb_keys, fingers_[i]);
201     }
202
203     XBT_VERB("Predecessor: %d", pred_id_);
204   }
205 }
206
207 /* checks whether the predecessor has failed (called periodically) */
208 void Node::checkPredecessor()
209 {
210   XBT_DEBUG("Checking whether my predecessor is alive");
211   void* data = nullptr;
212   if (pred_id_ == -1)
213     return;
214
215   simgrid::s4u::MailboxPtr mailbox        = simgrid::s4u::Mailbox::by_name(std::to_string(pred_id_));
216   simgrid::s4u::MailboxPtr return_mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(id_) + "_is_alive");
217
218   ChordMessage* message = new ChordMessage(PREDECESSOR_ALIVE);
219   message->request_id   = pred_id_;
220   message->answer_to    = return_mailbox;
221
222   XBT_DEBUG("Sending a 'Predecessor Alive' request to my predecessor %d", pred_id_);
223   try {
224     mailbox->put(message, 10, timeout);
225   } catch (simgrid::TimeoutError& e) {
226     XBT_DEBUG("Failed to send the 'Predecessor Alive' request to %d", pred_id_);
227     delete message;
228     return;
229   }
230
231   // receive the answer
232   XBT_DEBUG("Sent 'Predecessor Alive' request to %d, waiting for the answer on my mailbox '%s'", pred_id_,
233             message->answer_to->get_cname());
234   simgrid::s4u::CommPtr comm = return_mailbox->get_async(&data);
235
236   try {
237     comm->wait_for(timeout);
238     XBT_DEBUG("Received the answer to my 'Predecessor Alive': my predecessor %d is alive", pred_id_);
239     delete static_cast<ChordMessage*>(data);
240   } catch (simgrid::TimeoutError& e) {
241     XBT_DEBUG("Failed to receive the answer to my 'Predecessor Alive' request");
242     pred_id_ = -1;
243   }
244 }
245
246 /* Asks its predecessor to a remote node
247  *
248  * @param ask_to the node to ask to
249  * @return the id of its predecessor node, or -1 if the request failed (or if the node does not know its predecessor)
250  */
251 int Node::remoteGetPredecessor(int ask_to)
252 {
253   int predecessor_id                      = -1;
254   void* data                              = nullptr;
255   simgrid::s4u::MailboxPtr mailbox        = simgrid::s4u::Mailbox::by_name(std::to_string(ask_to));
256   simgrid::s4u::MailboxPtr return_mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(id_) + "_pred");
257
258   ChordMessage* message = new ChordMessage(GET_PREDECESSOR);
259   message->request_id   = id_;
260   message->answer_to    = return_mailbox;
261
262   // send a "Get Predecessor" request to ask_to_id
263   XBT_DEBUG("Sending a 'Get Predecessor' request to %d", ask_to);
264   try {
265     mailbox->put(message, 10, timeout);
266   } catch (simgrid::TimeoutError& e) {
267     XBT_DEBUG("Failed to send the 'Get Predecessor' request to %d", ask_to);
268     delete message;
269     return predecessor_id;
270   }
271
272   // receive the answer
273   XBT_DEBUG("Sent 'Get Predecessor' request to %d, waiting for the answer on my mailbox '%s'", ask_to,
274             message->answer_to->get_cname());
275   simgrid::s4u::CommPtr comm = return_mailbox->get_async(&data);
276
277   try {
278     comm->wait_for(timeout);
279     ChordMessage* answer = static_cast<ChordMessage*>(data);
280     XBT_DEBUG("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to,
281               answer->answer_id);
282     predecessor_id = answer->answer_id;
283     delete answer;
284   } catch (simgrid::TimeoutError& e) {
285     XBT_DEBUG("Failed to receive the answer to my 'Get Predecessor' request");
286     delete static_cast<ChordMessage*>(data);
287   }
288
289   return predecessor_id;
290 }
291
292 /* Returns the closest preceding finger of an id with respect to the finger table of the current node.
293  *
294  * @param id the id to find
295  * @return the closest preceding finger of that id
296  */
297 int Node::closestPrecedingFinger(int id)
298 {
299   for (int i = nb_bits - 1; i >= 0; i--) {
300     if (is_in_interval(fingers_[i], id_ + 1, id - 1)) {
301       return fingers_[i];
302     }
303   }
304   return id_;
305 }
306
307 /* Makes the current node find the successor node of an id.
308  *
309  * @param id the id to find
310  * @return the id of the successor node, or -1 if the request failed
311  */
312 int Node::findSuccessor(int id)
313 {
314   // is my successor the successor?
315   if (is_in_interval(id, id_ + 1, fingers_[0])) {
316     return fingers_[0];
317   }
318
319   // otherwise, ask the closest preceding finger in my table
320   return remoteFindSuccessor(closestPrecedingFinger(id), id);
321 }
322
323 int Node::remoteFindSuccessor(int ask_to, int id)
324 {
325   int successor                           = -1;
326   void* data                              = nullptr;
327   simgrid::s4u::MailboxPtr mailbox        = simgrid::s4u::Mailbox::by_name(std::to_string(ask_to));
328   simgrid::s4u::MailboxPtr return_mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(id_) + "_succ");
329
330   ChordMessage* message = new ChordMessage(FIND_SUCCESSOR);
331   message->request_id   = id_;
332   message->answer_to    = return_mailbox;
333
334   // send a "Find Successor" request to ask_to_id
335   XBT_DEBUG("Sending a 'Find Successor' request to %d for id %d", ask_to, id);
336   try {
337     mailbox->put(message, 10, timeout);
338   } catch (simgrid::TimeoutError& e) {
339     XBT_DEBUG("Failed to send the 'Find Successor' request to %d for id %d", ask_to, id_);
340     delete message;
341     return successor;
342   }
343   // receive the answer
344   XBT_DEBUG("Sent a 'Find Successor' request to %d for key %d, waiting for the answer", ask_to, id);
345   simgrid::s4u::CommPtr comm = return_mailbox->get_async(&data);
346
347   try {
348     comm->wait_for(timeout);
349     ChordMessage* answer = static_cast<ChordMessage*>(data);
350     XBT_DEBUG("Received the answer to my 'Find Successor' request for id %d: the successor of key %d is %d",
351               answer->request_id, id_, answer->answer_id);
352     successor = answer->answer_id;
353     delete answer;
354   } catch (simgrid::TimeoutError& e) {
355     XBT_DEBUG("Failed to receive the answer to my 'Find Successor' request");
356     delete static_cast<ChordMessage*>(data);
357   }
358
359   return successor;
360 }
361
362 /* Notifies the current node that its predecessor may have changed. */
363 void Node::notify(int predecessor_candidate_id)
364 {
365   if (pred_id_ == -1 || is_in_interval(predecessor_candidate_id, pred_id_ + 1, id_ - 1)) {
366     setPredecessor(predecessor_candidate_id);
367     printFingerTable();
368   } else {
369     XBT_DEBUG("I don't have to change my predecessor to %d", predecessor_candidate_id);
370   }
371 }
372
373 /* Notifies a remote node that its predecessor may have changed. */
374 void Node::remoteNotify(int notify_id, int predecessor_candidate_id)
375 {
376   ChordMessage* message = new ChordMessage(NOTIFY);
377   message->request_id   = predecessor_candidate_id;
378   message->answer_to    = nullptr;
379
380   // send a "Notify" request to notify_id
381   XBT_DEBUG("Sending a 'Notify' request to %d", notify_id);
382   simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(notify_id));
383   mailbox->put_init(message, 10)->detach(ChordMessage::destroy);
384 }
385
386 /* This function is called periodically. It checks the immediate successor of the current node. */
387 void Node::stabilize()
388 {
389   XBT_DEBUG("Stabilizing node");
390
391   // get the predecessor of my immediate successor
392   int candidate_id = pred_id_;
393   int successor_id = fingers_[0];
394   if (successor_id != id_)
395     candidate_id = remoteGetPredecessor(successor_id);
396
397   // this node is a candidate to become my new successor
398   if (candidate_id != -1 && is_in_interval(candidate_id, id_ + 1, successor_id - 1)) {
399     setFinger(0, candidate_id);
400   }
401   if (successor_id != id_) {
402     remoteNotify(successor_id, id_);
403   }
404 }
405
406 /* This function is called when a node receives a message.
407  *
408  * @param message the message to handle (don't touch it afterward: it will be destroyed, reused or forwarded)
409  */
410 void Node::handleMessage(ChordMessage* message)
411 {
412   switch (message->type) {
413   case FIND_SUCCESSOR:
414     XBT_DEBUG("Received a 'Find Successor' request from %s for id %d", message->issuer_host_name.c_str(),
415         message->request_id);
416     // is my successor the successor?
417     if (is_in_interval(message->request_id, id_ + 1, fingers_[0])) {
418       message->type = FIND_SUCCESSOR_ANSWER;
419       message->answer_id = fingers_[0];
420       XBT_DEBUG("Sending back a 'Find Successor Answer' to %s (mailbox %s): the successor of %d is %d",
421                 message->issuer_host_name.c_str(), message->answer_to->get_cname(), message->request_id,
422                 message->answer_id);
423       message->answer_to->put_init(message, 10)->detach(ChordMessage::destroy);
424     } else {
425       // otherwise, forward the request to the closest preceding finger in my table
426       int closest = closestPrecedingFinger(message->request_id);
427       XBT_DEBUG("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
428           message->request_id, closest);
429       simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::by_name(std::to_string(closest));
430       mailbox->put_init(message, 10)->detach(ChordMessage::destroy);
431     }
432     break;
433
434   case GET_PREDECESSOR:
435     XBT_DEBUG("Receiving a 'Get Predecessor' request from %s", message->issuer_host_name.c_str());
436     message->type = GET_PREDECESSOR_ANSWER;
437     message->answer_id = pred_id_;
438     XBT_DEBUG("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d",
439               message->issuer_host_name.c_str(), message->answer_to->get_cname(), message->answer_id);
440     message->answer_to->put_init(message, 10)->detach(ChordMessage::destroy);
441     break;
442
443   case NOTIFY:
444     // someone is telling me that he may be my new predecessor
445     XBT_DEBUG("Receiving a 'Notify' request from %s", message->issuer_host_name.c_str());
446     notify(message->request_id);
447     delete message;
448     break;
449
450   case PREDECESSOR_LEAVING:
451     // my predecessor is about to quit
452     XBT_DEBUG("Receiving a 'Predecessor Leaving' message from %s", message->issuer_host_name.c_str());
453     // modify my predecessor
454     setPredecessor(message->request_id);
455     delete message;
456     /*TODO :
457       >> notify my new predecessor
458       >> send a notify_predecessors !!
459      */
460     break;
461
462   case SUCCESSOR_LEAVING:
463     // my successor is about to quit
464     XBT_DEBUG("Receiving a 'Successor Leaving' message from %s", message->issuer_host_name.c_str());
465     // modify my successor FIXME : this should be implicit ?
466     setFinger(0, message->request_id);
467     delete message;
468     /* TODO
469        >> notify my new successor
470        >> update my table & predecessors table */
471     break;
472
473   case PREDECESSOR_ALIVE:
474     XBT_DEBUG("Receiving a 'Predecessor Alive' request from %s", message->issuer_host_name.c_str());
475     message->type = PREDECESSOR_ALIVE_ANSWER;
476     XBT_DEBUG("Sending back a 'Predecessor Alive Answer' to %s (mailbox %s)", message->issuer_host_name.c_str(),
477               message->answer_to->get_cname());
478     message->answer_to->put_init(message, 10)->detach(ChordMessage::destroy);
479     break;
480
481   default:
482     XBT_DEBUG("Ignoring unexpected message: %d from %s", message->type, message->issuer_host_name.c_str());
483     delete message;
484   }
485 }