2 * vim:ts=2:sw=2:noexpandtab
5 #include "xbt/sysdep.h"
/* Forward declarations of file-local helpers that are defined further down. */
8 static int closest_preceding_node(int id);
9 static void check_predecessor(void);
/* Creates the "chord" XBT log category and makes it the default one for the
 * logging macros (INFO0, INFO2, ...) used in this file. */
11 XBT_LOG_NEW_DEFAULT_CATEGORY(chord,"Messages specific to this example");
13 typedef enum msg_typus{
23 /*GRAS_DEFINE_TYPE(s_pbio,
30 typedef struct s_pbio pbio_t;*/
32 //GRAS_DEFINE_TYPE(s_ping,
37 typedef struct s_ping ping_t;
39 //GRAS_DEFINE_TYPE(s_pong,
45 typedef struct s_pong pong_t;
/* Payload of a "chord_get_suc" request. The handler below reads its `id`
 * field; the full field list is not visible in this excerpt. */
47 GRAS_DEFINE_TYPE(s_get_suc,
52 typedef struct s_get_suc get_suc_t;
/* Payload of a "chord_rep_suc" reply. Code in this file reads and writes its
 * `id`, `host` and `port` fields; the full field list is not visible here. */
54 GRAS_DEFINE_TYPE(s_rep_suc,
61 typedef struct s_rep_suc rep_suc_t;
63 typedef struct finger_elem{
/*
 * Declare the GRAS message types exchanged by the nodes:
 *   "chord_get_suc" with payload s_get_suc,
 *   "chord_rep_suc" with payload s_rep_suc.
 *
 * NOTE(review): check_predecessor() looks up "chord_ping" and "chord_pong" by
 * name, but no declaration for them is visible in this function - confirm
 * they are declared somewhere before that code runs.
 */
71 static void register_messages(){
72 /* gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/
73 gras_msgtype_declare("chord_get_suc",gras_datadesc_by_symbol(s_get_suc));
74 gras_msgtype_declare("chord_rep_suc",gras_datadesc_by_symbol(s_rep_suc));
77 /* Global private data */
79 gras_socket_t sock; /* server socket on which I'm listening */
80 int id; /* my id number */
81 char host[1024]; /* my host name */
82 int port; /* port on which I'm listening FIXME */
83 int fingers; /* how many fingers */
84 finger_elem *finger; /* finger table */
85 char pre_host[1024]; /* predecessor host */
86 int pre_port; /* predecessor port */
/* Entry point of a chord node. The definition reads argv[1] as the node id,
 * argv[2] as the listening port and argv[3]/argv[4] as the host and port of
 * an already known node. */
90 int node(int argc,char **argv);
92 /*static int node_cb_chord_handler(gras_socket_t expeditor,void *payload_data){
94 pbio_t pbio_i=*(pbio_t*)payload_data;
96 node_data_t *globals=(node_data_t*)gras_userdata_get();
98 INFO4(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest);
/*
 * Callback for incoming "chord_get_suc" requests.
 *
 * ctx          - callback context; the sender's socket is taken from it.
 * payload_data - points to the get_suc_t request, copied into `incoming`.
 *
 * If this node is its own successor (finger[0].id == id), or the requested id
 * lies in (globals->id, finger[0].id], the reply is filled from finger[0].
 * The remaining visible code (presumably the else branch, whose `else` line is
 * not part of this excerpt) forwards the request to the finger selected by
 * closest_preceding_node() and waits up to 10.0 for that node's
 * "chord_rep_suc" answer. The reply is then sent back to the requester and the
 * requester's socket is closed.
 *
 * NOTE(review): the interval test is a plain integer comparison and does not
 * account for ids wrapping around the ring - confirm this is intended.
 * NOTE(review): the declaration of `outgoing`, the TRY/CATCH lines around the
 * RETHROW calls and the return statement are not visible in this excerpt.
 */
104 static int node_cb_get_suc_handler(gras_msg_cb_ctx_t ctx,void *payload_data){
105 gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx);
107 get_suc_t incoming=*(get_suc_t*)payload_data;
109 node_data_t *globals=(node_data_t*)gras_userdata_get();
110 INFO2("Received a get_successor message from %s for %d",gras_socket_peer_name(expeditor),incoming.id);
111 if((globals->id==globals->finger[0].id)||(incoming.id>globals->id&&incoming.id<=globals->finger[0].id)){
112 outgoing.id=globals->finger[0].id;
/* Copy through a literal "%s": the stored host name must never be interpreted
 * as a format string (a '%' in it would make snprintf read missing varargs). */
113 snprintf(outgoing.host,1024,"%s",globals->finger[0].host);
114 outgoing.port=globals->finger[0].port;
115 INFO0("My successor is his successor!");
117 gras_socket_t temp_sock;
/* Index into the finger table of the node the request is forwarded to. */
118 int contact=closest_preceding_node(incoming.id);
119 get_suc_t asking;asking.id=incoming.id;
121 temp_sock=gras_socket_client(globals->finger[contact].host,globals->finger[contact].port);
123 RETHROW0("Unable to connect!: %s");
126 gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&asking);
128 RETHROW0("Unable to ask!: %s");
/* The forwarded node's answer is written straight into `outgoing`. */
130 gras_msg_wait(10.0,gras_msgtype_by_name("chord_rep_suc"),&temp_sock,&outgoing);
134 gras_msg_send(expeditor,gras_msgtype_by_name("chord_rep_suc"),&outgoing);
135 INFO0("Successor information sent!");
137 RETHROW2("%s:Timeout sending successor information to %s: %s",globals->host,gras_socket_peer_name(expeditor));
139 gras_socket_close(expeditor);
/*
 * Walk the finger table from its last entry (fingers-1) down to index 0,
 * testing each entry for an id that is greater than this node's id and
 * strictly less than `id`.
 *
 * NOTE(review): the declaration of `i` and the return statements are not
 * visible in this excerpt; the caller uses the result as an index into
 * globals->finger, so it presumably returns the matching index - confirm,
 * including what is returned when no entry matches.
 */
143 static int closest_preceding_node(int id){
144 node_data_t *globals=(node_data_t*)gras_userdata_get();
146 for(i=globals->fingers-1;i>=0;i--){
147 if(globals->finger[i].id>globals->id&&globals->finger[i].id<id){
/*
 * Probe the recorded predecessor: open a client socket to
 * pre_host:pre_port, send a "chord_ping" message and wait for a "chord_pong"
 * answer, then close the socket.
 *
 * The three places that clear pre_host/pre_port follow the connect, send and
 * wait steps respectively.
 * NOTE(review): they presumably sit in failure handlers whose TRY/CATCH lines
 * are not visible in this excerpt - confirm. The declarations of `ping_kong`
 * and `king_pong` are not visible either.
 */
155 static void check_predecessor() {
156 node_data_t *globals = (node_data_t*)gras_userdata_get();
157 gras_socket_t temp_sock;
/* An empty pre_host means no predecessor is recorded; the statement guarded
 * by this test is not visible in this excerpt. */
159 if (globals->pre_host[0] == 0)
165 temp_sock = gras_socket_client( globals->pre_host, globals->pre_port );
/* Forget the predecessor (reset after the connection step). */
169 globals->pre_host[0] = 0;
170 globals->pre_port = 0;
177 gras_msg_send( temp_sock, gras_msgtype_by_name("chord_ping"),&ping_kong);
/* Forget the predecessor (reset after the send step). */
181 globals->pre_host[0] = 0;
182 globals->pre_port = 0;
/* 6969 is passed as the timeout argument of the wait. */
186 gras_msg_wait( 6969, gras_msgtype_by_name("chord_pong"), &temp_sock, &king_pong);
/* Forget the predecessor (reset after the wait step). */
190 globals->pre_host[0] = 0;
191 globals->pre_port = 0;
193 gras_socket_close(temp_sock);
/*
 * Entry point of one chord node.
 *
 * argv[1] - this node's id
 * argv[2] - port to listen on
 * argv[3] - host of an already known node
 * argv[4] - port of that known node
 *
 * Initialises GRAS and the per-process globals, opens the server socket, then
 * either sets itself up as its own successor ("Creating ring") or asks the
 * known node for its successor via "chord_get_suc" / "chord_rep_suc" and
 * stores the answer in finger[0]. Finally it registers the get_suc callback,
 * handles messages for up to 60.0 and closes the server socket.
 *
 * NOTE(review): the argc checks that select between the two modes, the
 * declarations of other_host/other_port, the TRY/CATCH lines around the
 * RETHROW calls and the return statement are not visible in this excerpt.
 */
196 int node(int argc,char **argv){
197 node_data_t *globals=NULL;
198 gras_socket_t temp_sock=NULL;
199 gras_socket_t temp_sock2=NULL;
207 /* 1. Init the GRAS infrastructure and declare my globals */
208 gras_init(&argc,argv);
/* NOTE(review): the delay depends on the process id, presumably to stagger the
 * start-up of the nodes - confirm. */
210 gras_os_sleep((15-gras_os_getpid())*20);
212 globals=gras_userdata_new(node_data_t);
214 globals->id=atoi(argv[1]);
215 globals->port=atoi(argv[2]);
217 globals->finger=NULL;
/* An empty pre_host marks "no predecessor known". */
218 globals->pre_host[0]=0;
219 globals->pre_port=-1;
/* Always format through a literal "%s": a host name must never be used as the
 * format string itself. */
221 snprintf(globals->host,1024,"%s",gras_os_myname());
226 asprintf(&other_host,"%s",argv[3]);
227 other_port=atoi(argv[4]);
230 globals->sock=gras_socket_server(globals->port);
/* The finger table starts with a single, zero-initialised entry: finger[0],
 * used throughout this file as the successor. */
236 globals->finger=(finger_elem*)calloc(1,sizeof(finger_elem));
237 INFO2("Launching node %s:%d",globals->host,globals->port);
239 INFO0("→Creating ring");
/* A node creating the ring is its own successor. */
240 globals->finger[0].id=globals->id;
241 snprintf(globals->finger[0].host,1024,"%s",globals->host);
242 globals->finger[0].port=globals->port;
244 INFO2("→Known node %s:%d",other_host,other_port);
245 INFO0("→Contacting to determine successor");
247 temp_sock=gras_socket_client(other_host,other_port);
249 RETHROW0("Unable to contact known host!: %s");
251 get_suc_t get_suc_msg;get_suc_msg.id=globals->id;
253 gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&get_suc_msg);
255 gras_socket_close(temp_sock);
256 RETHROW0("Unable to contact known host to get successor!: %s");
258 rep_suc_t rep_suc_msg;
260 INFO0("Waiting for reply!");
261 gras_msg_wait(6000,gras_msgtype_by_name("chord_rep_suc"),&temp_sock2,&rep_suc_msg);
263 RETHROW1("%s: Error waiting for successor:%s",globals->host);
265 globals->finger[0].id=rep_suc_msg.id;
/* rep_suc_msg.host arrives from another node; using it as the format string
 * would let a remote peer inject conversion specifiers. */
266 snprintf(globals->finger[0].host,1024,"%s",rep_suc_msg.host);
267 globals->finger[0].port=rep_suc_msg.port;
268 INFO3("→Got successor : %d-%s:%d",globals->finger[0].id,globals->finger[0].host,globals->finger[0].port);
269 gras_socket_close(temp_sock);
272 gras_cb_register(gras_msgtype_by_name("chord_get_suc"),&node_cb_get_suc_handler);
273 // gras_cb_register(gras_msgtype_by_name("chord_ping"),&node_cb_ping_handler);
275 gras_msg_handle(60.0);
277 gras_socket_close(globals->sock);