Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update for chord example, indentation, cleaning out...
[simgrid.git] / examples / gras / p2p / chord / chord.c
1 /* 
2  * vim:ts=2:sw=2:expandtab
3  */
4
5 #include "xbt/sysdep.h"
6 #include "gras.h"
7
8 static int closest_preceding_node(int id);
9 static void check_predecessor(void);
10
11 XBT_LOG_NEW_DEFAULT_CATEGORY(chord,"Messages specific to this example");
12
13 typedef enum msg_typus{
14   PING,
15   PONG,
16   GET_PRE,
17   REP_PRE,
18   GET_SUC,
19   REP_SUC,
20   STD,
21 }msg_typus;
22
23 /*GRAS_DEFINE_TYPE(s_pbio,
24   struct s_pbio{
25     msg_typus type;
26     int dest;
27     char msg[1024];
28   };
29 );
30 typedef struct s_pbio pbio_t;*/
31
32 /*GRAS_DEFINE_TYPE(s_ping,*/
33   struct s_ping{
34     int id;
35   };
36 /*);*/
37 typedef struct s_ping ping_t;
38
39 /*GRAS_DEFINE_TYPE(s_pong,*/
40   struct s_pong{
41     int id;
42     int failed;
43   };
44 /*);*/
45 typedef struct s_pong pong_t;
46
47 GRAS_DEFINE_TYPE(s_notify,
48   struct s_notify{
49     int id;
50     char host[1024];
51     int port;
52   };
53 );
54 typedef struct s_notify notify_t;
55
56 GRAS_DEFINE_TYPE(s_get_suc,
57   struct s_get_suc{
58     int id;
59   };
60 );
61 typedef struct s_get_suc get_suc_t;
62
63 GRAS_DEFINE_TYPE(s_rep_suc,
64   struct s_rep_suc{
65     int id;
66     char host[1024];
67     int port;
68   };
69 );
70 typedef struct s_rep_suc rep_suc_t;
71
72 typedef struct finger_elem{
73   int id;
74   char host[1024];
75   int port;
76 }finger_elem;
77
78
79
80 static void register_messages(){
81 /*  gras_msgtype_declare("chord",gras_datadesc_by_symbol(s_pbio));*/
82   gras_msgtype_declare("chord_get_suc",gras_datadesc_by_symbol(s_get_suc));
83   gras_msgtype_declare("chord_rep_suc",gras_datadesc_by_symbol(s_rep_suc));
84   gras_msgtype_declare("chord_notify",gras_datadesc_by_symbol(s_notify));
85 }
86
87 /* Global private data */
88 typedef struct{
89   gras_socket_t sock; /* server socket on which I'm listening */
90   int id; /* my id number */
91   char host[1024]; /* my host name */
92   int port; /* port on which I'm listening FIXME */
93   int fingers; /* how many fingers */
94   finger_elem *finger; /* finger table */
95   int next_to_fix; /* next in the finger list to be checked */
96   int pre_id; /* predecessor id */
97   char pre_host[1024]; /* predecessor host */
98   int pre_port; /* predecessor port */
99 }node_data_t;
100
101
102 int node(int argc,char **argv);
103
104 /*static int node_cb_chord_handler(gras_socket_t expeditor,void *payload_data){
105   xbt_ex_t e;
106   pbio_t pbio_i=*(pbio_t*)payload_data;
107
108   node_data_t *globals=(node_data_t*)gras_userdata_get();
109
110   INFO4(">>> %d : received %d message from %s to %d <<<",globals->id,pbio_i.type,gras_socket_peer_name(expeditor),pbio_i.dest);
111
112
113
114 }*/
115
116 static int node_cb_get_suc_handler(gras_msg_cb_ctx_t ctx,void *payload_data){
117   gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx);   
118   xbt_ex_t e;
119   get_suc_t incoming=*(get_suc_t*)payload_data;
120   rep_suc_t outgoing;
121   node_data_t *globals=(node_data_t*)gras_userdata_get();
122   INFO2("Received a get_successor message from %s for %d",
123         gras_socket_peer_name(expeditor),incoming.id);
124   if((globals->id==globals->finger[0].id)||
125      (incoming.id>globals->id&&incoming.id<=globals->finger[0].id)){
126     outgoing.id=globals->finger[0].id;
127     snprintf(outgoing.host,1024,globals->finger[0].host);
128     outgoing.port=globals->finger[0].port;
129     INFO0("My successor is his successor!");
130   }else{
131     gras_socket_t temp_sock;
132     int contact=closest_preceding_node(incoming.id);
133     if(contact==-1){
134       outgoing.id=globals->finger[0].id;
135       snprintf(outgoing.host,1024,globals->finger[0].host);
136       outgoing.port=globals->finger[0].port;
137       INFO0("My successor is his successor!");
138     }else{
139       get_suc_t asking;asking.id=incoming.id;
140       TRY{
141         temp_sock=gras_socket_client(globals->finger[contact].host,
142                                      globals->finger[contact].port);
143       }CATCH(e){
144         RETHROW0("Unable to connect!: %s");
145       }
146       TRY{
147         gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&asking);
148       }CATCH(e){
149         RETHROW0("Unable to ask!: %s");
150       }
151       gras_msg_wait(10.,gras_msgtype_by_name("chord_rep_suc"),&temp_sock,
152                     &outgoing);
153     }
154   }
155   
156   TRY{
157     gras_msg_send(expeditor,gras_msgtype_by_name("chord_rep_suc"),&outgoing);
158     INFO0("Successor information sent!");
159   }CATCH(e){
160     RETHROW2("%s:Timeout sending successor information to %s: %s",
161              globals->host,gras_socket_peer_name(expeditor));
162   }
163   gras_socket_close(expeditor);
164   return(1);
165 }
166
167 static int closest_preceding_node(int id){
168   node_data_t *globals=(node_data_t*)gras_userdata_get();
169   int i;
170   for(i=globals->fingers-1;i>=0;i--){
171     if(globals->finger[i].id>globals->id&&globals->finger[i].id<id){
172       return(i);
173     }
174   }
175   
176   return i;
177 }
178
179 static int node_cb_notify_handler(gras_msg_cb_ctx_t ctx,void *payload_data){
180   gras_socket_t expeditor=gras_msg_cb_ctx_from(ctx);   
181   /*xbt_ex_t e;*/
182   notify_t incoming=*(notify_t*)payload_data;
183   node_data_t *globals=(node_data_t*)gras_userdata_get();
184   INFO2("Received a notifying message from %s as %d",
185         gras_socket_peer_name(expeditor),incoming.id);
186   if(globals->pre_id==-1||
187      (incoming.id>globals->pre_id&&incoming.id<globals->id)){
188     globals->pre_id=incoming.id;
189     snprintf(globals->pre_host,1024,incoming.host);
190     globals->pre_port=incoming.port;
191     INFO0("Set as my new predecessor!");
192   }
193   return(1);
194 }
195
196 static void fix_fingers(){
197   xbt_ex_t e;
198   gras_socket_t temp_sock=NULL;
199   gras_socket_t temp_sock2=NULL;
200   node_data_t *globals=(node_data_t*)gras_userdata_get();
201
202   TRY{
203     temp_sock=gras_socket_client(globals->host,globals->port);
204   }CATCH(e){
205     RETHROW0("Unable to contact known host: %s");
206   }
207   get_suc_t get_suc_msg;get_suc_msg.id=globals->id;
208   TRY{
209     gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),&get_suc_msg);
210   }CATCH(e){
211     gras_socket_close(temp_sock);
212     RETHROW0("Unable to contact known host to get successor!: %s");
213   }
214   rep_suc_t rep_suc_msg;
215   TRY{
216     INFO0("Waiting for reply!");
217     gras_msg_wait(6000,gras_msgtype_by_name("chord_rep_suc"),&temp_sock2,
218                   &rep_suc_msg);
219   }CATCH(e){
220     RETHROW1("%s: Error waiting for successor:%s",globals->host);
221   }
222   globals->finger[0].id=rep_suc_msg.id;
223   snprintf(globals->finger[0].host,1024,rep_suc_msg.host);
224   globals->finger[0].port=rep_suc_msg.port;
225   INFO1("→ Finger %d fixed!",globals->next_to_fix);
226   gras_socket_close(temp_sock);
227   
228   globals->next_to_fix=(++globals->next_to_fix==globals->fingers)?
229                        0:globals->next_to_fix;
230 }
231
232 static void check_predecessor(){
233   node_data_t *globals = (node_data_t*)gras_userdata_get();
234   gras_socket_t temp_sock;
235   xbt_ex_t e;
236   if (globals->pre_id == -1){
237     return;
238   }
239   TRY{
240     temp_sock = gras_socket_client( globals->pre_host, globals->pre_port );
241   }CATCH(e){
242     globals->pre_id = -1;
243     globals->pre_host[0] = 0;
244     globals->pre_port = 0;
245   }
246   ping_t ping;
247   pong_t pong;
248   ping.id = 0;
249   TRY{
250     gras_msg_send( temp_sock, gras_msgtype_by_name("chord_ping"),&ping);
251   }CATCH(e){
252     globals->pre_id = -1;
253     globals->pre_host[0] = 0;
254     globals->pre_port = 0;
255   }
256   TRY{
257     gras_msg_wait( 60, gras_msgtype_by_name("chord_pong"), &temp_sock, &pong);
258   }CATCH(e){
259     globals->pre_id = -1;
260     globals->pre_host[0] = 0;
261     globals->pre_port = 0;
262   }
263   gras_socket_close(temp_sock);
264 }
265
266 int node(int argc,char **argv){
267   node_data_t *globals=NULL;
268   gras_socket_t temp_sock=NULL;
269   gras_socket_t temp_sock2=NULL;
270
271   xbt_ex_t e;
272
273   int create=0;
274   int other_port=-1;
275   char *other_host;
276
277   /* 1. Init the GRAS infrastructure and declare my globals */
278   gras_init(&argc,argv);
279    
280   gras_os_sleep((15-gras_os_getpid())*20);
281    
282   globals=gras_userdata_new(node_data_t);
283
284   globals->id=atoi(argv[1]);
285   globals->port=atoi(argv[2]);
286   globals->fingers=0;
287   globals->finger=NULL;
288   globals->pre_id=-1;
289   globals->pre_host[0]=0;
290   globals->pre_port=-1;
291   
292   snprintf(globals->host,1024,gras_os_myname());
293
294   if(argc==3){
295     create=1;
296   }else{
297     asprintf(&other_host,"%s",argv[3]);
298     other_port=atoi(argv[4]);
299   }
300   
301   globals->sock=gras_socket_server(globals->port);
302   gras_os_sleep(1.0);
303
304   register_messages();
305   register_messages();
306
307   globals->finger=(finger_elem*)calloc(1,sizeof(finger_elem));
308   INFO2("Launching node %s:%d",globals->host,globals->port);
309   if(create){
310     INFO0("→Creating ring");
311     globals->finger[0].id=globals->id;
312     snprintf(globals->finger[0].host,1024,globals->host);
313     globals->finger[0].port=globals->port;
314   }else{
315     INFO2("→Known node %s:%d",other_host,other_port);
316     INFO0("→Contacting to determine successor");
317     TRY{
318       temp_sock=gras_socket_client(other_host,other_port);
319     }CATCH(e){
320       RETHROW0("Unable to contact known host: %s");
321     }
322     get_suc_t get_suc_msg;get_suc_msg.id=globals->id;
323     TRY{
324       gras_msg_send(temp_sock,gras_msgtype_by_name("chord_get_suc"),
325                     &get_suc_msg);
326     }CATCH(e){
327       gras_socket_close(temp_sock);
328       RETHROW0("Unable to contact known host to get successor!: %s");
329     }
330     rep_suc_t rep_suc_msg;
331     TRY{
332       INFO0("Waiting for reply!");
333       gras_msg_wait(10.,gras_msgtype_by_name("chord_rep_suc"),&temp_sock2,
334                     &rep_suc_msg);
335     }CATCH(e){
336       RETHROW1("%s: Error waiting for successor:%s",globals->host);
337     }
338     globals->finger[0].id=rep_suc_msg.id;
339     snprintf(globals->finger[0].host,1024,rep_suc_msg.host);
340     globals->finger[0].port=rep_suc_msg.port;
341     INFO3("→Got successor : %d-%s:%d",globals->finger[0].id,
342           globals->finger[0].host,globals->finger[0].port);
343     gras_socket_close(temp_sock);
344     TRY{
345       temp_sock=gras_socket_client(globals->finger[0].host,
346                                    globals->finger[0].port);
347     }CATCH(e){
348       RETHROW0("Unable to contact successor: %s");
349     }
350     notify_t notify_msg;
351     notify_msg.id=globals->id;
352     snprintf(notify_msg.host,1024,globals->host);
353     notify_msg.port=globals->port;
354     TRY{
355       gras_msg_send(temp_sock,gras_msgtype_by_name("chord_notify"),&notify_msg);
356     }CATCH(e){
357       RETHROW0("Unable to notify successor! %s");
358     }
359   }
360   
361   gras_cb_register(gras_msgtype_by_name("chord_get_suc"),
362                    &node_cb_get_suc_handler);
363   gras_cb_register(gras_msgtype_by_name("chord_notify"),
364                    &node_cb_notify_handler);
365   /*gras_cb_register(gras_msgtype_by_name("chord_ping"),&node_cb_ping_handler);*/
366  /* gras_timer_repeat(600.,fix_fingers);*/
367   /*while(1){*/
368   int l;
369   for(l=0;l<50;l++){
370     TRY{
371       gras_msg_handle(6000000.0);
372     }CATCH(e){
373     }
374   }
375   /*}*/
376   
377   gras_socket_close(globals->sock);
378   free(globals);
379   gras_exit();
380   INFO0("Done");
381   return(0);
382 }