1 /* Copyright (c) 2006, 2007, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "xbt/sysdep.h"
/* Forward declarations of helpers that are defined further down in this file. */
11 static int closest_preceding_node(int id);
12 static void check_predecessor(void);
/* Declares "chord" as the default logging category of this file. */
14 XBT_LOG_NEW_DEFAULT_CATEGORY(chord,"Messages specific to this example");
/* Enumeration named msg_typus; presumably the kinds of messages exchanged
 * (NOTE(review): confirm against the enumerator list). */
16 typedef enum msg_typus{
/* s_pbio: generic payload type, entirely commented out. */
26 /*GRAS_DEFINE_TYPE(s_pbio,
33 typedef struct s_pbio pbio_t;*/
/* s_ping: the GRAS_DEFINE_TYPE wrapper is commented out, only the typedef
 * is active. check_predecessor() sends a "chord_ping" message. */
35 /*GRAS_DEFINE_TYPE(s_ping,*/
40 typedef struct s_ping ping_t;
/* s_pong: same situation as s_ping; check_predecessor() waits for a
 * "chord_pong" message. */
42 /*GRAS_DEFINE_TYPE(s_pong,*/
48 typedef struct s_pong pong_t;
/* s_notify: payload of the "chord_notify" message (fields id, host, port
 * are read by node_cb_notify_handler()). */
50 GRAS_DEFINE_TYPE(s_notify,
57 typedef struct s_notify notify_t;
/* s_get_suc: payload of the "chord_get_suc" request; carries the id whose
 * successor is being looked up. */
59 GRAS_DEFINE_TYPE(s_get_suc,
64 typedef struct s_get_suc get_suc_t;
/* s_rep_suc: payload of the "chord_rep_suc" reply (fields id, host, port
 * describe the successor). */
66 GRAS_DEFINE_TYPE(s_rep_suc,
73 typedef struct s_rep_suc rep_suc_t;
/* finger_elem: one finger table entry; the code below uses its id, host
 * and port members. */
75 typedef struct finger_elem{
/* Declares the GRAS message types used by this example: "chord_get_suc",
 * "chord_rep_suc" and "chord_notify", each bound to the data description
 * of its payload struct. The generic "chord" (s_pbio) declaration is
 * disabled.
 * NOTE(review): "chord_ping" / "chord_pong", which check_predecessor()
 * sends and waits for, are not declared here. */
83 static void register_messages(){
84 /* gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/
85 gras_msgtype_declare("chord_get_suc",gras_datadesc_by_symbol(s_get_suc));
86 gras_msgtype_declare("chord_rep_suc",gras_datadesc_by_symbol(s_rep_suc));
87 gras_msgtype_declare("chord_notify",gras_datadesc_by_symbol(s_notify));
90 /* Global private data: per-process state, allocated in node() with
    gras_userdata_new() and fetched elsewhere with gras_userdata_get() */
92 gras_socket_t sock; /* server socket on which I'm listening */
93 int id; /* my id number */
94 char host[1024]; /* my host name */
95 int port; /* port on which I'm listening FIXME */
96 int fingers; /* how many fingers */
97 finger_elem *finger; /* finger table; finger[0] is used as my successor */
98 int next_to_fix; /* next in the finger list to be checked */
99 int pre_id; /* predecessor id; -1 is treated as "no predecessor" */
100 char pre_host[1024]; /* predecessor host */
101 int pre_port; /* predecessor port */
/* Prototype of the node process body defined below. It reads argv[1] as
 * the node id, argv[2] as its listening port, and argv[3] / argv[4] as the
 * host and port of an already known node. */
105 int node(int argc,char **argv);
107 /*static int node_cb_chord_handler(gras_socket_t expeditor,void *payload_data){
109 pbio_t pbio_i=*(pbio_t*)payload_data;
111 node_data_t *globals=(node_data_t*)gras_userdata_get();
113 INFO4(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest);
/* Callback for "chord_get_suc" requests.
 *
 * ctx          - callback context; the requester's socket is taken from it.
 * payload_data - points to a get_suc_t holding the id being looked up.
 *
 * When this node is its own successor, or the requested id lies in
 * (my id, successor id], the reply describes finger[0]. Otherwise
 * closest_preceding_node() selects a finger: the request is forwarded to
 * it and its "chord_rep_suc" answer is relayed. The reply is sent on the
 * requester's socket, which is closed afterwards.
 *
 * Host names are copied with an explicit "%s" format so that a '%' in a
 * host string is never interpreted as a conversion specification. */
119 static int node_cb_get_suc_handler(gras_msg_cb_ctx_t ctx,void *payload_data){
120 gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx);
122 get_suc_t incoming=*(get_suc_t*)payload_data;
124 node_data_t *globals=(node_data_t*)gras_userdata_get();
125 INFO2("Received a get_successor message from %s for %d",
126 gras_socket_peer_name(expeditor),incoming.id);
127 if((globals->id==globals->finger[0].id)||
128 (incoming.id>globals->id&&incoming.id<=globals->finger[0].id)){
/* my successor is also the successor of the requested id */
129 outgoing.id=globals->finger[0].id;
130 snprintf(outgoing.host,1024,"%s",globals->finger[0].host);
131 outgoing.port=globals->finger[0].port;
132 INFO0("My successor is his successor!");
134 gras_socket_t temp_sock;
135 int contact=closest_preceding_node(incoming.id);
/* NOTE(review): same answer as above; presumably taken when no closer
   finger was found - confirm against the surrounding condition */
137 outgoing.id=globals->finger[0].id;
138 snprintf(outgoing.host,1024,"%s",globals->finger[0].host);
139 outgoing.port=globals->finger[0].port;
140 INFO0("My successor is his successor!");
/* forward the lookup to the selected finger and wait for its reply */
142 get_suc_t asking;asking.id=incoming.id;
144 temp_sock=gras_socket_client(globals->finger[contact].host,
145 globals->finger[contact].port);
147 RETHROW0("Unable to connect!: %s");
150 gras_msg_send(temp_sock,"chord_get_suc",&asking);
152 RETHROW0("Unable to ask!: %s");
154 gras_msg_wait(10.,"chord_rep_suc",&temp_sock, &outgoing);
/* send the answer back to whoever asked */
159 gras_msg_send(expeditor,"chord_rep_suc",&outgoing);
160 INFO0("Successor information sent!");
162 RETHROW2("%s:Timeout sending successor information to %s: %s",
163 globals->host,gras_socket_peer_name(expeditor));
165 gras_socket_close(expeditor);
/* Scans the finger table from the last entry (fingers-1) down to entry 0,
 * looking for a finger whose id is strictly greater than this node's id
 * and strictly smaller than the given id.
 * NOTE(review): the comparison is a plain integer range test; ids that
 * wrap around the ring do not appear to be handled - confirm. */
169 static int closest_preceding_node(int id){
170 node_data_t *globals=(node_data_t*)gras_userdata_get();
172 for(i=globals->fingers-1;i>=0;i--){
173 if(globals->finger[i].id>globals->id&&globals->finger[i].id<id){
/* Callback for "chord_notify" messages.
 *
 * ctx          - callback context; the sender's socket is taken from it.
 * payload_data - points to a notify_t with the sender's id, host and port.
 *
 * The sender becomes this node's predecessor when no predecessor is known
 * (pre_id == -1) or when its id lies strictly between the current
 * predecessor id and this node's id. */
181 static int node_cb_notify_handler(gras_msg_cb_ctx_t ctx,void *payload_data){
182 gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx);
184 notify_t incoming=*(notify_t*)payload_data;
185 node_data_t *globals=(node_data_t*)gras_userdata_get();
186 INFO2("Received a notifying message from %s as %d",
187 gras_socket_peer_name(expeditor),incoming.id);
188 if(globals->pre_id==-1||
189 (incoming.id>globals->pre_id&&incoming.id<globals->id)){
190 globals->pre_id=incoming.id;
/* incoming.host comes from a received message: copy it through "%s" so it
   is never used as a format string */
191 snprintf(globals->pre_host,1024,"%s",incoming.host);
192 globals->pre_port=incoming.port;
193 INFO0("Set as my new predecessor!");
/* Refreshes finger information by issuing a "chord_get_suc" lookup for this
 * node's own id and storing the "chord_rep_suc" answer, then advances
 * next_to_fix (wrapping to 0 when it reaches the number of fingers).
 *
 * NOTE(review): the lookup is sent to this node's own host/port and the
 * answer is always stored in finger[0], while the log line reports
 * next_to_fix - confirm that this is intended. */
198 static void fix_fingers(){
199 get_suc_t get_suc_msg;
201 gras_socket_t temp_sock=NULL;
202 gras_socket_t temp_sock2=NULL;
203 rep_suc_t rep_suc_msg;
204 node_data_t *globals=(node_data_t*)gras_userdata_get();
207 temp_sock=gras_socket_client(globals->host,globals->port);
209 RETHROW0("Unable to contact known host: %s");
212 get_suc_msg.id=globals->id;
214 gras_msg_send(temp_sock,"chord_get_suc",&get_suc_msg);
216 gras_socket_close(temp_sock);
217 RETHROW0("Unable to contact known host to get successor!: %s");
221 INFO0("Waiting for reply!");
222 gras_msg_wait(6000,"chord_rep_suc",&temp_sock2, &rep_suc_msg);
224 RETHROW1("%s: Error waiting for successor:%s",globals->host);
226 globals->finger[0].id=rep_suc_msg.id;
/* rep_suc_msg.host comes from a received message: copy it through "%s" so
   it is never used as a format string */
227 snprintf(globals->finger[0].host,1024,"%s",rep_suc_msg.host);
228 globals->finger[0].port=rep_suc_msg.port;
229 INFO1("→ Finger %d fixed!",globals->next_to_fix);
230 gras_socket_close(temp_sock);
/* move on to the next finger, wrapping around at the end of the table */
232 globals->next_to_fix=(++globals->next_to_fix==globals->fingers)?
233 0:globals->next_to_fix;
/* Probes the recorded predecessor: connects to pre_host:pre_port, sends a
 * "chord_ping" message and waits for a "chord_pong" answer, then closes
 * the socket.
 * The predecessor record is reset (pre_id = -1, empty pre_host,
 * pre_port = 0) after each of the three steps.
 * NOTE(review): these resets are presumably the failure paths of the
 * connect / send / wait steps - confirm. */
236 static void check_predecessor(){
237 node_data_t *globals = (node_data_t*)gras_userdata_get();
238 gras_socket_t temp_sock;
/* pre_id == -1 is the "no predecessor" marker */
242 if (globals->pre_id == -1){
246 temp_sock = gras_socket_client( globals->pre_host, globals->pre_port );
/* forget the predecessor */
248 globals->pre_id = -1;
249 globals->pre_host[0] = 0;
250 globals->pre_port = 0;
255 gras_msg_send( temp_sock, "chord_ping",&ping);
/* forget the predecessor */
257 globals->pre_id = -1;
258 globals->pre_host[0] = 0;
259 globals->pre_port = 0;
262 gras_msg_wait( 60, "chord_pong", &temp_sock, &pong);
/* forget the predecessor */
264 globals->pre_id = -1;
265 globals->pre_host[0] = 0;
266 globals->pre_port = 0;
268 gras_socket_close(temp_sock);
/* Body of one chord node process.
 *
 * argv[1] - id of this node
 * argv[2] - port this node listens on
 * argv[3] - host name of an already known node
 * argv[4] - port of that known node
 *
 * The node initialises GRAS, allocates its private data and opens its
 * server socket. It then either starts a ring (it is its own successor)
 * or asks the known node for its successor through "chord_get_suc" /
 * "chord_rep_suc" and announces itself to that successor with
 * "chord_notify". Finally it registers the message callbacks, handles
 * incoming messages and closes the server socket.
 *
 * All host-name copies use an explicit "%s" format so that the copied text
 * (local name or data received from a peer) is never interpreted as a
 * format string. */
271 int node(int argc,char **argv){
272 node_data_t *globals=NULL;
273 gras_socket_t temp_sock=NULL;
274 gras_socket_t temp_sock2=NULL;
275 get_suc_t get_suc_msg;
276 rep_suc_t rep_suc_msg;
286 /* 1. Init the GRAS infrastructure and declare my globals */
287 gras_init(&argc,argv);
/* start-up delay derived from the process id */
289 gras_os_sleep((15-gras_os_getpid())*20);
291 globals=gras_userdata_new(node_data_t);
293 globals->id=atoi(argv[1]);
294 globals->port=atoi(argv[2]);
296 globals->finger=NULL;
298 globals->pre_host[0]=0;
299 globals->pre_port=-1;
301 snprintf(globals->host,1024,"%s",gras_os_myname());
306 asprintf(&other_host,"%s",argv[3]);
307 other_port=atoi(argv[4]);
310 globals->sock=gras_socket_server(globals->port);
/* finger table with a single entry: finger[0], the successor */
315 globals->finger=(finger_elem*)calloc(1,sizeof(finger_elem));
316 INFO2("Launching node %s:%d",globals->host,globals->port);
318 INFO0("→Creating ring");
/* alone in the ring: I am my own successor */
319 globals->finger[0].id=globals->id;
320 snprintf(globals->finger[0].host,1024,"%s",globals->host);
321 globals->finger[0].port=globals->port;
323 INFO2("→Known node %s:%d",other_host,other_port);
324 INFO0("→Contacting to determine successor");
326 temp_sock=gras_socket_client(other_host,other_port);
328 RETHROW0("Unable to contact known host: %s");
331 get_suc_msg.id=globals->id;
333 gras_msg_send(temp_sock,"chord_get_suc", &get_suc_msg);
335 gras_socket_close(temp_sock);
336 RETHROW0("Unable to contact known host to get successor!: %s");
340 INFO0("Waiting for reply!");
341 gras_msg_wait(10.,"chord_rep_suc",&temp_sock2, &rep_suc_msg);
343 RETHROW1("%s: Error waiting for successor:%s",globals->host);
/* record the successor announced by the known node */
345 globals->finger[0].id=rep_suc_msg.id;
346 snprintf(globals->finger[0].host,1024,"%s",rep_suc_msg.host);
347 globals->finger[0].port=rep_suc_msg.port;
348 INFO3("→Got successor : %d-%s:%d",globals->finger[0].id,
349 globals->finger[0].host,globals->finger[0].port);
350 gras_socket_close(temp_sock);
/* tell the successor that I exist */
352 temp_sock=gras_socket_client(globals->finger[0].host,
353 globals->finger[0].port);
355 RETHROW0("Unable to contact successor: %s");
358 notify_msg.id=globals->id;
359 snprintf(notify_msg.host,1024,"%s",globals->host);
360 notify_msg.port=globals->port;
362 gras_msg_send(temp_sock,"chord_notify",&notify_msg);
364 RETHROW0("Unable to notify successor! %s");
/* the ping callback and the periodic fix_fingers timer are disabled */
368 gras_cb_register("chord_get_suc", &node_cb_get_suc_handler);
369 gras_cb_register("chord_notify", &node_cb_notify_handler);
370 /*gras_cb_register("chord_ping",&node_cb_ping_handler);*/
371 /* gras_timer_repeat(600.,fix_fingers);*/
376 gras_msg_handle(6000000.0);
382 gras_socket_close(globals->sock);