1 /* Copyright (c) 2006, 2007, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "xbt/sysdep.h"
11 static int closest_preceding_node(int id);
12 static void check_predecessor(void);
14 XBT_LOG_NEW_DEFAULT_CATEGORY(chord, "Messages specific to this example");
16 typedef enum msg_typus {
26 /*XBT_DEFINE_TYPE(s_pbio,
33 typedef struct s_pbio pbio_t;*/
35 /*XBT_DEFINE_TYPE(s_ping,*/
40 typedef struct s_ping ping_t;
42 /*XBT_DEFINE_TYPE(s_pong,*/
48 typedef struct s_pong pong_t;
50 XBT_DEFINE_TYPE(s_notify, struct s_notify {
51 int id; char host[1024]; int port;};);
53 typedef struct s_notify notify_t;
55 XBT_DEFINE_TYPE(s_get_suc, struct s_get_suc {
58 typedef struct s_get_suc get_suc_t;
60 XBT_DEFINE_TYPE(s_rep_suc, struct s_rep_suc {
61 int id; char host[1024]; int port;};);
63 typedef struct s_rep_suc rep_suc_t;
65 typedef struct finger_elem {
73 static void register_messages()
75 /* gras_msgtype_declare("chord",xbt_datadesc_by_symbol(s_pbio));*/
76 gras_msgtype_declare("chord_get_suc",
77 xbt_datadesc_by_symbol(s_get_suc));
78 gras_msgtype_declare("chord_rep_suc",
79 xbt_datadesc_by_symbol(s_rep_suc));
80 gras_msgtype_declare("chord_notify", xbt_datadesc_by_symbol(s_notify));
83 /* Global private data */
85 xbt_socket_t sock; /* server socket on which I'm listening */
86 int id; /* my id number */
87 char host[1024]; /* my host name */
88 int port; /* port on which I'm listening FIXME */
89 int fingers; /* how many fingers */
90 finger_elem *finger; /* finger table */
91 int next_to_fix; /* next in the finger list to be checked */
92 int pre_id; /* predecessor id */
93 char pre_host[1024]; /* predecessor host */
94 int pre_port; /* predecessor port */
98 int node(int argc, char **argv);
100 /*static int node_cb_chord_handler(xbt_socket_t expeditor,void *payload_data){
102 pbio_t pbio_i=*(pbio_t*)payload_data;
104 node_data_t *globals=(node_data_t*)gras_userdata_get();
106 XBT_INFO(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,xbt_socket_peer_name(expeditor),pbio_i.dest);
112 static int node_cb_get_suc_handler(gras_msg_cb_ctx_t ctx,
115 xbt_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
116 get_suc_t incoming = *(get_suc_t *) payload_data;
118 node_data_t *globals = (node_data_t *) gras_userdata_get();
119 XBT_INFO("Received a get_successor message from %s for %d",
120 xbt_socket_peer_name(expeditor), incoming.id);
121 if ((globals->id == globals->finger[0].id) ||
122 (incoming.id > globals->id
123 && incoming.id <= globals->finger[0].id)) {
124 outgoing.id = globals->finger[0].id;
125 snprintf(outgoing.host, 1024, globals->finger[0].host);
126 outgoing.port = globals->finger[0].port;
127 XBT_INFO("My successor is his successor!");
129 xbt_socket_t temp_sock;
130 int contact = closest_preceding_node(incoming.id);
132 outgoing.id = globals->finger[0].id;
133 snprintf(outgoing.host, 1024, globals->finger[0].host);
134 outgoing.port = globals->finger[0].port;
135 XBT_INFO("My successor is his successor!");
138 asking.id = incoming.id;
140 temp_sock = gras_socket_client(globals->finger[contact].host,
141 globals->finger[contact].port);
144 RETHROWF("Unable to connect!: %s");
147 gras_msg_send(temp_sock, "chord_get_suc", &asking);
150 RETHROWF("Unable to ask!: %s");
152 gras_msg_wait(10., "chord_rep_suc", &temp_sock, &outgoing);
157 gras_msg_send(expeditor, "chord_rep_suc", &outgoing);
158 XBT_INFO("Successor information sent!");
161 RETHROWF("%s:Timeout sending successor information to %s: %s",
162 globals->host, xbt_socket_peer_name(expeditor));
164 gras_socket_close(expeditor);
168 static int closest_preceding_node(int id)
170 node_data_t *globals = (node_data_t *) gras_userdata_get();
172 for (i = globals->fingers - 1; i >= 0; i--) {
173 if (globals->finger[i].id > globals->id && globals->finger[i].id < id) {
181 static int node_cb_notify_handler(gras_msg_cb_ctx_t ctx,
184 xbt_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
186 notify_t incoming = *(notify_t *) payload_data;
187 node_data_t *globals = (node_data_t *) gras_userdata_get();
188 XBT_INFO("Received a notifying message from %s as %d",
189 xbt_socket_peer_name(expeditor), incoming.id);
190 if (globals->pre_id == -1 ||
191 (incoming.id > globals->pre_id && incoming.id < globals->id)) {
192 globals->pre_id = incoming.id;
193 snprintf(globals->pre_host, 1024, incoming.host);
194 globals->pre_port = incoming.port;
195 XBT_INFO("Set as my new predecessor!");
200 static void fix_fingers()
202 get_suc_t get_suc_msg;
203 xbt_socket_t temp_sock = NULL;
204 xbt_socket_t temp_sock2 = NULL;
205 rep_suc_t rep_suc_msg;
206 node_data_t *globals = (node_data_t *) gras_userdata_get();
209 temp_sock = gras_socket_client(globals->host, globals->port);
212 RETHROWF("Unable to contact known host: %s");
215 get_suc_msg.id = globals->id;
217 gras_msg_send(temp_sock, "chord_get_suc", &get_suc_msg);
220 gras_socket_close(temp_sock);
221 RETHROWF("Unable to contact known host to get successor!: %s");
225 XBT_INFO("Waiting for reply!");
226 gras_msg_wait(6000, "chord_rep_suc", &temp_sock2, &rep_suc_msg);
229 RETHROWF("%s: Error waiting for successor:%s", globals->host);
231 globals->finger[0].id = rep_suc_msg.id;
232 snprintf(globals->finger[0].host, 1024, rep_suc_msg.host);
233 globals->finger[0].port = rep_suc_msg.port;
234 XBT_INFO("→ Finger %d fixed!", globals->next_to_fix);
235 gras_socket_close(temp_sock);
237 globals->next_to_fix = (++globals->next_to_fix == globals->fingers) ?
238 0 : globals->next_to_fix;
241 static void check_predecessor()
243 node_data_t *globals = (node_data_t *) gras_userdata_get();
244 xbt_socket_t temp_sock;
248 if (globals->pre_id == -1) {
252 temp_sock = gras_socket_client(globals->pre_host, globals->pre_port);
255 globals->pre_id = -1;
256 globals->pre_host[0] = 0;
257 globals->pre_port = 0;
263 gras_msg_send(temp_sock, "chord_ping", &ping);
266 globals->pre_id = -1;
267 globals->pre_host[0] = 0;
268 globals->pre_port = 0;
272 gras_msg_wait(60, "chord_pong", &temp_sock, &pong);
275 globals->pre_id = -1;
276 globals->pre_host[0] = 0;
277 globals->pre_port = 0;
280 gras_socket_close(temp_sock);
283 int node(int argc, char **argv)
285 node_data_t *globals = NULL;
286 xbt_socket_t temp_sock = NULL;
287 xbt_socket_t temp_sock2 = NULL;
288 get_suc_t get_suc_msg;
289 rep_suc_t rep_suc_msg;
299 /* 1. Init the GRAS infrastructure and declare my globals */
300 gras_init(&argc, argv);
302 gras_os_sleep((15 - gras_os_getpid()) * 20);
304 globals = gras_userdata_new(node_data_t);
306 globals->id = atoi(argv[1]);
307 globals->port = atoi(argv[2]);
308 globals->fingers = 0;
309 globals->finger = NULL;
310 globals->pre_id = -1;
311 globals->pre_host[0] = 0;
312 globals->pre_port = -1;
314 snprintf(globals->host, 1024, gras_os_myname());
319 other_host = xbt_strdup(argv[3]);
320 other_port = atoi(argv[4]);
323 globals->sock = gras_socket_server(globals->port);
328 globals->finger = (finger_elem *) calloc(1, sizeof(finger_elem));
329 XBT_INFO("Launching node %s:%d", globals->host, globals->port);
331 XBT_INFO("→Creating ring");
332 globals->finger[0].id = globals->id;
333 snprintf(globals->finger[0].host, 1024, globals->host);
334 globals->finger[0].port = globals->port;
336 XBT_INFO("→Known node %s:%d", other_host, other_port);
337 XBT_INFO("→Contacting to determine successor");
339 temp_sock = gras_socket_client(other_host, other_port);
342 RETHROWF("Unable to contact known host: %s");
345 get_suc_msg.id = globals->id;
347 gras_msg_send(temp_sock, "chord_get_suc", &get_suc_msg);
350 gras_socket_close(temp_sock);
351 RETHROWF("Unable to contact known host to get successor!: %s");
355 XBT_INFO("Waiting for reply!");
356 gras_msg_wait(10., "chord_rep_suc", &temp_sock2, &rep_suc_msg);
359 RETHROWF("%s: Error waiting for successor:%s", globals->host);
361 globals->finger[0].id = rep_suc_msg.id;
362 snprintf(globals->finger[0].host, 1024, rep_suc_msg.host);
363 globals->finger[0].port = rep_suc_msg.port;
364 XBT_INFO("→Got successor : %d-%s:%d", globals->finger[0].id,
365 globals->finger[0].host, globals->finger[0].port);
366 gras_socket_close(temp_sock);
368 temp_sock = gras_socket_client(globals->finger[0].host,
369 globals->finger[0].port);
372 RETHROWF("Unable to contact successor: %s");
375 notify_msg.id = globals->id;
376 snprintf(notify_msg.host, 1024, globals->host);
377 notify_msg.port = globals->port;
379 gras_msg_send(temp_sock, "chord_notify", ¬ify_msg);
382 RETHROWF("Unable to notify successor! %s");
386 gras_cb_register("chord_get_suc", &node_cb_get_suc_handler);
387 gras_cb_register("chord_notify", &node_cb_notify_handler);
388 /*gras_cb_register("chord_ping",&node_cb_ping_handler); */
389 /* gras_timer_repeat(600.,fix_fingers); */
392 for (l = 0; l < 50; l++) {
394 gras_msg_handle(6000000.0);
402 gras_socket_close(globals->sock);