2 * vim:ts=2:sw=2:expandtab
5 #include "xbt/sysdep.h"
8 static int closest_preceding_node(int id);
9 static void check_predecessor(void);
11 XBT_LOG_NEW_DEFAULT_CATEGORY(chord,"Messages specific to this example");
13 typedef enum msg_typus{
23 /*GRAS_DEFINE_TYPE(s_pbio,
30 typedef struct s_pbio pbio_t;*/
32 /*GRAS_DEFINE_TYPE(s_ping,*/
37 typedef struct s_ping ping_t;
39 /*GRAS_DEFINE_TYPE(s_pong,*/
45 typedef struct s_pong pong_t;
47 GRAS_DEFINE_TYPE(s_notify,
54 typedef struct s_notify notify_t;
56 GRAS_DEFINE_TYPE(s_get_suc,
61 typedef struct s_get_suc get_suc_t;
63 GRAS_DEFINE_TYPE(s_rep_suc,
70 typedef struct s_rep_suc rep_suc_t;
72 typedef struct finger_elem{
80 static void register_messages(){
81 /* gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/
82 gras_msgtype_declare("chord_get_suc",gras_datadesc_by_symbol(s_get_suc));
83 gras_msgtype_declare("chord_rep_suc",gras_datadesc_by_symbol(s_rep_suc));
84 gras_msgtype_declare("chord_notify",gras_datadesc_by_symbol(s_notify));
87 /* Global private data */
89 gras_socket_t sock; /* server socket on which I'm listening */
90 int id; /* my id number */
91 char host[1024]; /* my host name */
92 int port; /* port on which I'm listening FIXME */
93 int fingers; /* how many fingers */
94 finger_elem *finger; /* finger table */
95 int next_to_fix; /* next in the finger list to be checked */
96 int pre_id; /* predecessor id */
97 char pre_host[1024]; /* predecessor host */
98 int pre_port; /* predecessor port */
102 int node(int argc,char **argv);
104 /*static int node_cb_chord_handler(gras_socket_t expeditor,void *payload_data){
106 pbio_t pbio_i=*(pbio_t*)payload_data;
108 node_data_t *globals=(node_data_t*)gras_userdata_get();
110 INFO4(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest);
116 static int node_cb_get_suc_handler(gras_msg_cb_ctx_t ctx,void *payload_data){
117 gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx);
119 get_suc_t incoming=*(get_suc_t*)payload_data;
121 node_data_t *globals=(node_data_t*)gras_userdata_get();
122 INFO2("Received a get_successor message from %s for %d",
123 gras_socket_peer_name(expeditor),incoming.id);
124 if((globals->id==globals->finger[0].id)||
125 (incoming.id>globals->id&&incoming.id<=globals->finger[0].id)){
126 outgoing.id=globals->finger[0].id;
127 snprintf(outgoing.host,1024,globals->finger[0].host);
128 outgoing.port=globals->finger[0].port;
129 INFO0("My successor is his successor!");
131 gras_socket_t temp_sock;
132 int contact=closest_preceding_node(incoming.id);
134 outgoing.id=globals->finger[0].id;
135 snprintf(outgoing.host,1024,globals->finger[0].host);
136 outgoing.port=globals->finger[0].port;
137 INFO0("My successor is his successor!");
139 get_suc_t asking;asking.id=incoming.id;
141 temp_sock=gras_socket_client(globals->finger[contact].host,
142 globals->finger[contact].port);
144 RETHROW0("Unable to connect!: %s");
147 gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&asking);
149 RETHROW0("Unable to ask!: %s");
151 gras_msg_wait(10.,gras_msgtype_by_name("chord_rep_suc"),&temp_sock,
157 gras_msg_send(expeditor,gras_msgtype_by_name("chord_rep_suc"),&outgoing);
158 INFO0("Successor information sent!");
160 RETHROW2("%s:Timeout sending successor information to %s: %s",
161 globals->host,gras_socket_peer_name(expeditor));
163 gras_socket_close(expeditor);
167 static int closest_preceding_node(int id){
168 node_data_t *globals=(node_data_t*)gras_userdata_get();
170 for(i=globals->fingers-1;i>=0;i--){
171 if(globals->finger[i].id>globals->id&&globals->finger[i].id<id){
179 static int node_cb_notify_handler(gras_msg_cb_ctx_t ctx,void *payload_data){
180 gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx);
182 notify_t incoming=*(notify_t*)payload_data;
183 node_data_t *globals=(node_data_t*)gras_userdata_get();
184 INFO2("Received a notifying message from %s as %d",
185 gras_socket_peer_name(expeditor),incoming.id);
186 if(globals->pre_id==-1||
187 (incoming.id>globals->pre_id&&incoming.id<globals->id)){
188 globals->pre_id=incoming.id;
189 snprintf(globals->pre_host,1024,incoming.host);
190 globals->pre_port=incoming.port;
191 INFO0("Set as my new predecessor!");
196 static void fix_fingers(){
198 gras_socket_t temp_sock=NULL;
199 gras_socket_t temp_sock2=NULL;
200 node_data_t *globals=(node_data_t*)gras_userdata_get();
203 temp_sock=gras_socket_client(globals->host,globals->port);
205 RETHROW0("Unable to contact known host: %s");
207 get_suc_t get_suc_msg;get_suc_msg.id=globals->id;
209 gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&get_suc_msg);
211 gras_socket_close(temp_sock);
212 RETHROW0("Unable to contact known host to get successor!: %s");
214 rep_suc_t rep_suc_msg;
216 INFO0("Waiting for reply!");
217 gras_msg_wait(6000,gras_msgtype_by_name("chord_rep_suc"),&temp_sock2,
220 RETHROW1("%s: Error waiting for successor:%s",globals->host);
222 globals->finger[0].id=rep_suc_msg.id;
223 snprintf(globals->finger[0].host,1024,rep_suc_msg.host);
224 globals->finger[0].port=rep_suc_msg.port;
225 INFO1("→ Finger %d fixed!",globals->next_to_fix);
226 gras_socket_close(temp_sock);
228 globals->next_to_fix=(++globals->next_to_fix==globals->fingers)?
229 0:globals->next_to_fix;
232 static void check_predecessor(){
233 node_data_t *globals = (node_data_t*)gras_userdata_get();
234 gras_socket_t temp_sock;
236 if (globals->pre_id == -1){
240 temp_sock = gras_socket_client( globals->pre_host, globals->pre_port );
242 globals->pre_id = -1;
243 globals->pre_host[0] = 0;
244 globals->pre_port = 0;
250 gras_msg_send( temp_sock, gras_msgtype_by_name("chord_ping"),&ping);
252 globals->pre_id = -1;
253 globals->pre_host[0] = 0;
254 globals->pre_port = 0;
257 gras_msg_wait( 60, gras_msgtype_by_name("chord_pong"), &temp_sock, &pong);
259 globals->pre_id = -1;
260 globals->pre_host[0] = 0;
261 globals->pre_port = 0;
263 gras_socket_close(temp_sock);
266 int node(int argc,char **argv){
267 node_data_t *globals=NULL;
268 gras_socket_t temp_sock=NULL;
269 gras_socket_t temp_sock2=NULL;
277 /* 1. Init the GRAS infrastructure and declare my globals */
278 gras_init(&argc,argv);
280 gras_os_sleep((15-gras_os_getpid())*20);
282 globals=gras_userdata_new(node_data_t);
284 globals->id=atoi(argv[1]);
285 globals->port=atoi(argv[2]);
287 globals->finger=NULL;
289 globals->pre_host[0]=0;
290 globals->pre_port=-1;
292 snprintf(globals->host,1024,gras_os_myname());
297 asprintf(&other_host,"%s",argv[3]);
298 other_port=atoi(argv[4]);
301 globals->sock=gras_socket_server(globals->port);
306 globals->finger=(finger_elem*)calloc(1,sizeof(finger_elem));
307 INFO2("Launching node %s:%d",globals->host,globals->port);
309 INFO0("→Creating ring");
310 globals->finger[0].id=globals->id;
311 snprintf(globals->finger[0].host,1024,globals->host);
312 globals->finger[0].port=globals->port;
314 INFO2("→Known node %s:%d",other_host,other_port);
315 INFO0("→Contacting to determine successor");
317 temp_sock=gras_socket_client(other_host,other_port);
319 RETHROW0("Unable to contact known host: %s");
321 get_suc_t get_suc_msg;get_suc_msg.id=globals->id;
323 gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),
326 gras_socket_close(temp_sock);
327 RETHROW0("Unable to contact known host to get successor!: %s");
329 rep_suc_t rep_suc_msg;
331 INFO0("Waiting for reply!");
332 gras_msg_wait(10.,gras_msgtype_by_name("chord_rep_suc"),&temp_sock2,
335 RETHROW1("%s: Error waiting for successor:%s",globals->host);
337 globals->finger[0].id=rep_suc_msg.id;
338 snprintf(globals->finger[0].host,1024,rep_suc_msg.host);
339 globals->finger[0].port=rep_suc_msg.port;
340 INFO3("→Got successor : %d-%s:%d",globals->finger[0].id,
341 globals->finger[0].host,globals->finger[0].port);
342 gras_socket_close(temp_sock);
344 temp_sock=gras_socket_client(globals->finger[0].host,
345 globals->finger[0].port);
347 RETHROW0("Unable to contact successor: %s");
350 notify_msg.id=globals->id;
351 snprintf(notify_msg.host,1024,globals->host);
352 notify_msg.port=globals->port;
354 gras_msg_send(temp_sock,gras_msgtype_by_name("chord_notify"),¬ify_msg);
356 RETHROW0("Unable to notify successor! %s");
360 gras_cb_register("chord_get_suc", &node_cb_get_suc_handler);
361 gras_cb_register("chord_notify", &node_cb_notify_handler);
362 /*gras_cb_register(gras_msgtype_by_name("chord_ping"),&node_cb_ping_handler);*/
363 /* gras_timer_repeat(600.,fix_fingers);*/
368 gras_msg_handle(6000000.0);
374 gras_socket_close(globals->sock);